Skip to content

Commit

Permalink
add http proxy support in mqtt3 client
Browse files Browse the repository at this point in the history
 Signed-off-by: Tony Guo <tony.guo@broadcom.com>
  • Loading branch information
tg892580 committed Aug 15, 2023
1 parent 8730324 commit 2e7ceb4
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class MqttConnectOptions {
private int maxReconnectDelay = 128000;
private boolean skipPortDuringHandshake = false;
private Map<String, String> customWebSocketHeaders = null;
private String httpProxyHost;
private int httpProxyPort;
private String httpProxyUser;
private String httpProxyPassword;

// Client Operation Parameters
private int executorServiceTimeout = 1; // How long to wait in seconds when terminating the executor service.
Expand Down Expand Up @@ -716,6 +720,38 @@ public void setCustomWebSocketHeaders(Map<String, String> props) {
public Map<String, String> getCustomWebSocketHeaders() {
return customWebSocketHeaders;
}
public String getHttpProxyHost() {
return httpProxyHost;
}

public void setHttpProxyHost(String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public int getHttpProxyPort() {
return httpProxyPort;
}

public void setHttpProxyPort(int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public String getHttpProxyUser() {
return httpProxyUser;
}

public void setHttpProxyUser(String httpProxyUser) {
this.httpProxyUser = httpProxyUser;
}

public String getHttpProxyPassword() {
return httpProxyPassword;
}

public void setHttpProxyPassword(String httpProxyPassword) {
this.httpProxyPassword = httpProxyPassword;
}


public String toString() {
return Debug.dumpProperties(getDebug(), "Connection options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ public class MqttException extends Exception {
* state. New up a new client to continue.
*/
public static final short REASON_CODE_CLIENT_CLOSED = 32111;

/** Unable to connect to server though http proxy*/
public static final short REASON_CODE_HTTP_PROXY_CONNECT_ERROR = 32112;

/**
* A request has been made to use a token that is already associated with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
netModule.setEnabledCiphers(enabledCiphers);
}
}

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

import javax.net.SocketFactory;

import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.websocket.Base64;
import org.eclipse.paho.client.mqttv3.logging.Logger;
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;

Expand All @@ -42,6 +45,11 @@ public class TCPNetworkModule implements NetworkModule {
private int port;
private int conTimeout;

private String httpProxyHost;
private int httpProxyPort;
private String httpProxyUser;
private String httpProxyPassword;

/**
* Constructs a new TCPNetworkModule using the specified host and
* port. The supplied SocketFactory is used to supply the network
Expand All @@ -66,19 +74,139 @@ public TCPNetworkModule(SocketFactory factory, String host, int port, String res
*/
public void start() throws IOException, MqttException {
final String methodName = "start";

// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});

if(httpProxyHost != null) {
Socket tunnel;

/*
* Set up a socket to do tunneling through the proxy.
* Start it off as a regular socket, then layer SSL
* over the top of it.
*/
try {
tunnel = new Socket(httpProxyHost, httpProxyPort);
doTunnelHandshake(tunnel, host, port, httpProxyUser, httpProxyPassword);
}catch (IOException ex) {
//@TRACE 251=Failed to create TCP tunnel
log.fine(CLASS_NAME,methodName,"251",null,ex);
throw new MqttException(MqttException.REASON_CODE_HTTP_PROXY_CONNECT_ERROR, ex);
}

try {
socket = ((SSLSocketFactory) factory).createSocket(tunnel, host, port, true);
} catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}

} else {
try {
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout * 1000);
socket.setSoTimeout(1000);
} catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME, methodName, "250", null, ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}
}
}

/*
* Tell our tunnel where we want to CONNECT, and look for the
* right reply. Throw IOException if anything goes wrong.
*/
private void doTunnelHandshake(Socket tunnel, String host, int port, String proxyUser, String proxyPassword)
throws IOException {
OutputStream out = tunnel.getOutputStream();

String msg;
if(proxyUser != null) {
String proxyUserPass = String.format("%s:%s", proxyUser, proxyPassword);
msg = "CONNECT " + host + ":" + port + " HTTP/1.1\n"
+ "Proxy-Authorization: Basic " + Base64.encode(proxyUserPass) + "\n"
+ "User-Agent: Paho MQTT3 Client\n"
+ "Proxy-Connection: Keep-Alive"
+ "\r\n\r\n";
} else {
msg = "CONNECT " + host + ":" + port + " HTTP/1.0\n"
+ "User-Agent: "
+ "User-Agent: Paho MQTT3 Client\n"
+ "Proxy-Connection: Keep-Alive"
+ "\r\n\r\n";
}

byte b[];
try {
// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
socket.setSoTimeout(1000);
/*
* We really do want ASCII7 -- the http protocol doesn't change
* with locale.
*/
b = msg.getBytes("ASCII7");
} catch (UnsupportedEncodingException ignored) {
/*
* If ASCII7 isn't there, something serious is wrong, but
* Paranoia Is Good (tm)
*/
b = msg.getBytes();
}
catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
out.write(b);
out.flush();

/*
* We need to store the reply so we can create a detailed
* error message to the user.
*/
byte reply[] = new byte[200];
int replyLen = 0;
int newlinesSeen = 0;
boolean headerDone = false; /* Done on first newline */

InputStream in = tunnel.getInputStream();
boolean error = false;

while (newlinesSeen < 2) {
int i = in.read();
if (i < 0) {
throw new IOException("Unexpected EOF from proxy");
}
if (i == '\n') {
headerDone = true;
++newlinesSeen;
} else if (i != '\r') {
newlinesSeen = 0;
if (!headerDone && replyLen < reply.length) {
reply[replyLen++] = (byte) i;
}
}
}

/*
* Converting the byte array to a string is slightly wasteful
* in the case where the connection was successful, but it's
* insignificant compared to the network overhead.
*/
String replyStr;
try {
replyStr = new String(reply, 0, replyLen, "ASCII7");
} catch (UnsupportedEncodingException ignored) {
replyStr = new String(reply, 0, replyLen);
}

/* We asked for HTTP/1.0, so we should get that back */
// if (!replyStr.startsWith("HTTP/1.0 200")) {
if(replyStr.indexOf("200") == -1) {
throw new IOException("Unable to tunnel through "
+ tunnel.getInetAddress().getHostName() + ":" + tunnel.getPort()
+ ". Proxy returns \"" + replyStr + "\"");
}

/* tunneling Handshake was successful! */
}

public InputStream getInputStream() throws IOException {
Expand Down Expand Up @@ -110,4 +238,33 @@ public void setConnectTimeout(int timeout) {
public String getServerURI() {
return "tcp://" + host + ":" + port;
}
}

public void setHttpProxyHost(String httpProxyHost) {
this.httpProxyHost = httpProxyHost;
}

public void setHttpProxyPort(int httpProxyPort) {
this.httpProxyPort = httpProxyPort;
}

public void setHttpProxyUser(String httpProxyUser) {
this.httpProxyUser = httpProxyUser;
}

public void setHttpProxyPassword(String httpProxyPassword) {
this.httpProxyPassword = httpProxyPassword;
}

public void configHttpProxy(String proxyHost, int proxyPort, String user, String password) {
if(proxyHost != null && proxyHost.length() > 0 &&
proxyPort > 0){
setHttpProxyHost(proxyHost);
setHttpProxyPort(proxyPort);
if(user != null && user.length() > 0 &&
password != null && password.length() > 0) {
setHttpProxyUser(user);
setHttpProxyPassword(password);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
}
TCPNetworkModule networkModule = new TCPNetworkModule(factory, host, port, clientId);
networkModule.setConnectTimeout(options.getConnectionTimeout());

networkModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return networkModule;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
WebSocketNetworkModule netModule = new WebSocketNetworkModule(factory, brokerUri.toString(), host, port,
clientId, options.getCustomWebSocketHeaders(), options.isSkipPortDuringHandshake());
netModule.setConnectTimeout(options.getConnectionTimeout());

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());
return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}

netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(),
options.getHttpProxyUser(), options.getHttpProxyPassword());

return netModule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
223=failed: in closed state
224=failed: not disconnected
250=Failed to create TCP socket
251=Failed to create TCP tunnel
252=connect to host {0} port {1} timeout {2}
260=setEnabledCiphers ciphers={0}
300=key={0} message={1}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@
32200=Persistence already in use
32201=Token already in use
32202=Too many publishes in progress
32203=Unable to connect to Http Proxy

0 comments on commit 2e7ceb4

Please # to comment.