diff --git a/.vscode/settings.json b/.vscode/settings.json index e8190a2..a320942 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,5 @@ "python.defaultInterpreterPath": "python3", "modulename": "${workspaceFolderBasename}", "distname": "${workspaceFolderBasename}", - "moduleversion": "1.1.4" + "moduleversion": "1.1.5" } \ No newline at end of file diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5f8ed5f..5245743 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,11 @@ # pygnssutils Release Notes +### RELEASE 1.1.5 + +ENHANCEMENTS: + +1. Streamline gnssntripclient and improve exit handling when invoked via PyGPSClient. + ### RELEASE 1.1.4 ENHANCEMENTS: diff --git a/pyproject.toml b/pyproject.toml index d89e50b..174f8c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pygnssutils" authors = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] maintainers = [{ name = "semuadmin", email = "semuadmin@semuconsulting.com" }] description = "GNSS Command Line Utilities" -version = "1.1.4" +version = "1.1.5" license = { file = "LICENSE" } readme = "README.md" requires-python = ">=3.9" diff --git a/src/pygnssutils/_version.py b/src/pygnssutils/_version.py index 1ad365b..3c5f596 100644 --- a/src/pygnssutils/_version.py +++ b/src/pygnssutils/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "1.1.4" +__version__ = "1.1.5" diff --git a/src/pygnssutils/gnssntripclient.py b/src/pygnssutils/gnssntripclient.py index e95553f..bfe06ff 100644 --- a/src/pygnssutils/gnssntripclient.py +++ b/src/pygnssutils/gnssntripclient.py @@ -35,7 +35,6 @@ from os import getenv from queue import Queue from threading import Event, Thread -from time import sleep from certifi import where as findcacerts from pynmeagps import GET, NMEAMessage @@ -58,6 +57,7 @@ ENV_NTRIP_PASSWORD, ENV_NTRIP_USER, FIXES, + HTTPCODES, MAXPORT, NOGGA, NTRIP1, @@ -116,10 +116,9 @@ def __init__( self._app_update_status(False, (str(err), "red")) raise ParameterError(msg + "\nType gnssntripclient -h for help.") from err - self._socket = None self._connected = False self._stopevent = Event() - self._ntrip_thread = None + self._sleepevent = Event() self._last_gga = datetime.fromordinal(1) self._retrycount = 0 self._ntrip_version = NTRIP2 @@ -197,57 +196,28 @@ def run(self, **kwargs) -> bool: raise ParameterError(msg + "\nType gnssntripclient -h for help.") from err self._connected = True - self._start_read_thread( - self._settings, - self._stopevent, - self._output, - ) + self._stopevent.clear() + self._sleepevent.clear() + Thread( + target=self._read_thread, + args=( + self._settings, + self._stopevent, + self._output, + ), + daemon=True, + ).start() if self.settings["mountpoint"] != "": return 1 return 0 - def _start_read_thread( - self, - settings: dict, - stopevent: Event, - output: object, - ): - """ - Start the NTRIP reader thread. - """ - - if self._connected: - stopevent.clear() - self._ntrip_thread = Thread( - target=self._read_thread, - args=( - settings, - stopevent, - output, - ), - daemon=True, - ) - self._ntrip_thread.start() - - def _stop_read_thread(self): - """ - Stop NTRIP reader thread. - """ - - if self._ntrip_thread is not None: - self._stopevent.set() - # while self._ntrip_thread.is_alive(): - # sleep(0.1) - self._ntrip_thread = None - - self._app_update_status(False, ("Disconnected", "blue")) - def stop(self): """ Close NTRIP server connection. """ - self._stop_read_thread() + self._stopevent.set() + self._sleepevent.set() # cancel any retry sleep interval self._connected = False def _read_thread( @@ -257,7 +227,10 @@ def _read_thread( output: object, ): """ - Try connecting to NTRIP caster. + Main read thread. + + Opens socket connection to NTRIP caster and streams RTCM + or SPARTN output. :param dict settings: settings as dictionary :param Event stopevent: stop event @@ -267,22 +240,18 @@ def _read_thread( self._retrycount = 0 hostname = settings["server"] errc = "" # critical error message + sock = None while self._retrycount <= self._retries and not stopevent.is_set(): try: + sock = self._open_connection(settings) + if not self._do_request(sock, settings, stopevent, output): + # bad response or sourcetable, so quit + self.stop() + break - self._do_connection(settings, stopevent, output) - - except ssl.SSLCertVerificationError as err: - errc = err.strerror - if "certificate is not valid for 'www." in err.strerror: - errc += ( - f" - try using '{hostname[4:]}' rather than " - f"'{hostname}' for the NTRIP caster URL" - ) - elif "unable to get local issuer certificate" in err.strerror: - errc += f" - try adding the NTRIP caster URL SSL certificate to {findcacerts()}" + # retryable errors... except ( BrokenPipeError, ConnectionAbortedError, @@ -290,7 +259,6 @@ def _read_thread( ConnectionResetError, OverflowError, socket.gaierror, - ssl.SSLError, TimeoutError, ) as err: errm = str(repr(err)) @@ -302,117 +270,139 @@ def _read_thread( f". Retrying in {self._retryinterval * (2**self._retrycount)} secs " f"({self._retrycount}/{self._retries}) ..." ) - self._app_update_status(False, (errm, "red")) + self._app_update_status(True, (errm, "red")) + # critical errors... + except (ssl.SSLError, ssl.SSLCertVerificationError) as err: + errc = err.strerror + if "certificate is not valid for 'www." in err.strerror: + errc += ( + f" - try using '{hostname[4:]}' rather than " + f"'{hostname}' for the NTRIP caster URL" + ) + elif "unable to get local issuer certificate" in err.strerror: + errc += f" - try adding the NTRIP caster URL SSL certificate to {findcacerts()}" + except OSError: # socket already closed, ignore + errc = "socket closed" except Exception as err: # pylint: disable=broad-exception-caught errc = str(repr(err)) if errc != "": # break connection on critical error - stopevent.set() - self._connected = False + self.stop() self._app_update_status(False, (errc, "red")) break - sleep(self._retryinterval * (2**self._retrycount)) + if not self._stopevent.is_set() and not self._sleepevent.is_set(): + self._sleepevent.wait(self._retryinterval * (2**self._retrycount)) + + self._close_connection(sock) + self.logger.debug("exiting read thread") - def _do_connection( + def _open_connection(self, settings: dict) -> socket: + """ + Create a IPv4, IPv6 dual-stack socket connection. + + :param dict settings: settings as dictionary + :return: socket + :rtype: socket + :raises: Various socket error types if connection fails + """ + + hostname = settings["server"] + sock = socket.create_connection( + (socket.gethostbyname(hostname), int(settings["port"])), + timeout=self._timeout, + ) + if int(settings["https"]): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.load_verify_locations(findcacerts()) + sock = context.wrap_socket(sock, server_hostname=hostname) + + return sock + + def _close_connection(self, sock: socket): + """ + Close socket connection. + + :param socket sock: open socket + """ + + try: + sock.shutdown(socket.SHUT_RDWR) + sock.close() + except (AttributeError, OSError): # already closed, ignore + pass + + def _do_request( self, + sock: socket, settings: dict, stopevent: Event, output: object, - ): + ) -> int: """ - Opens socket to NTRIP server and reads incoming data. + Send HTTP request to NTRIP server and process incoming data. :param dict settings: settings as dictionary :param Event stopevent: stop event :param object output: output stream for raw data + :returns rc: return code (0 - stop, 1 - ok) + :rtype: int :raises: Various socket error types if connection fails """ - hostname = settings["server"] - port = int(settings["port"]) - https = int(settings["https"]) - - # create a IPv4, IPv6 dual-stack socket for connection - ip = socket.gethostbyname(hostname) - with socket.create_connection((ip, port), self._timeout) as self._socket: - if https: - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.load_verify_locations(findcacerts()) - self._socket = context.wrap_socket( - self._socket, server_hostname=hostname - ) - - self._do_request(self._socket, settings, output) - - if not self.responseok: - stopevent.set() - self._connected = False - msg = ( - f"Connection failed {self._response_status['code']} " - f"{self._response_status['description']}" - ) - self._app_update_status(False, (msg, "red")) - elif self.is_sourcetable: - stable = self._parse_sourcetable(self.response_body) - self._settings["sourcetable"] = stable - mp, dist = self._get_closest_mountpoint() - self._do_output(output, stable, (mp, dist)) - self._app_update_status(False, ("Sourcetable retrieved", "blue")) - stopevent.set() - self._connected = False - - def _do_request(self, sock: socket, settings: dict, output: object): - """ - Send formatted HTTP(S) GET request and process response. - - :param socket sock: raw socket - :param dict settings: settings - :param object output: output stream for raw data - """ - - hostname = settings["server"] - port = int(settings["port"]) - datatype = settings["datatype"].lower() - ggainterval = settings["ggainterval"] - path = settings["mountpoint"] - request_headers = self._set_headers(settings) self.logger.debug(f"Request headers:\n{request_headers}") self._response_body = b"" - awaiting_response = True + response_header = True sock.sendall(request_headers.encode()) - while True: + while not stopevent.is_set(): data = sock.recv(DEFAULT_BUFSIZE) if len(data) == 0: break - if awaiting_response: + if response_header: data = self._parse_response_header(data) - awaiting_response = False - if ( - self.is_gnssdata - and not awaiting_response - and not self._stopevent.is_set() - ): - # stream gnss data until disconnection - msg = f"Streaming {datatype} data from {hostname}:{port}/{path} ..." - self._app_update_status(True, (msg, "blue")) - self._parse_ntrip_data( - sock, - datatype, - ggainterval, - output, - ) - if not self.is_gnssdata and not awaiting_response: - self._response_body = self._response_body + data + response_header = False + else: + if self.is_gnssdata: + # stream gnss data until disconnection + msg = ( + f"Streaming {settings['datatype']} data from " + f"{settings['server']}:{settings['port']}/{settings['mountpoint']} ..." + ) + self._app_update_status(True, (msg, "blue")) + self._parse_ntrip_data( + sock, + settings, + stopevent, + output, + ) + else: # sourcetable + self._response_body = self._response_body + data + + if not self.responseok: + msg = ( + f"Connection failed {self._response_status['code']} " + f"{self._response_status['description']}" + ) + self._app_update_status(False, (msg, "red")) + return 0 + if self.is_sourcetable: + stable = self._parse_sourcetable(self.response_body) + self._settings["sourcetable"] = stable + mp, dist = self._get_closest_mountpoint() + self._do_output(output, stable, (mp, dist)) + self._app_update_status(False, ("Sourcetable retrieved", "blue")) + return 0 + + return 1 def _set_headers(self, settings: dict) -> str: """ Construct HTTP(S) GET request headers. - :param dict settings: settings + :param dict settings: settings as dictionary :returns: request headers as string :rtype: str """ @@ -461,54 +451,49 @@ def _parse_response_header(self, data: bytes) -> bytes: :param bytes data: raw data from socket :returns: response body as bytes :rtype: bytes - :raises: Exception """ - try: - hdrbdy = data.split(b"\r\n\r\n", 1) - if len(hdrbdy) == 1: # no body content - # some poorly implemented ICY responses only have - # a single "\r\n" between response header and body - if hdrbdy[0][:12] == b"ICY 200 OK\r\n": - hdr, bdy = hdrbdy[0][:10], hdrbdy[0][12:] - else: - hdr, bdy = hdrbdy[0], b"" - else: # has body content - hdr, bdy = hdrbdy - # some legacy casters use cp1250 rather than utf-8 - hdr = hdr.decode(errors="backslashreplace").split("\r\n") - status = hdr[0].split(" ", 3) - self._response_status = { - "protocol": status[0], - "code": int(status[1]), - "description": status[2], - } - for line in hdr: - rsp = line.split(":", 1) - if len(rsp) > 1: - self._response_headers[rsp[0].lower().strip()] = rsp[1].strip() - self.logger.debug( - f"Response: {self._response_status}\n{self._response_headers}" - ) - return bdy - except Exception as err: - raise ConnectionAbortedError( - f"Unable to parse response headers - {err}" - ) from err # caught in _read_thread() + hdrbdy = data.split(b"\r\n\r\n", 1) + if len(hdrbdy) == 1: # no body content + # some poorly implemented ICY responses only have + # a single "\r\n" between response header and body + if hdrbdy[0][:12] == b"ICY 200 OK\r\n": + hdr, bdy = hdrbdy[0][:10], hdrbdy[0][12:] + else: + hdr, bdy = hdrbdy[0], b"" + else: # has body content + hdr, bdy = hdrbdy + # some legacy casters use cp1250 rather than utf-8 + hdr = hdr.decode(errors="backslashreplace").split("\r\n") + print("HDR:-", hdr, hdr[0]) + status = hdr[0].split(" ", 3) + self._response_status = { + "protocol": status[0], + "code": int(status[1]), + "description": HTTPCODES.get(int(status[1]), status[1]), + } + for line in hdr: + rsp = line.split(":", 1) + if len(rsp) > 1: + self._response_headers[rsp[0].lower().strip()] = rsp[1].strip() + self.logger.debug( + f"Response: {self._response_status}\n{self._response_headers}" + ) + return bdy def _parse_ntrip_data( self, sock: socket, - datatype: str, - ggainterval: int, + settings: dict, + stopevent: Event, output: object, ): """ Read and parse incoming NTRIP RTCM3/SPARTN data stream. :param socket sock: raw socket - :param str datatype: RTCM or SPARTN - :param int ggainterval: GGA transmission interval seconds + :param dict settings: settings as dictionary + :param Event stopevent: stop event :raises: TimeoutError if inactivity timeout exceeded """ @@ -519,14 +504,14 @@ def _parse_ntrip_data( stream = SocketWrapper(sock, self.encoding) # parser will wrap socket as SocketStream - if datatype == SPARTN: + if settings["datatype"].lower() == SPARTN: parser = SPARTNReader( stream, quitonerror=ERR_LOG, bufsize=DEFAULT_BUFSIZE, - decode=self._settings["spartndecode"], - key=self._settings["spartnkey"], - basedate=self._settings["spartnbasedate"], + decode=settings.get("spartndecode", False), + key=settings.get("spartnkey", "ABCD1234"), + basedate=settings.get("spartnbasedate", 0), ) else: parser = UBXReader( @@ -537,7 +522,7 @@ def _parse_ntrip_data( labelmsm=True, ) - while not self._stopevent.is_set(): + while not stopevent.is_set(): try: raw_data, parsed_data = parser.read() if raw_data is None: @@ -550,10 +535,9 @@ def _parse_ntrip_data( else: if hasattr(parsed_data, "identity"): self.logger.info(f"Message received: {parsed_data.identity}") - self.logger.debug(parsed_data) self._do_output(output, raw_data, parsed_data) last_activity = datetime.now() - self._send_gga(ggainterval, output) + self._send_gga(sock, settings["ggainterval"], output) except ( RTCMMessageError, @@ -644,10 +628,11 @@ def _format_gga(self) -> tuple: except ValueError: return None, None - def _send_gga(self, ggainterval: int, output: object): + def _send_gga(self, sock: socket, ggainterval: int, output: object): """ Send NMEA GGA sentence to NTRIP server at prescribed interval. + :param socket sock: open socket :param int ggainterval: GGA send interval in seconds (-1 = don't send) :param object output: writeable output medium e.g. serial port """ @@ -656,7 +641,7 @@ def _send_gga(self, ggainterval: int, output: object): if datetime.now() > self._last_gga + timedelta(seconds=ggainterval): raw_data, parsed_data = self._format_gga() if parsed_data is not None: - self._socket.sendall(raw_data) + sock.sendall(raw_data) self._do_output(output, raw_data, parsed_data) self._last_gga = datetime.now()