From bde6a2ecba8250da30aea572768c5ed09c10f125 Mon Sep 17 00:00:00 2001 From: abrighto Date: Wed, 10 Feb 2021 21:15:05 +0100 Subject: [PATCH] Minor fixes, tested current state unsubscribe on ws close --- csw/CommandServer.py | 1 + csw/ComponentHandlers.py | 8 +++++ docs/CommandServer.html | 6 ++-- docs/ComponentHandlers.html | 32 +++++++++++++------ docs/Event.html | 6 ++-- docs/LocationService.html | 4 +-- examples/TestCommandServer.py | 15 ++++++--- tests/PyTestAssemblyEventHandlers.out | 2 -- tests/testSupport/project/Common.scala | 2 +- .../testassembly/TestAssemblyHandlers.scala | 7 ++-- 10 files changed, 56 insertions(+), 27 deletions(-) diff --git a/csw/CommandServer.py b/csw/CommandServer.py index 39e0c13..dbc14cc 100644 --- a/csw/CommandServer.py +++ b/csw/CommandServer.py @@ -190,6 +190,7 @@ async def _handleWs(self, request: Request) -> WebSocketResponse: elif msg.type == aiohttp.WSMsgType.ERROR: print('Error: ws connection closed with exception %s' % ws.exception()) print('websocket connection closed') + self.handler._unsubscribeCurrentState(ws) return ws @staticmethod diff --git a/csw/ComponentHandlers.py b/csw/ComponentHandlers.py index caef5d4..a92c048 100644 --- a/csw/ComponentHandlers.py +++ b/csw/ComponentHandlers.py @@ -99,3 +99,11 @@ def _subscribeCurrentState(self, stateNames: List[str], ws: WebSocketResponse): else: for stateName in stateNames: self._addCurrentStateSubscriber(stateName, ws) + + def _unsubscribeCurrentState(self, ws: WebSocketResponse): + """ + Internal method used to unsubscribe a websocket from current state events + """ + for stateName in self._currentStateSubscribers: + if ws in self._currentStateSubscribers[stateName]: + self._currentStateSubscribers[stateName].remove(ws) diff --git a/docs/CommandServer.html b/docs/CommandServer.html index 81f6f25..63a16cd 100644 --- a/docs/CommandServer.html +++ b/docs/CommandServer.html @@ -184,7 +184,7 @@

Module csw.CommandServer

responseDict = commandResponse._asDict() return web.json_response(responseDict) else: - raise Exception("Invalid Location type: " + method) + raise Exception("Invalid command type: " + method) async def _handleQueryFinal(self, queryFinal: QueryFinal) -> Response: commandResponse = await self._crm.waitForTask(queryFinal.runId, queryFinal.timeoutInSeconds) @@ -218,6 +218,7 @@

Module csw.CommandServer

elif msg.type == aiohttp.WSMsgType.ERROR: print('Error: ws connection closed with exception %s' % ws.exception()) print('websocket connection closed') + self.handler._unsubscribeCurrentState(ws) return ws @staticmethod @@ -309,7 +310,7 @@

Args

responseDict = commandResponse._asDict() return web.json_response(responseDict) else: - raise Exception("Invalid Location type: " + method) + raise Exception("Invalid command type: " + method) async def _handleQueryFinal(self, queryFinal: QueryFinal) -> Response: commandResponse = await self._crm.waitForTask(queryFinal.runId, queryFinal.timeoutInSeconds) @@ -343,6 +344,7 @@

Args

elif msg.type == aiohttp.WSMsgType.ERROR: print('Error: ws connection closed with exception %s' % ws.exception()) print('websocket connection closed') + self.handler._unsubscribeCurrentState(ws) return ws @staticmethod diff --git a/docs/ComponentHandlers.html b/docs/ComponentHandlers.html index f98d184..d0eda60 100644 --- a/docs/ComponentHandlers.html +++ b/docs/ComponentHandlers.html @@ -91,8 +91,7 @@

Module csw.ComponentHandlers

