Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mqtt-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
<artifactId>pulsar-protocol-handler-mqtt-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class MQTTServiceServlet extends HttpServlet {
// Define transient by spotbugs
private final transient PulsarService pulsar;

private static volatile Pair<Object, Method> metricsCollectorRef;
private volatile Pair<Object, Method> metricsCollectorRef;

private volatile Object mqttService;

Expand All @@ -56,7 +56,8 @@ public MQTTServiceServlet(PulsarService pulsar) {
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
response.setContentType("text/plain");
if ("/stats".equals(getRequestPath(request))) {
String requestPath = getRequestPath(request);
if ("/stats".equals(requestPath)) {
response.setStatus(HttpStatus.OK_200);
response.getOutputStream().write(getJsonStats());
} else {
Expand Down Expand Up @@ -125,6 +126,7 @@ private byte[] getJsonStats() {
Pair<Object, Method> metricsCollector = getMetricsCollector();
return (byte[]) metricsCollector.getRight().invoke(metricsCollector.getLeft());
} catch (Throwable ex) {
log.warn("Failed to get MQTT JSON stats", ex);
return ex.getMessage().getBytes(StandardCharsets.UTF_8);
}
}
Expand All @@ -134,7 +136,7 @@ private Object getMqttService() throws IllegalAccessException, InvocationTargetE
if (mqttService == null) {
synchronized (LOCK) {
if (mqttService == null) {
ProtocolHandler protocolHandler = pulsar.getProtocolHandlers().protocol("mqtt");
ProtocolHandler protocolHandler = getProtocolHandler();
Method mqttServiceMethod = getMethod(protocolHandler.getClass(), "getMqttService");
mqttService = mqttServiceMethod.invoke(protocolHandler);
}
Expand All @@ -143,6 +145,17 @@ private Object getMqttService() throws IllegalAccessException, InvocationTargetE
return mqttService;
}

private ProtocolHandler getProtocolHandler() throws IllegalAccessException, InvocationTargetException,
NoSuchMethodException {
ProtocolHandler protocolHandler = pulsar.getProtocolHandlers().protocol("mqtt");
Method mqttServiceMethod = getOptionalMethod(protocolHandler.getClass(), "getMqttService");
if (mqttServiceMethod != null) {
return protocolHandler;
}
Method getHandlerMethod = getMethod(protocolHandler.getClass(), "getHandler");
return (ProtocolHandler) getHandlerMethod.invoke(protocolHandler);
}

private Pair<Object, Method> getMetricsCollector() throws IllegalAccessException, InvocationTargetException,
NoSuchMethodException {
if (metricsCollectorRef == null) {
Expand All @@ -160,7 +173,20 @@ private Pair<Object, Method> getMetricsCollector() throws IllegalAccessException

private Method getMethod(Class<?> clazz, String methodName, Class<?>... parameterTypes)
throws NoSuchMethodException {
Method method = clazz.getMethod(methodName, parameterTypes);
Method method = getOptionalMethod(clazz, methodName, parameterTypes);
if (method == null) {
throw new NoSuchMethodException(clazz.getName() + "." + methodName);
}
return method;
}

private Method getOptionalMethod(Class<?> clazz, String methodName, Class<?>... parameterTypes) {
Method method;
try {
method = clazz.getMethod(methodName, parameterTypes);
} catch (NoSuchMethodException e) {
return null;
}
method.setAccessible(true);
return method;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.Beta;
import io.streamnative.pulsar.handlers.mqtt.common.utils.RetryUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -31,7 +32,6 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.common.utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryUtil {

private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);

public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
if (backoff.getMax().isZero() || backoff.getMax().isNegative()) {
throw new IllegalArgumentException("Illegal max retry time");
}
if (backoff.getInitial().isZero() || backoff.getInitial().isNegative()) {
throw new IllegalArgumentException("Illegal initial time");
}
scheduledExecutorService.execute(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback));
}

private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
supplier.get().whenComplete((result, e) -> {
if (e != null) {
long next = backoff.next().toMillis();
boolean isMandatoryStop = backoff.isMandatoryStopMade();
if (isMandatoryStop) {
callback.completeExceptionally(e);
} else {
log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next);
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
next, TimeUnit.MILLISECONDS);
}
return;
}
callback.complete(result);
});
}
}
14 changes: 7 additions & 7 deletions mqtt-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.ee8</groupId>
<artifactId>jetty-ee8-servlet</artifactId>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-servlet</artifactId>
<version>${jetty.ee10.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
Expand All @@ -64,13 +65,12 @@
<version>${swagger-annotations.version}</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>${javax.ws.rs.version}</version>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void start0() throws MQTTProxyException {
}
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
this.eventService.start();
this.webService.start();
}

@Override
Expand Down Expand Up @@ -294,5 +295,10 @@ public void close() {
if (sslContextRefresher != null) {
sslContextRefresher.shutdownNow();
}
try {
this.webService.close();
} catch (Exception e) {
log.error("Failed to close web service.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.jetty.metrics.JettyStatisticsCollector;
import org.eclipse.jetty.ee8.servlet.ServletContextHandler;
import org.eclipse.jetty.ee8.servlet.ServletHolder;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
Expand Down Expand Up @@ -174,7 +174,7 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
if (attributeMap != null) {
attributeMap.forEach(servletContextHandler::setAttribute);
}
handlers.add(servletContextHandler.get());
handlers.add(servletContextHandler);
}

public void addStaticResources(String basePath, String resourcePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.MediaType;
import java.util.Collection;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.web.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService;
import jakarta.servlet.ServletContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.UriInfo;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.AuthenticationFilter;
Expand Down
14 changes: 10 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<spotbugs-maven-plugin.version>3.1.8</spotbugs-maven-plugin.version>
<maven-jacoco-plugin.version>0.8.7</maven-jacoco-plugin.version>
<opentelemetry.version>1.56.0</opentelemetry.version>
<validation-api.version>2.0.1.Final</validation-api.version>
<jersey.version>2.42</jersey.version>
<jersey.version>3.1.10</jersey.version>
<jetty.ee10.version>12.1.10</jetty.ee10.version>
<swagger-annotations.version>1.6.15</swagger-annotations.version>
<javax.ws.rs.version>2.1.1</javax.ws.rs.version>
<conscrypt.version>2.5.2</conscrypt.version>
</properties>

Expand All @@ -87,6 +86,14 @@
<artifactId>netty-codec-http2</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-ee8-servlet</artifactId>
<groupId>org.eclipse.jetty.ee8</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down Expand Up @@ -133,7 +140,6 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
1 change: 0 additions & 1 deletion tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.awaitility.Awaitility;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
Expand All @@ -48,8 +50,8 @@ protected MQTTCommonConfiguration initConfig() throws Exception {

@Test
public void testGetDeviceList() throws Exception {
int index = random.nextInt(mqttProxyPortList.size());
List<Integer> mqttProxyPortList = getMqttProxyPortList();
int index = random.nextInt(mqttProxyPortList.size());
List<Integer> mqttProxyHttpPortList = getMqttProxyHttpPortList();
MQTT mqttProducer = new MQTT();
int port = mqttProxyPortList.get(index);
Expand All @@ -62,19 +64,21 @@ public void testGetDeviceList() throws Exception {
Thread.sleep(4000);
HttpClient httpClient = HttpClientBuilder.create().build();
final String mopEndPoint = "http://localhost:" + mqttProxyHttpPortList.get(index) + "/admin/devices/list";
HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint));
InputStream inputStream = response.getEntity().getContent();
InputStreamReader isReader = new InputStreamReader(inputStream);
BufferedReader reader = new BufferedReader(isReader);
StringBuffer buffer = new StringBuffer();
String str;
while ((str = reader.readLine()) != null){
buffer.append(str);
}
String ret = buffer.toString();
ArrayList deviceList = new Gson().fromJson(ret, ArrayList.class);
Assert.assertEquals(deviceList.size(), 1);
Assert.assertTrue(deviceList.contains(clientId));
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint));
InputStream inputStream = response.getEntity().getContent();
InputStreamReader isReader = new InputStreamReader(inputStream);
BufferedReader reader = new BufferedReader(isReader);
StringBuffer buffer = new StringBuffer();
String str;
while ((str = reader.readLine()) != null){
buffer.append(str);
}
String ret = buffer.toString();
ArrayList deviceList = new Gson().fromJson(ret, ArrayList.class);
Assert.assertEquals(deviceList.size(), 1);
Assert.assertTrue(deviceList.contains(clientId));
});
}

}
Loading