diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java index 2f4ff1bc6..357d8510d 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java @@ -76,6 +76,10 @@ public class MqttConnectOptions { private int maxReconnectDelay = 128000; private boolean skipPortDuringHandshake = false; private Map customWebSocketHeaders = null; + private String httpProxyHost; + private int httpProxyPort; + private String httpProxyUser; + private String httpProxyPassword; // Client Operation Parameters private int executorServiceTimeout = 1; // How long to wait in seconds when terminating the executor service. @@ -716,6 +720,38 @@ public void setCustomWebSocketHeaders(Map props) { public Map getCustomWebSocketHeaders() { return customWebSocketHeaders; } + public String getHttpProxyHost() { + return httpProxyHost; + } + + public void setHttpProxyHost(String httpProxyHost) { + this.httpProxyHost = httpProxyHost; + } + + public int getHttpProxyPort() { + return httpProxyPort; + } + + public void setHttpProxyPort(int httpProxyPort) { + this.httpProxyPort = httpProxyPort; + } + + public String getHttpProxyUser() { + return httpProxyUser; + } + + public void setHttpProxyUser(String httpProxyUser) { + this.httpProxyUser = httpProxyUser; + } + + public String getHttpProxyPassword() { + return httpProxyPassword; + } + + public void setHttpProxyPassword(String httpProxyPassword) { + this.httpProxyPassword = httpProxyPassword; + } + public String toString() { return Debug.dumpProperties(getDebug(), "Connection options"); diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java index 880d9f176..7dcd9586e 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java @@ -139,6 +139,9 @@ public class MqttException extends Exception { * state. New up a new client to continue. */ public static final short REASON_CODE_CLIENT_CLOSED = 32111; + + /** Unable to connect to server though http proxy*/ + public static final short REASON_CODE_HTTP_PROXY_CONNECT_ERROR = 32112; /** * A request has been made to use a token that is already associated with diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/SSLNetworkModuleFactory.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/SSLNetworkModuleFactory.java index 96ae75c84..bfec9af58 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/SSLNetworkModuleFactory.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/SSLNetworkModuleFactory.java @@ -82,6 +82,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio netModule.setEnabledCiphers(enabledCiphers); } } + + netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(), + options.getHttpProxyUser(), options.getHttpProxyPassword()); return netModule; } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java index ddcff906a..d4da2a6f7 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModule.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; @@ -25,7 +26,9 @@ import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.internal.websocket.Base64; import org.eclipse.paho.client.mqttv3.logging.Logger; import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; @@ -42,6 +45,11 @@ public class TCPNetworkModule implements NetworkModule { private int port; private int conTimeout; + private String httpProxyHost; + private int httpProxyPort; + private String httpProxyUser; + private String httpProxyPassword; + /** * Constructs a new TCPNetworkModule using the specified host and * port. The supplied SocketFactory is used to supply the network @@ -66,19 +74,139 @@ public TCPNetworkModule(SocketFactory factory, String host, int port, String res */ public void start() throws IOException, MqttException { final String methodName = "start"; + + // @TRACE 252=connect to host {0} port {1} timeout {2} + log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)}); + + if(httpProxyHost != null) { + Socket tunnel; + + /* + * Set up a socket to do tunneling through the proxy. + * Start it off as a regular socket, then layer SSL + * over the top of it. + */ + try { + tunnel = new Socket(httpProxyHost, httpProxyPort); + doTunnelHandshake(tunnel, host, port, httpProxyUser, httpProxyPassword); + }catch (IOException ex) { + //@TRACE 251=Failed to create TCP tunnel + log.fine(CLASS_NAME,methodName,"251",null,ex); + throw new MqttException(MqttException.REASON_CODE_HTTP_PROXY_CONNECT_ERROR, ex); + } + + try { + socket = ((SSLSocketFactory) factory).createSocket(tunnel, host, port, true); + } catch (ConnectException ex) { + //@TRACE 250=Failed to create TCP socket + log.fine(CLASS_NAME,methodName,"250",null,ex); + throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex); + } + + } else { + try { + SocketAddress sockaddr = new InetSocketAddress(host, port); + socket = factory.createSocket(); + socket.connect(sockaddr, conTimeout * 1000); + socket.setSoTimeout(1000); + } catch (ConnectException ex) { + //@TRACE 250=Failed to create TCP socket + log.fine(CLASS_NAME, methodName, "250", null, ex); + throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex); + } + } + } + + /* + * Tell our tunnel where we want to CONNECT, and look for the + * right reply. Throw IOException if anything goes wrong. + */ + private void doTunnelHandshake(Socket tunnel, String host, int port, String proxyUser, String proxyPassword) + throws IOException { + OutputStream out = tunnel.getOutputStream(); + + String msg; + if(proxyUser != null) { + String proxyUserPass = String.format("%s:%s", proxyUser, proxyPassword); + msg = "CONNECT " + host + ":" + port + " HTTP/1.1\n" + + "Proxy-Authorization: Basic " + Base64.encode(proxyUserPass) + "\n" + + "User-Agent: Paho MQTT3 Client\n" + + "Proxy-Connection: Keep-Alive" + + "\r\n\r\n"; + } else { + msg = "CONNECT " + host + ":" + port + " HTTP/1.0\n" + + "User-Agent: " + + "User-Agent: Paho MQTT3 Client\n" + + "Proxy-Connection: Keep-Alive" + + "\r\n\r\n"; + } + + byte b[]; try { - // @TRACE 252=connect to host {0} port {1} timeout {2} - log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)}); - SocketAddress sockaddr = new InetSocketAddress(host, port); - socket = factory.createSocket(); - socket.connect(sockaddr, conTimeout*1000); - socket.setSoTimeout(1000); + /* + * We really do want ASCII7 -- the http protocol doesn't change + * with locale. + */ + b = msg.getBytes("ASCII7"); + } catch (UnsupportedEncodingException ignored) { + /* + * If ASCII7 isn't there, something serious is wrong, but + * Paranoia Is Good (tm) + */ + b = msg.getBytes(); } - catch (ConnectException ex) { - //@TRACE 250=Failed to create TCP socket - log.fine(CLASS_NAME,methodName,"250",null,ex); - throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex); + out.write(b); + out.flush(); + + /* + * We need to store the reply so we can create a detailed + * error message to the user. + */ + byte reply[] = new byte[200]; + int replyLen = 0; + int newlinesSeen = 0; + boolean headerDone = false; /* Done on first newline */ + + InputStream in = tunnel.getInputStream(); + boolean error = false; + + while (newlinesSeen < 2) { + int i = in.read(); + if (i < 0) { + throw new IOException("Unexpected EOF from proxy"); + } + if (i == '\n') { + headerDone = true; + ++newlinesSeen; + } else if (i != '\r') { + newlinesSeen = 0; + if (!headerDone && replyLen < reply.length) { + reply[replyLen++] = (byte) i; + } + } } + + /* + * Converting the byte array to a string is slightly wasteful + * in the case where the connection was successful, but it's + * insignificant compared to the network overhead. + */ + String replyStr; + try { + replyStr = new String(reply, 0, replyLen, "ASCII7"); + } catch (UnsupportedEncodingException ignored) { + replyStr = new String(reply, 0, replyLen); + } + + /* We asked for HTTP/1.0, so we should get that back */ +// if (!replyStr.startsWith("HTTP/1.0 200")) { + if(replyStr.indexOf("200") == -1) { + throw new IOException("Unable to tunnel through " + + tunnel.getInetAddress().getHostName() + ":" + tunnel.getPort() + + ". Proxy returns \"" + replyStr + "\""); + } + + /* tunneling Handshake was successful! */ } public InputStream getInputStream() throws IOException { @@ -110,4 +238,33 @@ public void setConnectTimeout(int timeout) { public String getServerURI() { return "tcp://" + host + ":" + port; } -} + + public void setHttpProxyHost(String httpProxyHost) { + this.httpProxyHost = httpProxyHost; + } + + public void setHttpProxyPort(int httpProxyPort) { + this.httpProxyPort = httpProxyPort; + } + + public void setHttpProxyUser(String httpProxyUser) { + this.httpProxyUser = httpProxyUser; + } + + public void setHttpProxyPassword(String httpProxyPassword) { + this.httpProxyPassword = httpProxyPassword; + } + + public void configHttpProxy(String proxyHost, int proxyPort, String user, String password) { + if(proxyHost != null && proxyHost.length() > 0 && + proxyPort > 0){ + setHttpProxyHost(proxyHost); + setHttpProxyPort(proxyPort); + if(user != null && user.length() > 0 && + password != null && password.length() > 0) { + setHttpProxyUser(user); + setHttpProxyPassword(password); + } + } + } +} \ No newline at end of file diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModuleFactory.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModuleFactory.java index b75441a6e..d9c9a3b3a 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModuleFactory.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/TCPNetworkModuleFactory.java @@ -58,6 +58,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio } TCPNetworkModule networkModule = new TCPNetworkModule(factory, host, port, clientId); networkModule.setConnectTimeout(options.getConnectionTimeout()); + + networkModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(), + options.getHttpProxyUser(), options.getHttpProxyPassword()); return networkModule; } -} +} \ No newline at end of file diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModuleFactory.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModuleFactory.java index f617dbfbd..fac01a3bc 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModuleFactory.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketNetworkModuleFactory.java @@ -54,6 +54,9 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio WebSocketNetworkModule netModule = new WebSocketNetworkModule(factory, brokerUri.toString(), host, port, clientId, options.getCustomWebSocketHeaders(), options.isSkipPortDuringHandshake()); netModule.setConnectTimeout(options.getConnectionTimeout()); + + netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(), + options.getHttpProxyUser(), options.getHttpProxyPassword()); return netModule; } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModuleFactory.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModuleFactory.java index b311e1312..d849f9d73 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModuleFactory.java +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/websocket/WebSocketSecureNetworkModuleFactory.java @@ -75,6 +75,10 @@ public NetworkModule createNetworkModule(URI brokerUri, MqttConnectOptions optio ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers); } } + + netModule.configHttpProxy(options.getHttpProxyHost(), options.getHttpProxyPort(), + options.getHttpProxyUser(), options.getHttpProxyPassword()); + return netModule; } } diff --git a/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/logcat.properties b/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/logcat.properties index 37b794ce5..f79c5d8b1 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/logcat.properties +++ b/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/logcat.properties @@ -37,6 +37,7 @@ 223=failed: in closed state 224=failed: not disconnected 250=Failed to create TCP socket +251=Failed to create TCP tunnel 252=connect to host {0} port {1} timeout {2} 260=setEnabledCiphers ciphers={0} 300=key={0} message={1} diff --git a/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/messages.properties b/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/messages.properties index 3eda71890..d73110a76 100644 --- a/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/messages.properties +++ b/org.eclipse.paho.client.mqttv3/src/main/resources/org/eclipse/paho/client/mqttv3/internal/nls/messages.properties @@ -36,3 +36,4 @@ 32200=Persistence already in use 32201=Token already in use 32202=Too many publishes in progress +32203=Unable to connect to Http Proxy \ No newline at end of file