def currentStates(self) -> List[CurrentState]: """ - Returns the current state for the component - :stateName str: the name of the current state to get + Returns the current states for the component """ return [] @@ -127,7 +126,15 @@

Module csw.ComponentHandlers

self._addCurrentStateSubscriber("", ws) else: for stateName in stateNames: - self._addCurrentStateSubscriber(stateName, ws) + self._addCurrentStateSubscriber(stateName, ws) + + def _unsubscribeCurrentState(self, ws: WebSocketResponse): + """ + Internal method used to unsubscribe a websocket from current state events + """ + for stateName in self._currentStateSubscribers: + if ws in self._currentStateSubscribers[stateName]: + self._currentStateSubscribers[stateName].remove(ws)
@@ -203,8 +210,7 @@

Classes

def currentStates(self) -> List[CurrentState]: """ - Returns the current state for the component - :stateName str: the name of the current state to get + Returns the current states for the component """ return [] @@ -239,7 +245,15 @@

Classes

self._addCurrentStateSubscriber("", ws) else: for stateName in stateNames: - self._addCurrentStateSubscriber(stateName, ws) + self._addCurrentStateSubscriber(stateName, ws) + + def _unsubscribeCurrentState(self, ws: WebSocketResponse): + """ + Internal method used to unsubscribe a websocket from current state events + """ + for stateName in self._currentStateSubscribers: + if ws in self._currentStateSubscribers[stateName]: + self._currentStateSubscribers[stateName].remove(ws)

Methods

@@ -247,16 +261,14 @@

Methods

def currentStates(self) ‑> List[CurrentState]
-

Returns the current state for the component -:stateName str: the name of the current state to get

+

Returns the current states for the component

Expand source code
def currentStates(self) -> List[CurrentState]:
     """
-    Returns the current state for the component
-    :stateName str: the name of the current state to get
+    Returns the current states for the component
     """
     return []
diff --git a/docs/Event.html b/docs/Event.html index a179b23..2262fac 100644 --- a/docs/Event.html +++ b/docs/Event.html @@ -172,7 +172,7 @@

Classes

class Event -(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1611843294, nanos=818454980), eventId: str = '4b8f912d-259b-49b4-9ce7-47b21aa21b83') +(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1612987758, nanos=283993959), eventId: str = 'cd5d2f42-5458-4430-935b-96b9859e134d')

Abstract base class that creates an Event that can be published to the event service @@ -414,7 +414,7 @@

Args

class ObserveEvent -(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1611843294, nanos=818454980), eventId: str = '4b8f912d-259b-49b4-9ce7-47b21aa21b83') +(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1612987758, nanos=283993959), eventId: str = 'cd5d2f42-5458-4430-935b-96b9859e134d')

An event type fired when an observation has taken place.

@@ -491,7 +491,7 @@

Inherited members

class SystemEvent -(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1611843294, nanos=818454980), eventId: str = '4b8f912d-259b-49b4-9ce7-47b21aa21b83') +(source: str, eventName: str, paramSet: List[Parameter], eventTime: EventTime = EventTime(seconds=1612987758, nanos=283993959), eventId: str = 'cd5d2f42-5458-4430-935b-96b9859e134d')

An event type for publishing changes in system data.

diff --git a/docs/LocationService.html b/docs/LocationService.html index 11ef55d..dd8fc19 100644 --- a/docs/LocationService.html +++ b/docs/LocationService.html @@ -105,7 +105,7 @@

Module csw.LocationService

elif typ == "AkkaLocation": return AkkaLocation.schema().loads(s) else: - raise Exception("Invalid Location type: " + typ) + raise Exception("Invalid location type: " + typ) _pdocIgnoreGenerated("AkkaLocation") @@ -585,7 +585,7 @@

Class variables

elif typ == "AkkaLocation": return AkkaLocation.schema().loads(s) else: - raise Exception("Invalid Location type: " + typ) + raise Exception("Invalid location type: " + typ)

