This repository was archived by the owner on Dec 18, 2024. It is now read-only.

[databroker] Add integration tests #330

Conversation

argerus
Contributor

@argerus argerus commented Aug 17, 2022

First take at adding integration tests (taken mostly from val.services).

The tests use a Python unit test framework to handle the scaffolding around the tests.

A simple runner has been added (run.sh) which:

  1. Starts a databroker container (given by DATABROKER_IMAGE)
  2. Runs the integration tests targeting the container
  3. Stops the container.

This has been integrated with the GitHub Actions workflow for building databroker.
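For illustration, a minimal sketch of what one of these pytest-based integration tests looks like, using pytest-asyncio and grpc.aio (the test name and the hardcoded address are assumptions made for this sketch, not the code added in this PR):

import asyncio

import grpc
import pytest

# Address of the databroker container started by the runner; a hypothetical
# constant used only for this sketch.
VDB_ADDRESS = "127.0.0.1:55555"


@pytest.mark.asyncio
async def test_databroker_is_reachable() -> None:
    # Open an insecure gRPC channel to the running container and wait,
    # with a timeout, until the channel reports it is ready.
    async with grpc.aio.insecure_channel(VDB_ADDRESS) as channel:
        await asyncio.wait_for(channel.channel_ready(), timeout=5)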

@@ -0,0 +1,145 @@
"""
Contributor

What is the reason for including these generated files? If there is a practical need to have them included it could maybe be an idea to write a few words on how/when they shall be re-generated.

Contributor Author

@argerus argerus Aug 17, 2022

For the _pb2*.py files, it's because the integration tests should be able to verify that the external interface doesn't break even when the .proto files are updated. Updating the generated files should therefore be a deliberate action.

I added the script for generating them from the .proto files.
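For context, a rough sketch of what such a regeneration step can look like with grpcio-tools; the proto path, output directory, and example .proto file below are assumptions, not the actual layout used here:

# Regenerate the *_pb2.py message modules and *_pb2_grpc.py stubs.
from grpc_tools import protoc

protoc.main([
    "grpc_tools.protoc",
    "--proto_path=proto",       # directory containing the .proto files (assumed)
    "--python_out=gen",         # where to put the generated *_pb2.py modules
    "--grpc_python_out=gen",    # where to put the generated *_pb2_grpc.py stubs
    "proto/sdv/databroker/v1/broker.proto",  # hypothetical example .proto file
])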

@argerus argerus force-pushed the feature/integration_tests branch 3 times, most recently from 696144f to e593afb on August 17, 2022 15:51
@SebastianSchildt
Contributor

I have some trouble running it locally (run.sh). Not sure why; I already put a sleep and a docker ps after container creation to make sure it is not crashing immediately.

Any immediate idea, what is wrong?

Starting databroker container ("ghcr.io/eclipse/kuksa.val/databroker:v0.17.0")
CONTAINER ID   IMAGE                                            COMMAND                  CREATED         STATUS         PORTS                                           NAMES
6c3982da6173   ghcr.io/eclipse/kuksa.val/databroker:v0.17.0     "/app/databroker --m…"   6 seconds ago   Up 5 seconds   0.0.0.0:55555->55555/tcp, :::55555->55555/tcp   inspiring_thompson
f9fa8f2ec993   1d64d3e888a2                                     "/metrics-server --c…"   2 weeks ago     Up 2 weeks                                                     k8s_metrics-server_metrics-server-ff9dbcb6c-jhltb_kube-system_a688a301-bacd-4955-9bc5-3837e80d0ef5_7
f45357c9f42a   edaa71f2aee8                                     "/coredns -conf /etc…"   2 weeks ago     Up 2 weeks                                                     k8s_coredns_coredns-96cc4f57d-g2x8x_kube-system_a2ee64fa-d271-489a-9f09-aa1b1178a2df_5
9ff37a3f7211   fe8bc53de6e4                                     "local-path-provisio…"   2 weeks ago     Up 2 weeks                                                     k8s_local-path-provisioner_local-path-provisioner-84bb864455-crxvp_kube-system_e571bdfc-9ee3-4d92-be0a-7cadbf97ca04_4
2d9d1202ec5a   k8s.gcr.io/pause:3.6                             "/pause"                 2 weeks ago     Up 2 weeks                                                     k8s_POD_metrics-server-ff9dbcb6c-jhltb_kube-system_a688a301-bacd-4955-9bc5-3837e80d0ef5_4
f51325c4c40f   k8s.gcr.io/pause:3.6                             "/pause"                 2 weeks ago     Up 2 weeks                                                     k8s_POD_local-path-provisioner-84bb864455-crxvp_kube-system_e571bdfc-9ee3-4d92-be0a-7cadbf97ca04_4
a01fddea4a3b   k8s.gcr.io/pause:3.6                             "/pause"                 2 weeks ago     Up 2 weeks                                                     k8s_POD_coredns-96cc4f57d-g2x8x_kube-system_a2ee64fa-d271-489a-9f09-aa1b1178a2df_4
31a97c80a939   vsc-kuksa.val-a127b420aa5baac61a3fbd2048d4f4a1   "/bin/sh -c 'echo Co…"   3 weeks ago     Up 5 hours                                                     awesome_neumann
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.8.13, pytest-7.1.2, pluggy-1.0.0 -- /Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/.venv/bin/python3
cachedir: .pytest_cache
rootdir: /Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test
plugins: asyncio-0.19.0, ordering-0.6
asyncio: mode=strict
collected 3 items                                                                                                                                                                   

test_databroker.py::test_feeder_vdb_connection FAILED                                                                                                                         [ 33%]
test_databroker.py::test_feeder_metadata_registered FAILED                                                                                                                    [ 66%]
test_databroker.py::test_events FAILED                                                                                                                                        [100%]

===================================================================================== FAILURES ======================================================================================
____________________________________________________________________________ test_feeder_vdb_connection _____________________________________________________________________________

    @pytest.mark.asyncio
    async def test_feeder_vdb_connection() -> None:
        logger.info("Connecting to VehicleDataBroker {}".format(VDB_ADDRESS))
        helper = VDBHelper(VDB_ADDRESS)
>       await helper.get_vdb_metadata()

test_databroker.py:42: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
vdb_helper.py:100: in get_vdb_metadata
    response = await self._broker_stub.GetMetadata(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_AioCall of RPC that terminated with:
	status = failed to connect to all addresses
	details = "StatusCode.UNAVAILABLE...iled to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>

    def __await__(self) -> ResponseType:
        """Wait till the ongoing RPC request finishes."""
        try:
            response = yield from self._call_response
        except asyncio.CancelledError:
            # Even if we caught all other CancelledError, there is still
            # this corner case. If the application cancels immediately after
            # the Call object is created, we will observe this
            # `CancelledError`.
            if not self.cancelled():
                self.cancel()
            raise
    
        # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
        # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
        # Instead, if we move the exception raising here, the spam stops.
        # Unfortunately, there can only be one 'yield from' in '__await__'. So,
        # we need to access the private instance variable.
        if response is cygrpc.EOF:
            if self._cython_call.is_locally_cancelled():
                raise asyncio.CancelledError()
            else:
>               raise _create_rpc_error(self._cython_call._initial_metadata,
                                        self._cython_call._status)
E               grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
E               	status = StatusCode.UNAVAILABLE
E               	details = "failed to connect to all addresses"
E               	debug_error_string = "{"created":"@1660838598.072596000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838598.072595000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
E               >

.venv/lib/python3.8/site-packages/grpc/aio/_call.py:290: AioRpcError
__________________________________________________________________________ test_feeder_metadata_registered __________________________________________________________________________

setup_helper = <coroutine object setup_helper at 0x105b81940>

    @pytest.mark.asyncio
    async def test_feeder_metadata_registered(setup_helper: VDBHelper) -> None:
        helper = await setup_helper
        feeder_names = [
            "Vehicle.OBD.Speed",
            "Vehicle.Powertrain.Transmission.Gear",
            "Vehicle.Chassis.ParkingBrake.IsEngaged",
            "Vehicle.OBD.EngineLoad",
        ]
    
>       meta = await helper.get_vdb_metadata(feeder_names)

test_databroker.py:57: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
vdb_helper.py:100: in get_vdb_metadata
    response = await self._broker_stub.GetMetadata(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_AioCall of RPC that terminated with:
	status = failed to connect to all addresses
	details = "StatusCode.UNAVAILABLE...iled to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>

    def __await__(self) -> ResponseType:
        """Wait till the ongoing RPC request finishes."""
        try:
            response = yield from self._call_response
        except asyncio.CancelledError:
            # Even if we caught all other CancelledError, there is still
            # this corner case. If the application cancels immediately after
            # the Call object is created, we will observe this
            # `CancelledError`.
            if not self.cancelled():
                self.cancel()
            raise
    
        # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
        # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
        # Instead, if we move the exception raising here, the spam stops.
        # Unfortunately, there can only be one 'yield from' in '__await__'. So,
        # we need to access the private instance variable.
        if response is cygrpc.EOF:
            if self._cython_call.is_locally_cancelled():
                raise asyncio.CancelledError()
            else:
>               raise _create_rpc_error(self._cython_call._initial_metadata,
                                        self._cython_call._status)
E               grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
E               	status = StatusCode.UNAVAILABLE
E               	details = "failed to connect to all addresses"
E               	debug_error_string = "{"created":"@1660838599.077967000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838599.077967000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
E               >

.venv/lib/python3.8/site-packages/grpc/aio/_call.py:290: AioRpcError
____________________________________________________________________________________ test_events ____________________________________________________________________________________

setup_helper = <coroutine object setup_helper at 0x105b0fc40>

    @pytest.mark.asyncio
    async def test_events(setup_helper: VDBHelper) -> None:
        helper: VDBHelper = await setup_helper
    
        timeout = 3
        datapoint_speed = "Vehicle.OBD.Speed" # float
        datapoint_engine_load = "Vehicle.OBD.EngineLoad" # float
        alias_speed = "speed"
        alias_load = "load"
    
        query = "SELECT {} as {}, {} as {}".format(datapoint_speed, alias_speed, datapoint_engine_load, alias_load)
    
        events = []
        # inner function for collecting subscription events
    
        def inner_callback(name: str, dp: Datapoint):
            dd = helper.datapoint_to_dict(name, dp)
            events.append(dd)
    
        logger.info("# subscribing('{}', timeout={})".format(query, timeout))
    
        subscription = asyncio.create_task(
            helper.subscribe_datapoints(query, timeout=timeout, sub_callback=inner_callback)
        )
    
        set_name1 = asyncio.create_task(
            helper.set_float_datapoint(datapoint_speed, 40.0)
        )
        set_name2 = asyncio.create_task(
            helper.set_float_datapoint(datapoint_engine_load, 10.0)
        )
    
>       await set_name1

test_databroker.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
vdb_helper.py:208: in set_float_datapoint
    datapoint_id = await self.__get_or_create_datapoint_id_by_name(
vdb_helper.py:165: in __get_or_create_datapoint_id_by_name
    await self.__initialize_metadata()
vdb_helper.py:156: in __initialize_metadata
    response = await self._broker_stub.GetMetadata([])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_AioCall of RPC that terminated with:
	status = failed to connect to all addresses
	details = "StatusCode.UNAVAILABLE...iled to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>

    def __await__(self) -> ResponseType:
        """Wait till the ongoing RPC request finishes."""
        try:
            response = yield from self._call_response
        except asyncio.CancelledError:
            # Even if we caught all other CancelledError, there is still
            # this corner case. If the application cancels immediately after
            # the Call object is created, we will observe this
            # `CancelledError`.
            if not self.cancelled():
                self.cancel()
            raise
    
        # NOTE(lidiz) If we raise RpcError in the task, and users doesn't
        # 'await' on it. AsyncIO will log 'Task exception was never retrieved'.
        # Instead, if we move the exception raising here, the spam stops.
        # Unfortunately, there can only be one 'yield from' in '__await__'. So,
        # we need to access the private instance variable.
        if response is cygrpc.EOF:
            if self._cython_call.is_locally_cancelled():
                raise asyncio.CancelledError()
            else:
>               raise _create_rpc_error(self._cython_call._initial_metadata,
                                        self._cython_call._status)
E               grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
E               	status = StatusCode.UNAVAILABLE
E               	details = "failed to connect to all addresses"
E               	debug_error_string = "{"created":"@1660838600.837507000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838600.837507000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
E               >

.venv/lib/python3.8/site-packages/grpc/aio/_call.py:290: AioRpcError
--------------------------------------------------------------------------------- Captured log call ---------------------------------------------------------------------------------
ERROR    grpc._common:_common.py:88 Exception serializing message!
Traceback (most recent call last):
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/.venv/lib/python3.8/site-packages/grpc/_common.py", line 86, in _transform
    return transformer(message)
TypeError: descriptor 'SerializeToString' for 'google._upb._message.Message' objects doesn't apply to a 'list' object
ERROR    root:vdb_helper.py:262 broker.Subscribe(SELECT Vehicle.OBD.Speed as speed, Vehicle.OBD.EngineLoad as load) failed!
 --> grpcError[Status:UNAVAILABLE (14, 'unavailable'), details:'failed to connect to all addresses']
============================================================================== short test summary info ==============================================================================
FAILED test_databroker.py::test_feeder_vdb_connection - grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
FAILED test_databroker.py::test_feeder_metadata_registered - grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
FAILED test_databroker.py::test_events - grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
================================================================================= 3 failed in 2.82s =================================================================================
Task exception was never retrieved
future: <Task finished name='Task-6' coro=<VDBHelper.subscribe_datapoints() done, defined at /Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py:219> exception=<AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1660838600.837507000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838600.837507000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>>
Traceback (most recent call last):
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 267, in subscribe_datapoints
    raise e
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 233, in subscribe_datapoints
    async for subscribe_reply in response:
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/.venv/lib/python3.8/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
    await self._raise_for_status()
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/.venv/lib/python3.8/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
    raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1660838600.837507000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838600.837507000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<VDBHelper.set_float_datapoint() done, defined at /Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py:205> exception=<AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1660838600.837506000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838600.837506000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>>
Traceback (most recent call last):
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 208, in set_float_datapoint
    datapoint_id = await self.__get_or_create_datapoint_id_by_name(
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 169, in __get_or_create_datapoint_id_by_name
    response = await self.register_datapoint(name, data_type)
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 184, in register_datapoint
    response = await self.__register_datapoints(datapoints=[registration_metadata])
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/vdb_helper.py", line 74, in __register_datapoints
    response = await self._collector_stub.RegisterDatapoints(
  File "/Users/scs2rng/Documents/Dev/kuksa.val/kuksa_databroker/integration_test/.venv/lib/python3.8/site-packages/grpc/aio/_call.py", line 290, in __await__
    raise _create_rpc_error(self._cython_call._initial_metadata,
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1660838600.837506000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3261,"referenced_errors":[{"created":"@1660838600.837506000","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"
>
Stopping databroker container
6c3982da6173a2286150ecb69d046ae2efa66ae3f51a06d260582df95836eb69

@argerus argerus force-pushed the feature/integration_tests branch from e593afb to 4301842 on August 19, 2022 15:34
@argerus
Contributor Author

argerus commented Aug 19, 2022

@SebastianSchildt

I have some trouble running it locally (run.sh). Not sure why; I already put a sleep and a docker ps after container creation to make sure it is not crashing immediately.

Any immediate idea, what is wrong?

A theory is that the hardcoded address localhost:55555 causes the client to try to connect to the IPv6 localhost, while the server is only listening on IPv4. I pushed an update that makes it easier to change what the client connects to, in addition to changing the default to 127.0.0.1:55555.
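A rough sketch of that idea (the environment variable name is an assumption for illustration):

import os

# Default to the IPv4 loopback so the client does not end up probing the
# IPv6 localhost (::1) while the server only listens on IPv4; allow the
# runner to override the target address via the environment.
VDB_ADDRESS = os.environ.get("VDB_ADDRESS", "127.0.0.1:55555")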

Contributor

@SebastianSchildt SebastianSchildt left a comment

Tested: works on amd64 and arm64 Linux (the previous issues may have been related to trying it on macOS; not critical)
lgtm 🐘

@SebastianSchildt SebastianSchildt merged commit b6b9691 into eclipse-archived:master Aug 22, 2022
@SebastianSchildt SebastianSchildt deleted the feature/integration_tests branch August 26, 2022 14:45