Skip to content

Commit

Permalink
Add onFilter for Interceptor (apache#10489)
Browse files Browse the repository at this point in the history
* Add onFilter for Interceptor

* code style
  • Loading branch information
315157973 authored and eolivelli committed May 11, 2021
1 parent cb7987d commit b5e761b
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.intercept;

import java.io.IOException;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand Down Expand Up @@ -77,6 +78,16 @@ default void beforeSendMessage(Subscription subscription,
*/
void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException;

/**
* The interception of web processing, as same as `Filter.onFilter`.
* So In this method, we must call `chain.doFilter` to continue the chain.
*/
default void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
// Just continue the chain by default.
chain.doFilter(request, response);
}

/**
* Initialize the broker interceptor.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;

public class ProcessHandlerFilter implements Filter {

private final BrokerInterceptor interceptor;
private final boolean interceptorEnabled;

public ProcessHandlerFilter(PulsarService pulsar) {
this.interceptor = pulsar.getBrokerInterceptor();
this.interceptorEnabled = !pulsar.getConfig().getBrokerInterceptors().isEmpty();
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (interceptorEnabled
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
&& !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
interceptor.onFilter(request, response, chain);
} else {
chain.doFilter(request, response);
}
}

@Override
public void init(FilterConfig arg) throws ServletException {
// No init necessary.
}

@Override
public void destroy() {
// No state to clean up.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
// Enable PreInterceptFilter only when interceptors are enabled
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.intercept;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -28,6 +30,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import java.io.IOException;

@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
Expand Down Expand Up @@ -68,6 +71,13 @@ public void onWebserviceResponse(ServletRequest request, ServletResponse respons
log.info("[{}] On [{}] Webservice response", count, ((HttpServletRequest)request).getRequestURL().toString());
}

@Override
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
count = 100;
chain.doFilter(request, response);
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.PreInterceptFilter;
import org.apache.pulsar.broker.web.ProcessHandlerFilter;
import org.apache.pulsar.broker.web.ResponseHandlerFilter;
import org.mockito.Mockito;
import org.testng.Assert;
Expand All @@ -38,6 +39,8 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
* Tests for the the interceptor filter out.
Expand Down Expand Up @@ -80,6 +83,33 @@ public void testFilterOutForPreInterceptFilter() throws Exception {
}
}

@Test
public void testOnFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
PulsarService pulsarService = Mockito.mock(PulsarService.class);
Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
Mockito.doReturn(Sets.newHashSet("interceptor")).when(conf).getBrokerInterceptors();
Mockito.doReturn(conf).when(pulsarService).getConfig();
//init filter
ProcessHandlerFilter filter = new ProcessHandlerFilter(pulsarService);

HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
FilterChain chain = Mockito.mock(FilterChain.class);
Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
HttpServletRequestWrapper mockInputStream = new MockRequestWrapper(request);
Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
Mockito.doReturn(new StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
// "application/json" should be intercepted
Mockito.doReturn("application/json").when(request).getContentType();

filter.doFilter(request, response, chain);
Assert.assertEquals(interceptor.getCount(), 100);
verify(chain, times(1)).doFilter(request, response);
}

@Test
public void testFilterOutForResponseInterceptFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
Expand Down

0 comments on commit b5e761b

Please # to comment.