Subclasses

    diff --git a/examples/TestCommandServer.py b/examples/TestCommandServer.py index cf8431b..57c16f4 100644 --- a/examples/TestCommandServer.py +++ b/examples/TestCommandServer.py @@ -11,6 +11,7 @@ class MyComponentHandlers(ComponentHandlers): + prefix = "CSW.pycswTest" async def longRunningCommand(self, runId: str, command: ControlCommand) -> CommandResponse: await asyncio.sleep(3) @@ -71,9 +72,9 @@ def onOneway(self, runId: str, command: ControlCommand) -> CommandResponse: """ n = len(command.paramSet) print(f"MyComponentHandlers Received oneway {str(command)} with {n} params") - filt = command.get("filter").values[0] - encoder = command.get("encoder").values[0] - print(f"filter = {filt}, encoder = {encoder}") + # filt = command.get("filter").values[0] + # encoder = command.get("encoder").values[0] + # print(f"filter = {filt}, encoder = {encoder}") return Accepted(runId) def validateCommand(self, runId: str, command: ControlCommand) -> CommandResponse: @@ -96,8 +97,12 @@ def currentStates(self) -> List[CurrentState]: floatArrayParam = Parameter("FloatArrayValue", "FloatArrayKey", [[1.2, 2.3, 3.4], [5.6, 7.8, 9.1]], "marcsec") intMatrixParam = Parameter("IntMatrixValue", "IntMatrixKey", [[[1, 2, 3, 4], [5, 6, 7, 8]], [[-1, -2, -3, -4], [-5, -6, -7, -8]]], "meter") - return [CurrentState("csw.assembly", "PyCswState", [intParam, intArrayParam, floatArrayParam, intMatrixParam])] + return [CurrentState(self.prefix, "PyCswState", [intParam, intArrayParam, floatArrayParam, intMatrixParam])] # noinspection PyTypeChecker -commandServer = CommandServer("csw.pycswTest", MyComponentHandlers()) +handlers = MyComponentHandlers() +commandServer = CommandServer(handlers.prefix, handlers) +handlers.commandServer = commandServer +print(f"Starting test command server on port {commandServer.port}") +commandServer.start() diff --git a/tests/PyTestAssemblyEventHandlers.out b/tests/PyTestAssemblyEventHandlers.out index 89122da..da5c621 100644 --- a/tests/PyTestAssemblyEventHandlers.out +++ b/tests/PyTestAssemblyEventHandlers.out @@ -1,5 +1,3 @@ {"_type":"SystemEvent","eventId":"test","source":"CSW.TestPublisher","eventName":"testEvent1","eventTime":"1970-01-01T00:00:00Z","paramSet":[{"DoubleKey":{"keyName":"assemblyEventValue","values":[42.0],"units":"NoUnits"}}]} {"_type":"SystemEvent","eventId":"test","source":"CSW.TestPublisher","eventName":"testEvent2","eventTime":"1970-01-01T00:00:00Z","paramSet":[{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"marcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}}]} -{"_type":"SystemEvent","eventId":"test","source":"CSW.TestPublisher","eventName":"testEvent2","eventTime":"1970-01-01T00:00:00Z","paramSet":[{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"marcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}}]} -{"_type":"SystemEvent","eventId":"test","source":"CSW.TestPublisher","eventName":"testEvent3","eventTime":"1970-01-01T00:00:00Z","paramSet":[{"StructKey":{"keyName":"MyStruct","values":[{"paramSet":[{"CoordKey":{"keyName":"CoordParam","values":[{"_type":"EqCoord","tag":"BASE","ra":659912249999,"dec":-109892300000,"frame":"FK5","catalogName":"none","pm":{"pmx":0.5,"pmy":2.33}},{"_type":"SolarSystemCoord","tag":"BASE","body":"Venus"},{"_type":"MinorPlanetCoord","tag":"GUIDER1","epoch":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"meanDistance":1.4,"eccentricity":0.234,"meanAnomaly":792000000000},{"_type":"CometCoord","tag":"BASE","epochOfPerihelion":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"perihelionDistance":1.4,"eccentricity":0.234},{"_type":"AltAzCoord","tag":"301 deg","alt":153000000000,"az":0}],"units":"NoUnits"}},{"ShortKey":{"keyName":"shortValue","values":[42],"units":"arcsec"}},{"BooleanKey":{"keyName":"booleanValue","values":[true,false],"units":"arcsec"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"LongKey":{"keyName":"longValue","values":[42],"units":"arcsec"}},{"FloatKey":{"keyName":"floatValue","values":[42.1],"units":"arcsec"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}},{"DoubleArrayKey":{"keyName":"DoubleArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}}]}],"units":"NoUnits"}},{"CoordKey":{"keyName":"CoordParam","values":[{"_type":"EqCoord","tag":"BASE","ra":659912249999,"dec":-109892300000,"frame":"FK5","catalogName":"none","pm":{"pmx":0.5,"pmy":2.33}},{"_type":"SolarSystemCoord","tag":"BASE","body":"Venus"},{"_type":"MinorPlanetCoord","tag":"GUIDER1","epoch":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"meanDistance":1.4,"eccentricity":0.234,"meanAnomaly":792000000000},{"_type":"CometCoord","tag":"BASE","epochOfPerihelion":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"perihelionDistance":1.4,"eccentricity":0.234},{"_type":"AltAzCoord","tag":"301 deg","alt":153000000000,"az":0}],"units":"NoUnits"}},{"ByteKey":{"keyName":"byteValue","values":[-34,-83,-66,-17],"units":"NoUnits"}},{"ShortKey":{"keyName":"shortValue","values":[42],"units":"arcsec"}},{"BooleanKey":{"keyName":"booleanValue","values":[true,false],"units":"arcsec"}},{"ByteArrayKey":{"keyName":"ByteArrayValue","values":[[-34,-83,-66,-17],[1,2,3,4]],"units":"NoUnits"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"LongKey":{"keyName":"longValue","values":[42],"units":"arcsec"}},{"FloatKey":{"keyName":"floatValue","values":[42.1],"units":"arcsec"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}},{"DoubleArrayKey":{"keyName":"DoubleArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}}]} {"_type":"SystemEvent","eventId":"test","source":"CSW.TestPublisher","eventName":"testEvent3","eventTime":"1970-01-01T00:00:00Z","paramSet":[{"StructKey":{"keyName":"MyStruct","values":[{"paramSet":[{"CoordKey":{"keyName":"CoordParam","values":[{"_type":"EqCoord","tag":"BASE","ra":659912249999,"dec":-109892300000,"frame":"FK5","catalogName":"none","pm":{"pmx":0.5,"pmy":2.33}},{"_type":"SolarSystemCoord","tag":"BASE","body":"Venus"},{"_type":"MinorPlanetCoord","tag":"GUIDER1","epoch":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"meanDistance":1.4,"eccentricity":0.234,"meanAnomaly":792000000000},{"_type":"CometCoord","tag":"BASE","epochOfPerihelion":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"perihelionDistance":1.4,"eccentricity":0.234},{"_type":"AltAzCoord","tag":"301 deg","alt":153000000000,"az":0}],"units":"NoUnits"}},{"ShortKey":{"keyName":"shortValue","values":[42],"units":"arcsec"}},{"BooleanKey":{"keyName":"booleanValue","values":[true,false],"units":"arcsec"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"LongKey":{"keyName":"longValue","values":[42],"units":"arcsec"}},{"FloatKey":{"keyName":"floatValue","values":[42.1],"units":"arcsec"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}},{"DoubleArrayKey":{"keyName":"DoubleArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}}]}],"units":"NoUnits"}},{"CoordKey":{"keyName":"CoordParam","values":[{"_type":"EqCoord","tag":"BASE","ra":659912249999,"dec":-109892300000,"frame":"FK5","catalogName":"none","pm":{"pmx":0.5,"pmy":2.33}},{"_type":"SolarSystemCoord","tag":"BASE","body":"Venus"},{"_type":"MinorPlanetCoord","tag":"GUIDER1","epoch":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"meanDistance":1.4,"eccentricity":0.234,"meanAnomaly":792000000000},{"_type":"CometCoord","tag":"BASE","epochOfPerihelion":2000.0,"inclination":324000000000,"longAscendingNode":7200000000,"argOfPerihelion":360000000000,"perihelionDistance":1.4,"eccentricity":0.234},{"_type":"AltAzCoord","tag":"301 deg","alt":153000000000,"az":0}],"units":"NoUnits"}},{"ByteKey":{"keyName":"byteValue","values":[-34,-83,-66,-17],"units":"NoUnits"}},{"ShortKey":{"keyName":"shortValue","values":[42],"units":"arcsec"}},{"BooleanKey":{"keyName":"booleanValue","values":[true,false],"units":"arcsec"}},{"ByteArrayKey":{"keyName":"ByteArrayValue","values":[[-34,-83,-66,-17],[1,2,3,4]],"units":"NoUnits"}},{"IntArrayKey":{"keyName":"IntArrayValue","values":[[1,2,3,4],[5,6,7,8]],"units":"NoUnits"}},{"LongKey":{"keyName":"longValue","values":[42],"units":"arcsec"}},{"FloatKey":{"keyName":"floatValue","values":[42.1],"units":"arcsec"}},{"FloatArrayKey":{"keyName":"FloatArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntMatrixKey":{"keyName":"IntMatrixValue","values":[[[1,2,3,4],[5,6,7,8]],[[-1,-2,-3,-4],[-5,-6,-7,-8]]],"units":"meter"}},{"DoubleArrayKey":{"keyName":"DoubleArrayValue","values":[[1.2,2.3,3.4],[5.6,7.8,9.1]],"units":"arcsec"}},{"IntKey":{"keyName":"IntValue","values":[42],"units":"arcsec"}}]} diff --git a/tests/testSupport/project/Common.scala b/tests/testSupport/project/Common.scala index a0f3eeb..96d1845 100755 --- a/tests/testSupport/project/Common.scala +++ b/tests/testSupport/project/Common.scala @@ -36,7 +36,7 @@ object Common extends AutoPlugin { ), resolvers ++= Seq( "jitpack" at "https://jitpack.io", - "bintray" at "https://jcenter.bintray.com", +// "bintray" at "https://jcenter.bintray.com", Resolver.bintrayRepo("lonelyplanet", "maven") ), version := "0.0.1", diff --git a/tests/testSupport/test-assembly/src/main/scala/org/tmt/csw/testassembly/TestAssemblyHandlers.scala b/tests/testSupport/test-assembly/src/main/scala/org/tmt/csw/testassembly/TestAssemblyHandlers.scala index dda7d4a..d6855df 100755 --- a/tests/testSupport/test-assembly/src/main/scala/org/tmt/csw/testassembly/TestAssemblyHandlers.scala +++ b/tests/testSupport/test-assembly/src/main/scala/org/tmt/csw/testassembly/TestAssemblyHandlers.scala @@ -1,7 +1,6 @@ package org.tmt.csw.testassembly import java.io.{File, FileOutputStream} - import akka.actor.typed.{ActorSystem, Behavior} import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import csw.command.client.messages.TopLevelActorMessage @@ -247,9 +246,10 @@ class TestAssemblyHandlers(ctx: ActorContext[TopLevelActorMessage], testFd.write(s"$s\n".getBytes()) } - pythonService.subscribeCurrentState { cs => + val currentStateSubscription = pythonService.subscribeCurrentState { cs => testLog(s"Received current state from python service: $cs") } + try { val longRunningCommand = makeTestCommand("LongRunningCommand") val validateResponse = @@ -273,6 +273,9 @@ class TestAssemblyHandlers(ctx: ActorContext[TopLevelActorMessage], testLog( s"Response from simple submit to ${pythonConnection.componentId.fullName}: $simpleCommandResponse") + // Test cancel of current state subscription + currentStateSubscription.cancel() + val resultCommand = makeTestCommand("ResultCommand") val resultCommandResponse = Await.result(pythonService.submit(resultCommand), 5.seconds)