diff --git a/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto b/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto new file mode 100644 index 000000000000..5426898bc491 --- /dev/null +++ b/sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; + +service DualToRActive { + rpc QueryAdminForwardingPortState(AdminRequest) returns (AdminReply) {} + rpc SetAdminForwardingPortState(AdminRequest) returns (AdminReply) {} + rpc QueryOperationPortState(OperationRequest) returns (OperationReply) {} + rpc QueryLinkState(LinkStateRequest) returns (LinkStateReply) {} + rpc QueryServerVersion(ServerVersionRequest) returns (ServerVersionReply) {} +} + +message AdminRequest { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message AdminReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message OperationRequest { + repeated int32 portid = 1; +} + +message OperationReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message LinkStateRequest { + repeated int32 portid = 1; +} + +message LinkStateReply { + repeated int32 portid = 1; + repeated bool state = 2; +} + +message ServerVersionRequest { + string version = 1; +} + +message ServerVersionReply { + string version = 1; +} + + diff --git a/sonic-ycabled/proto_out/__init__.py b/sonic-ycabled/proto_out/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sonic-ycabled/setup.py b/sonic-ycabled/setup.py index 778ed72ae7fb..60ee1414ccc2 100644 --- a/sonic-ycabled/setup.py +++ b/sonic-ycabled/setup.py @@ -1,9 +1,34 @@ from setuptools import setup, find_packages +from distutils.command.build_ext import build_ext as _build_ext +import distutils.command + +class GrpcTool(distutils.cmd.Command): + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + import grpc_tools.protoc + + grpc_tools.protoc.main([ + 'grpc_tools.protoc', + '-Iproto', + '--python_out=.', + '--grpc_python_out=.', + 'proto/proto_out/linkmgr_grpc_driver.proto' + ]) + +class BuildExtCommand (_build_ext, object): + def run(self): + self.run_command('GrpcTool') + super(BuildExtCommand, self).run() setup( name='sonic-ycabled', version='1.0', - description='Y-cable configuration daemon for SONiC', + description='Y-cable and smart nic configuration daemon for SONiC', license='Apache 2.0', author='SONiC Team', author_email='linuxnetdev@microsoft.com', @@ -16,13 +41,16 @@ 'ycabled = ycable.ycable:main', ] }, + cmdclass={'build_ext': BuildExtCommand, + 'GrpcTool': GrpcTool}, install_requires=[ # NOTE: This package also requires swsscommon, but it is not currently installed as a wheel 'enum34; python_version < "3.4"', 'sonic-py-common', ], setup_requires=[ - 'wheel' + 'wheel', + 'grpcio-tools' ], tests_require=[ 'pytest', diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 8048cbcbcead..bf7d187e4ac9 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -4561,6 +4561,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_port(self, mock_swssc xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4568,7 +4569,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_port(self, mock_swssc fvp = {"state": "active"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == -1) @patch('swsscommon.swsscommon.Table') @@ -4588,6 +4589,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4595,7 +4597,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc fvp = {"down_firmware": "null"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) @patch('swsscommon.swsscommon.Table') @@ -4606,6 +4608,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_else_condition(self, moc @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) @@ -4619,6 +4622,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_manual(sel xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} port = "Ethernet0" @@ -4651,7 +4655,7 @@ def get_switching_mode(self): patched_util.get.return_value = PortInstanceHelper() rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) @patch('swsscommon.swsscommon.Table') @@ -4659,16 +4663,20 @@ def get_switching_mode(self): "lane_mask": "0", "direction": "0"}))) @patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0))) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) @patch('os.path.isfile', MagicMock(return_value=True)) def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_swsscommon_table): mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} @@ -4676,7 +4684,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s fvp = {"state": "active"} rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == -1) @patch('swsscommon.swsscommon.Table') @@ -4688,11 +4696,14 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) @patch('ycable.ycable_utilities.y_cable_helper.y_cable_port_locks', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby"))) @patch('os.path.isfile', MagicMock(return_value=True)) @patch('time.sleep', MagicMock(return_value=True)) def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, mock_swsscommon_table, platform_sfputil): mock_table = MagicMock() + mock_table.get = MagicMock( + side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]) mock_swsscommon_table.return_value = mock_table xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table @@ -4700,6 +4711,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table + port_tbl = mock_swsscommon_table asic_index = 0 task_download_firmware_thread = {} port = "Ethernet0" @@ -4732,9 +4744,473 @@ def get_mux_direction(self): patched_util.get.return_value = PortInstanceHelper() rc = handle_show_hwmode_state_cmd_arg_tbl_notification( - fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) assert(rc == None) + def test_retry_setup_grpc_channel_for_port_incorrect(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + False, {"read_side": "2"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == False) + + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(True,True))) + def test_retry_setup_grpc_channel_for_port_correct(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + True, {"cable_type": "active-active", "soc_ipv4":"192.168.0.1/32", "state":"active"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == True) + + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None,None))) + def test_retry_setup_grpc_channel_for_port_correct_none_val(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1)] + Table = MagicMock() + Table.get.return_value = (status, fvs) + swsscommon.Table.return_value.get.return_value = ( + True, {"cable_type": "active-active", "soc_ipv4":"192.168.0.1/32", "state":"active"}) + rc = retry_setup_grpc_channel_for_port("Ethernet0", 0) + assert(rc == False) + + def test_process_loopback_interface_and_get_read_side_rc(self): + + loopback_keys = [["Loopback3|10.212.64.2/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == 0) + + def test_process_loopback_interface_and_get_read_side_rc_true(self): + + loopback_keys = [["Loopback3|10.212.64.1/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == 1) + + def test_process_loopback_interface_and_get_read_side_false(self): + + loopback_keys = [["Loopback2|10.212.64.1/3", "Loopback3|2603:1010:100:d::1/128"]] + rc = process_loopback_interface_and_get_read_side(loopback_keys) + assert(rc == -1) + + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + def test_check_identifier_presence_and_setup_channel(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None, None))) + def test_check_identifier_presence_and_setup_channel_with_mock(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + + @patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True)) + @patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port', MagicMock(return_value=(None, None))) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + def test_check_identifier_presence_and_setup_channel_with_mock_not_none(self): + + status = True + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + + state_db = {} + test_db = "TEST_DB" + y_cable_tbl = {} + static_tbl = {} + mux_tbl = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + port_tbl = {} + read_side = 0 + asic_index = 0 + y_cable_presence = [True] + delete_change_event = [True] + + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE1") + hw_mux_cable_tbl_peer[asic_index] = swsscommon.Table( + test_db[asic_index], "HW_TABLE2") + + rc = check_identifier_presence_and_setup_channel("Ethernet0", port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + assert(rc == None) + + @patch('proto_out.linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub', MagicMock(return_value=True)) + def test_setup_grpc_channel_for_port(self): + + rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1") + + assert(rc == (None, None)) + + + def test_setup_grpc_channels(self): + + stop_event = MagicMock() + stop_event.is_set.return_value = False + with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util: + + patched_util.logical.return_value = ['Ethernet0', 'Ethernet4'] + patched_util.get_asic_id_for_logical_port.return_value = 0 + rc = setup_grpc_channels(stop_event) + + assert(rc == None) + + + def test_check_mux_cable_port_type_get_none(self): + + stop_event = MagicMock() + test_db = "TEST_DB" + status = False + asic_index = 0 + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (False, None)) + + + def test_check_mux_cable_port_type_get_correct(self): + + stop_event = MagicMock() + status = True + asic_index = 0 + test_db = "TEST_DB" + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-active'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (True, "active-active")) + + + def test_check_mux_cable_port_type_get_correct_standby(self): + + stop_event = MagicMock() + status = True + asic_index = 0 + test_db = "TEST_DB" + fvs = [('state', "auto"), ('read_side', 1), ('cable_type','active-standby'), ('soc_ipv4','192.168.0.1')] + stop_event.is_set.return_value = False + port_tbl = {} + port_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + port_tbl[asic_index].get.return_value = (status, fvs) + + rc = check_mux_cable_port_type("Ethernet0", port_tbl, 0) + assert(rc == (True, "active-standby")) + + + def test_parse_grpc_response_hw_mux_cable_change_state(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0] + self.state = [True] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "active") + + + def test_parse_grpc_response_hw_mux_cable_change_state_standby(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "standby") + + + def test_parse_grpc_response_hw_mux_cable_change_state_unknown(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(True, response, 0, "Ethernet0") + assert(rc == "unknown") + + + def test_parse_grpc_response_hw_mux_cable_change_state_unknown_false(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_hw_mux_cable_change_state(False, response, 0, "Ethernet0") + assert(rc == "unknown") + + + def test_parse_grpc_response_forwarding_state_unknown_false(self): + + class Response_Helper(): + def __init__(self): + self.portid = [1] + self.state = [False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(False, None, 0) + assert(rc == ("unknown", "unknown")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "standby")) + + + def test_parse_grpc_response_forwarding_state_active_active_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_active_active_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_active_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_active_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("standby", "active")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "standby")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_true(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("standby", "standby")) + + + def test_parse_grpc_response_forwarding_state_active_active_with_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [True,True] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 0) + assert(rc == ("active", "active")) + + + def test_parse_grpc_response_forwarding_state_standby_standby_with_true_read_side(self): + + class Response_Helper(): + def __init__(self): + self.portid = [0,1] + self.state = [False,False] + + + response = Response_Helper() + + rc = parse_grpc_response_forwarding_state(True, response, 1) + assert(rc == ("standby", "standby")) + + + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + def test_parse_grpc_response_forwarding_state_standby_standby_with_true_read_side(self): + + status = True + asic_index = 0 + test_db = "TEST_DB" + port = "Ethernet0" + fvs_m = [('command', "probe"), ('read_side', 1), ('cable_type','active-standby'), ('soc_ipv4','192.168.0.1')] + hw_mux_cable_tbl = {} + fwd_state_response_tbl = {} + hw_mux_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + fwd_state_response_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "PORT_INFO_TABLE") + hw_mux_cable_tbl[asic_index].get.return_value = (status, fvs_m) + + rc = handle_fwd_state_command_grpc_notification(fvs_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port, "TestDB") + assert(rc == True) def test_get_mux_cable_static_info_without_presence(self): rc = get_muxcable_static_info_without_presence() diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index 3504e111992c..da247650603a 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -4,18 +4,28 @@ """ import datetime +import ipaddress import os import re +import sys import threading import time from importlib import import_module + +import grpc +from proto_out import linkmgr_grpc_driver_pb2_grpc +from proto_out import linkmgr_grpc_driver_pb2 from sonic_py_common import daemon_base, logger from sonic_py_common import multi_asic from sonic_y_cable import y_cable_vendor_mapping from swsscommon import swsscommon +if sys.version_info.major == 3: + UNICODE_TYPE = str +else: + UNICODE_TYPE = unicode SELECT_TIMEOUT = 1000 @@ -23,6 +33,25 @@ y_cable_platform_chassis = None y_cable_is_platform_vs = None +# Global port channels for gRPC RPC's +grpc_port_channels = {} +# Global port channel stubs for gRPC RPC's +grpc_port_stubs = {} + +GRPC_PORT = 50075 + +read_side = -1 + +DEFAULT_NAMESPACE = "" + +LOOPBACK_INTERFACE_T0 = "10.212.64.1/32" +LOOPBACK_INTERFACE_LT0 = "10.212.64.2/32" +LOOPBACK_INTERFACE_T0_NIC = "10.1.0.37/32" +LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.38/32" +# rename and put in right place +# port id 0 -> maps to T0 +# port id 1 -> maps to LT0 + SYSLOG_IDENTIFIER = "y_cable_helper" helper_logger = logger.Logger(SYSLOG_IDENTIFIER) @@ -234,6 +263,313 @@ def set_show_firmware_fields(port, mux_info_dict, xcvrd_show_fw_rsp_tbl): return 0 + +def check_mux_cable_port_type(logical_port_name, port_tbl, asic_index): + + (status, fvs) = port_tbl[asic_index].get(logical_port_name) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + return (False, None) + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict: + + val = mux_table_dict.get("state", None) + cable_type = mux_table_dict.get("cable_type", None) + + if val in ["active", "standby", "auto", "manual"]: + if cable_type == "active-active": + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning True active-active port {}".format(logical_port_name)) + return (True , "active-active") + else: + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning True active-standby port {}".format(logical_port_name)) + return (True, "active-standby") + else: + helper_logger.log_debug("Y_CABLE_DEBUG:check_mux_cable_port_type returning False None port {}".format(logical_port_name)) + return (False, None) + + +def hook_grpc_nic_simulated(target, soc_ip): + """ + Args: + target (function): The function collecting transceiver info. + """ + + #NIC_SIMULATOR_CONFIG_FILE = "/etc/sonic/nic_simulator.json" + + def wrapper(*args, **kwargs): + #res = target(*args, **kwargs) + if os.path.exists(MUX_SIMULATOR_CONFIG_FILE): + """setup channels for all downlinks + NIC simulator will run on same port number + Todo put a task for secure channel""" + channel = grpc.insecure_channel("server_ip:GRPC_PORT".format(host)) + stub = None + #metadata_interceptor = MetadataInterceptor(("grpc_server", soc_ipv4)) + #intercept_channel = grpc.intercept_channel(channel, metadata_interceptor) + #stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(intercept_channel) + # TODO hook the interceptor appropriately + return channel, stub + + wrapper.__name__ = target.__name__ + + return wrapper + + +def retry_setup_grpc_channel_for_port(port, asic_index): + + config_db, port_tbl = {}, {} + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + + (status, fvs) = port_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(port, port_tbl[asic_index].getTableName())) + return False + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict: + + soc_ipv4_full = mux_table_dict.get("soc_ipv4", None) + if soc_ipv4_full is not None: + soc_ipv4 = soc_ipv4_full.split('/')[0] + + channel, stub = setup_grpc_channel_for_port(port, soc_ipv4) + if channel is None or stub is None: + helper_logger.log_notice( + "stub is None, while reattempt setting up channels did not work {}".format(port)) + return False + else: + grpc_port_channels[port] = channel + grpc_port_stubs[port] = stub + return True + +def setup_grpc_channel_for_port(port, soc_ip): + "TODO make these configurable like RESTAPI" + """ + root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read() + key = open('/etc/sonic/credentials/client.key.pem', 'rb').read() + cert_chain = open('/etc/sonic/credentials/client.cert.pem', 'rb').read() + + """ + """ + Dummy values for lab for now + TODO remove these once done + root_cert = open('/home/admin/proto_out1/proto_out/ca-chain-bundle.cert.pem', 'rb').read() + key = open('/home/admin/proto_out1/proto_out/client.key.pem', 'rb').read() + cert_chain = open('/home/admin/proto_out1/proto_out/client.cert.pem', 'rb').read() + """ + """credential = grpc.ssl_channel_credentials( + root_certificates=root_cert, + private_key=key, + certificate_chain=cert_chain) + """ + helper_logger.log_debug("Y_CABLE_DEBUG:setting up gRPC channel for RPC's {} {}".format(port,soc_ip)) + channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=[('grpc.keepalive_timeout_ms', 1000)]) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) + + channel_ready = grpc.channel_ready_future(channel) + + try: + channel_ready.result(timeout=0.2) + except grpc.FutureTimeoutError: + channel = None + stub = None + + if stub is None: + helper_logger.log_warning("stub was not setup gRPC ip {} port {}, no gRPC server running ".format(soc_ip, port)) + if channel is None: + helper_logger.log_warning("channel was not setup gRPC ip {} port {}, no gRPC server running".format(soc_ip, port)) + + return channel, stub + + +def process_loopback_interface_and_get_read_side(loopback_keys): + + asic_index = multi_asic.get_asic_index_from_namespace(DEFAULT_NAMESPACE) + + for key in loopback_keys[asic_index]: + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback key = {} ".format(key)) + if key.startswith("Loopback3|") and "/" in key and "::" not in key: + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback split 1 {} ".format(key)) + temp_list = key.split('|') + addr = temp_list[1].split('/')[0] + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback split 2 {} ".format(addr)) + loopback_prefix = ipaddress.ip_network(UNICODE_TYPE(addr)) + loopback_address = str(loopback_prefix) + helper_logger.log_debug("Y_CABLE_DEBUG:Loopback address parsed = {} ".format(loopback_address)) + if loopback_address == LOOPBACK_INTERFACE_LT0 or loopback_address == LOOPBACK_INTERFACE_LT0_NIC: + return 0 + elif loopback_address == LOOPBACK_INTERFACE_T0 or loopback_address == LOOPBACK_INTERFACE_T0_NIC: + return 1 + else: + # Loopback3 should be present, if not present log a warning + helper_logger.log_warning("Could not get any address associated with Loopback3") + return -1 + + return -1 + + +def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence): + + global grpc_port_stubs + global grpc_port_channels + + (status, fvs) = port_tbl[asic_index].get(logical_port_name) + if status is False: + helper_logger.log_warning( + "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + return + + else: + # Convert list of tuples to a dictionary + mux_table_dict = dict(fvs) + if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict: + + val = mux_table_dict.get("state", None) + soc_ipv4_full = mux_table_dict.get("soc_ipv4", None) + if soc_ipv4_full is not None: + soc_ipv4 = soc_ipv4_full.split('/')[0] + cable_type = mux_table_dict.get("cable_type", None) + + if val in ["active", "standby", "auto", "manual"] and cable_type == "active-active": + + # import the module and load the port instance + y_cable_presence[:] = [True] + physical_port_list = logical_port_name_to_physical_port_list( + logical_port_name) + + if len(physical_port_list) == 1: + + physical_port = physical_port_list[0] + if y_cable_wrapper_get_presence(physical_port): + prev_stub = grpc_port_stubs.get(logical_port_name, None) + prev_channel = grpc_port_channels.get(logical_port_name, None) + if prev_channel is not None and prev_stub is not None: + return + + channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4) + if channel is not None: + grpc_port_channels[logical_port_name] = channel + helper_logger.log_notice( + "channel is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name)) + if stub is not None: + grpc_port_stubs[logical_port_name] = stub + helper_logger.log_notice( + "stub is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name)) + + fvs_updated = swsscommon.FieldValuePairs([('read_side', str(read_side))]) + hw_mux_cable_tbl[asic_index].set(logical_port_name, fvs_updated) + hw_mux_cable_tbl_peer[asic_index].set(logical_port_name, fvs_updated) + else: + helper_logger.log_warning( + "DAC cable not present while Channel setup Port {} for gRPC channel initiation".format(logical_port_name)) + + else: + helper_logger.log_warning( + "DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name)) + else: + helper_logger.log_warning( + "DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name)) + + +def setup_grpc_channels(stop_event): + + global read_side + helper_logger.log_debug("Y_CABLE_DEBUG:setting up channels for active-active") + config_db, state_db, port_tbl, loopback_tbl, port_table_keys = {}, {}, {}, {}, {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) + + helper_logger.log_debug("Y_CABLE_DEBUG:while setting up grpc channels read side = {}".format(read_side)) + + # Init PORT_STATUS table if ports are on Y cable + logical_port_list = y_cable_platform_sfputil.logical + for logical_port_name in logical_port_list: + if stop_event.is_set(): + break + + # Get the asic to which this port belongs + asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port( + logical_port_name) + if asic_index is None: + helper_logger.log_warning( + "Got invalid asic index for {}, ignored".format(logical_port_name)) + continue + + if logical_port_name in port_table_keys[asic_index]: + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) + else: + # This port does not exist in Port table of config but is present inside + # logical_ports after loading the port_mappings from port_config_file + # This should not happen + helper_logger.log_warning( + "Could not retreive port inside config_db PORT table {} for gRPC channel initiation".format(logical_port_name)) + + +def try_grpc(callback, *args, **kwargs): + """ + Handy function to invoke the callback and catch NotImplementedError + :param callback: Callback to be invoked + :param args: Arguments to be passed to callback + :param kwargs: Default return value if exception occur + :return: Default return value if exception occur else return value of the callback + """ + + return_val = True + try: + resp = callback(*args) + if resp is None: + return_val = False + except grpc.RpcError as e: + #err_msg = 'Grpc error code '+str(e.code()) + if e.code() == grpc.StatusCode.CANCELLED: + helper_logger.log_notice("rpc cancelled for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.UNAVAILABLE: + helper_logger.log_notice("rpc unavailable for port= {}".format(str(e.code()))) + elif e.code() == grpc.StatusCode.INVALID_ARGUMENT: + helper_logger.log_notice("rpc invalid for port= {}".format(str(e.code()))) + else: + helper_logger.log_notice("rpc exception error for port= {}".format(str(e.code()))) + resp = None + return_val = False + + return return_val, resp + + +def close(channel): + "Close the channel" + channel.close() + def set_result_and_delete_port(result, actual_result, command_table, response_table, port): fvs = swsscommon.FieldValuePairs([(result, str(actual_result))]) response_table.set(port, fvs) @@ -819,11 +1155,16 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen global y_cable_platform_chassis global y_cable_port_instances global y_cable_is_platform_vs + global read_side # Connect to CONFIG_DB and create port status table inside state_db config_db, state_db, port_tbl, y_cable_tbl = {}, {}, {}, {} static_tbl, mux_tbl = {}, {} port_table_keys = {} xcvrd_log_tbl = {} + loopback_tbl= {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} y_cable_platform_sfputil = platform_sfp y_cable_platform_chassis = platform_chassis @@ -839,6 +1180,17 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen port_table_keys[asic_id] = port_tbl[asic_id].getKeys() xcvrd_log_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "XCVRD_LOG") xcvrd_log_tbl[asic_id].set("Y_CABLE", fvs_updated) + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) # Init PORT_STATUS table if ports are on Y cable logical_port_list = y_cable_platform_sfputil.logical @@ -855,8 +1207,13 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen continue if logical_port_name in port_table_keys[asic_index]: - check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + (status, cable_type) = check_mux_cable_port_type(logical_port_name, port_tbl, asic_index) + if status and cable_type == "active-standby": + check_identifier_presence_and_update_mux_table_entry( + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + if status and cable_type == "active-active": + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) else: # This port does not exist in Port table of config but is present inside # logical_ports after loading the port_mappings from port_config_file @@ -867,10 +1224,15 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, stop_event=threading.Event()): # Connect to CONFIG_DB and create port status table inside state_db + global read_side config_db, state_db, port_tbl, y_cable_tbl = {}, {}, {}, {} static_tbl, mux_tbl = {}, {} port_table_keys = {} delete_change_event = [False] + loopback_tbl= {} + loopback_keys = {} + hw_mux_cable_tbl = {} + hw_mux_cable_tbl_peer = {} # Get the namespaces in the platform namespaces = multi_asic.get_front_end_namespaces() @@ -880,6 +1242,18 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") port_table_keys[asic_id] = port_tbl[asic_id].getKeys() + loopback_tbl[asic_id] = swsscommon.Table( + config_db[asic_id], "LOOPBACK_INTERFACE") + loopback_keys[asic_id] = loopback_tbl[asic_id].getKeys() + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + + if read_side == -1: + read_side = process_loopback_interface_and_get_read_side(loopback_keys) + # Init PORT_STATUS table if ports are on Y cable and an event is received for logical_port_name, value in port_dict.items(): @@ -895,8 +1269,13 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, st if logical_port_name in port_table_keys[asic_index]: if value == SFP_STATUS_INSERTED: helper_logger.log_info("Got SFP inserted ycable event") - check_identifier_presence_and_update_mux_table_entry( - state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + (status, cable_type) = check_mux_cable_port_type(logical_port_name, port_tbl, asic_index) + if status and cable_type == "active-standby": + check_identifier_presence_and_update_mux_table_entry( + state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence) + if status and cable_type == "active-active": + check_identifier_presence_and_setup_channel( + logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, y_cable_presence) elif value == SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP deleted ycable event") check_identifier_presence_and_delete_mux_table_entry( @@ -992,7 +1371,7 @@ def check_identifier_presence_and_update_mux_info_entry(state_db, mux_tbl, asic_ (status, fvs) = port_tbl[asic_index].get(logical_port_name) if status is False: - helper_logger.log_debug("Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl[asic_index].getTableName())) return else: @@ -1526,7 +1905,7 @@ def post_port_mux_info_to_db(logical_port_name, table): else: mux_info_dict = get_muxcable_info(physical_port, logical_port_name) - if mux_info_dict is not None and mux_info_dict != -1: + if mux_info_dict is not None and mux_info_dict != -1: #transceiver_dict[physical_port] = port_info_dict fvs = swsscommon.FieldValuePairs( [('tor_active', mux_info_dict["tor_active"]), @@ -1663,7 +2042,7 @@ def gather_arg_from_db_and_check_for_type(arg_tbl, port, key, fvp_dict, arg): """ def task_download_firmware_worker(port, physical_port, port_instance, file_full_path, xcvrd_down_fw_rsp_tbl, xcvrd_down_fw_cmd_sts_tbl, rc): - helper_logger.log_debug("worker thread launched for downloading physical port {} path {}".format(physical_port, file_full_path)) + helper_logger.log_debug("Y_CABLE_DEBUG:worker thread launched for downloading physical port {} path {}".format(physical_port, file_full_path)) try: status = port_instance.download_firmware(file_full_path) time.sleep(5) @@ -1672,9 +2051,9 @@ def task_download_firmware_worker(port, physical_port, port_instance, file_full_ helper_logger.log_warning("Failed to execute the download firmware API for port {} due to {}".format(physical_port,repr(e))) set_result_and_delete_port('status', status, xcvrd_down_fw_cmd_sts_tbl, xcvrd_down_fw_rsp_tbl, port) - helper_logger.log_debug(" downloading complete {} {} {}".format(physical_port, file_full_path, status)) + helper_logger.log_debug("Y_CABLE_DEBUG:downloading complete {} {} {}".format(physical_port, file_full_path, status)) rc[0] = status - helper_logger.log_debug("download thread finished port {} physical_port {}".format(port, physical_port)) + helper_logger.log_debug("Y_CABLE_DEBUG:download thread finished port {} physical_port {}".format(port, physical_port)) def handle_config_prbs_cmd_arg_tbl_notification(fvp, xcvrd_config_prbs_cmd_arg_tbl, xcvrd_config_prbs_cmd_sts_tbl, xcvrd_config_prbs_rsp_tbl, asic_index, port): @@ -2444,7 +2823,10 @@ def handle_config_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_config_hwmode helper_logger.log_error("Error: Wrong input param for cli command config mux hwmode state active/standby logical port {}".format(port)) set_result_and_delete_port('result', 'False', xcvrd_config_hwmode_state_cmd_sts_tbl[asic_index], xcvrd_config_hwmode_state_rsp_tbl[asic_index], port) -def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port): +def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port): + state_db = {} + hw_mux_cable_tbl = {} + fvp_dict = dict(fvp) if "state" in fvp_dict: @@ -2466,60 +2848,322 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) return -1 + (cable_status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) - port_instance = get_ycable_port_instance_from_logical_port(port) - if port_instance is None or port_instance in port_mapping_error_values: - # error scenario update table accordingly - state = 'not Y-Cable port' - helper_logger.log_error( - "Error: Could not get port instance for cli command show mux hwmode muxdirection Y cable port {}".format(port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if cable_status and cable_type == "active-standby": - with y_cable_port_locks[physical_port]: - try: - read_side = port_instance.get_read_side() - except Exception as e: - read_side = None - helper_logger.log_warning("Failed to execute the get_read_side API for port {} due to {}".format(physical_port,repr(e))) + port_instance = get_ycable_port_instance_from_logical_port(port) + if port_instance is None or port_instance in port_mapping_error_values: + # error scenario update table accordingly + state = 'not Y-Cable port' + helper_logger.log_error( + "Error: Could not get port instance for cli command show mux hwmode muxdirection Y cable port {}".format(port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 - if read_side is None or read_side == port_instance.EEPROM_ERROR or read_side < 0: + with y_cable_port_locks[physical_port]: + try: + read_side = port_instance.get_read_side() + except Exception as e: + read_side = None + helper_logger.log_warning("Failed to execute the get_read_side API for port {} due to {}".format(physical_port,repr(e))) - state = 'unknown' - helper_logger.log_warning( - "Error: Could not get read side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if read_side is None or read_side == port_instance.EEPROM_ERROR or read_side < 0: - with y_cable_port_locks[physical_port]: - try: - active_side = port_instance.get_mux_direction() - except Exception as e: - active_side = None - helper_logger.log_warning("Failed to execute the get_mux_direction API for port {} due to {}".format(physical_port,repr(e))) + state = 'unknown' + helper_logger.log_warning( + "Error: Could not get read side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 - if active_side is None or active_side == port_instance.EEPROM_ERROR or active_side < 0: + with y_cable_port_locks[physical_port]: + try: + active_side = port_instance.get_mux_direction() + except Exception as e: + active_side = None + helper_logger.log_warning("Failed to execute the get_mux_direction API for port {} due to {}".format(physical_port,repr(e))) - state = 'unknown' - helper_logger.log_warning("Error: Could not get active side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + if active_side is None or active_side == port_instance.EEPROM_ERROR or active_side < 0: + + state = 'unknown' + helper_logger.log_warning("Error: Could not get active side for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 + + if read_side == active_side and (active_side == 1 or active_side == 2): + state = 'active' + elif read_side != active_side and (active_side == 1 or active_side == 2): + state = 'standby' + else: + state = 'unknown' + helper_logger.log_warning("Error: Could not get valid state for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return -1 set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 - if read_side == active_side and (active_side == 1 or active_side == 2): - state = 'active' - elif read_side != active_side and (active_side == 1 or active_side == 2): - state = 'standby' - else: - state = 'unknown' - helper_logger.log_warning("Error: Could not get valid state for cli command show mux hwmode muxdirection logical port {} and physical port {}".format(port, physical_port)) + elif cable_status and cable_type == "active-active": + + + namespaces = multi_asic.get_front_end_namespaces() + # Get the keys from PORT table inside config db to prepare check for mux_cable identifier + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + + (status, fv) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table while responding to cli cmd show mux status {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side", None) + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) + # TODO state only for dummy value in this request MSG remove this + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) + helper_logger.log_debug( + "Y_CABLE_DEBUG:calling RPC for getting cli forwarding state read_side portid = {} Ethernet port {}".format(read_side, port)) + + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_warning( + "stub was None for performing cli fwd mux RPC port {}, setting it up again did not work".format(port)) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + return + + ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + + (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) + state = self_state set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) - return -1 + if response is not None: + # Debug only, remove this section once Server side is Finalized + fwd_response_port_ids = response.portid + fwd_response_port_ids_state = response.state + helper_logger.log_notice( + "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) + helper_logger.log_notice( + "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + else: + helper_logger.log_notice("response was none cli handle_fwd_state_command_grpc_notification {} ".format(port)) + + else: + helper_logger.log_warning("Error: Wrong input param for cli command show mux hwmode muxdirection logical port {}".format(port)) + set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + +def parse_grpc_response_hw_mux_cable_change_state(ret, response, portid, port): + state = 'unknown' + "return a list of states" + if ret is True: + if response.portid[0] == portid: + if response.state[0] == True: + state = 'active' + # No other values expected + elif response.state[0] == False: + state = 'standby' + else: + helper_logger.log_warning("recieved an error state while parsing response hw mux no response state for port".format(port)) + else: + helper_logger.log_warning("recieved an error portid while parsing response hw mux no portid for port".format(port)) - set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) else: - helper_logger.log_warning("Error: Wrong input param for cli command show mux hwmode muxdirection logical port {}".format(port)) - set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port) + helper_logger.log_warning("recieved an error state while parsing response hw mux for port".format(port)) + state = 'unknown' + + return state + + +def parse_grpc_response_forwarding_state(ret, response, read_side): + self_state = peer_state = 'unknown' + + if ret is True and response is not None: + if int(read_side) == 0: + if response.state[0] == True: + self_state = 'active' + elif response.state[0] == False: + self_state = 'standby' + # No other values expected, should we raise exception/msg + # TODO handle other responses + if response.state[1] == True: + peer_state = 'active' + elif response.state[1] == False: + peer_state = 'standby' + + elif int(read_side) == 1: + if response.state[1] == True: + self_state = 'active' + elif response.state[1] == False: + self_state = 'standby' + if response.state[0] == True: + peer_state = 'active' + elif response.state[0] == False: + peer_state = 'standby' + else: + self_state = 'unknown' + peer_state = 'unknown' + + return (self_state, peer_state) + + +def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port, appl_db): + + helper_logger.log_debug("Y_CABLE_DEBUG:recevied the notification fwd state port {}".format(port)) + fvp_dict = dict(fvp_m) + + if "command" in fvp_dict: + # check if xcvrd got a probe command + probe_identifier = fvp_dict["command"] + + if probe_identifier == "probe": + helper_logger.log_debug("Y_CABLE_DEBUG:processing the notification fwd_state port {}".format(port)) + (status, fv) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + return False + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side") + helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side)) + # TODO state only for dummy value in this request MSG remove this + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0]) + helper_logger.log_warning( + "calling RPC for getting forwarding state read_side portid = {} Ethernet port {}".format(read_side, port)) + + self_state = "unknown" + peer_state = "unknown" + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_warning( + "stub was None for performing fwd mux RPC port {}, setting it up again did not work".format(port)) + fvs_updated = swsscommon.FieldValuePairs([('response', str(self_state)), + ('response_peer', str(peer_state))]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + return + + ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1) + + (self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side) + if response is not None: + # Debug only, remove this section once Server side is Finalized + fwd_response_port_ids = response.portid + fwd_response_port_ids_state = response.state + helper_logger.log_notice( + "forwarding state RPC received response port ids = {} port {}".format(fwd_response_port_ids, port)) + helper_logger.log_notice( + "forwarding state RPC received response state values = {} port {}".format(fwd_response_port_ids_state, port)) + else: + helper_logger.log_notice("response was none handle_fwd_state_command_grpc_notification {} ".format(port)) + + fvs_updated = swsscommon.FieldValuePairs([('response', str(self_state)), + ('response_peer', str(peer_state))]) + fwd_state_response_tbl[asic_index].set(port, fvs_updated) + helper_logger.log_debug("Y_CABLE_DEBUG:processed the notification fwd state cleanly") + return True + else: + helper_logger.log_warning("probe val not present in the notification fwd state handling port {}".format(port)) + else: + helper_logger.log_warning("command key not present in the notification fwd state handling port {}".format(port)) + + +def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_index, grpc_metrics_tbl, peer, port): + + # entering this section signifies a gRPC start for state + # change request from swss so initiate recording in mux_metrics table + time_start = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + # This check might be redundant, to check, the presence of this Port in keys + # in logical_port_list but keep for now for coherency + # also skip checking in logical_port_list inside sfp_util + + helper_logger.log_debug("Y_CABLE_DEBUG:recevied the notification mux hw state") + fvp_dict = dict(fvp) + toggle_side = "self" + + if "state" in fvp_dict: + # got a state change + new_state = fvp_dict["state"] + requested_status = new_state + if requested_status in ["active", "standby"]: + + (status, fvs) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + return + helper_logger.log_debug("Y_CABLE_DEBUG processing the notification mux hw state port {}".format(port)) + mux_port_dict = dict(fvs) + old_state = mux_port_dict.get("state", None) + read_side = mux_port_dict.get("read_side", None) + curr_read_side = int(read_side) + # Now whatever is the state requested, call gRPC to update the soc state appropriately + if peer == True: + curr_read_side = 1-int(read_side) + toggle_side = "peer" + + if new_state == "active": + state_req = 1 + elif new_state == "standby": + state_req = 0 + + helper_logger.log_notice( + "calling RPC for hw mux_cable set state state peer = {} portid {} Ethernet port".format(peer, port)) + + request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[curr_read_side], state=[state_req]) + + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_debug("Y_CABLE_DEBUG:stub is None for performing hw mux RPC port {}".format(port)) + retry_setup_grpc_channel_for_port(port, asic_index) + stub = grpc_port_stubs.get(port, None) + if stub is None: + helper_logger.log_notice( + "stub was None for performing hw mux RPC port {}, setting it up again did not work".format(port)) + return + + ret, response = try_grpc(stub.SetAdminForwardingPortState, request, timeout=0.1) + if response is not None: + # Debug only, remove this section once Server side is Finalized + hw_response_port_ids = response.portid + hw_response_port_ids_state = response.state + helper_logger.log_notice( + "Set admin state RPC received response port ids = {}".format(hw_response_port_ids)) + helper_logger.log_notice( + "Set admin state RPC received response state values = {}".format(hw_response_port_ids_state)) + else: + helper_logger.log_notice("response was none hw_mux_cable_table_grpc_notification {} ".format(port)) + + active_side = parse_grpc_response_hw_mux_cable_change_state(ret, response, curr_read_side, port) + + if active_side == "unknown": + helper_logger.log_warning( + "ERR: Got a change event for updating gRPC but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format(port, old_state, new_state)) + new_state = 'unknown' + + time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + fvs_metrics = swsscommon.FieldValuePairs([('grpc_switch_{}_{}_start'.format(toggle_side, new_state), str(time_start)), + ('grpc_switch_{}_{}_end'.format(toggle_side, new_state), str(time_end))]) + grpc_metrics_tbl[asic_index].set(port, fvs_metrics) + + fvs_updated = swsscommon.FieldValuePairs([('state', new_state), + ('read_side', read_side), + ('active_side', str(active_side))]) + hw_mux_cable_tbl[asic_index].set(port, fvs_updated) + helper_logger.log_debug("Y_CABLE_DEBUG: processed the notification hw mux state cleanly {}".format(port)) + else: + helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( + port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) + # Thread wrapper class to update y_cable status periodically class YCableTableUpdateTask(object): @@ -2536,9 +3180,11 @@ def __init__(self): def task_worker(self): # Connect to STATE_DB and APPL_DB and get both the HW_MUX_STATUS_TABLE info - appl_db, state_db, config_db, status_tbl, y_cable_tbl = {}, {}, {}, {}, {} - y_cable_tbl_keys = {} - mux_cable_command_tbl, y_cable_command_tbl = {}, {} + appl_db, state_db, config_db, status_tbl, status_tbl_peer = {}, {}, {}, {}, {} + hw_mux_cable_tbl, hw_mux_cable_tbl_peer = {}, {} + hw_mux_cable_tbl_keys = {} + port_tbl, port_table_keys = {}, {} + fwd_state_command_tbl, fwd_state_response_tbl, mux_cable_command_tbl = {}, {}, {} mux_metrics_tbl = {} sel = swsscommon.Select() @@ -2550,19 +3196,30 @@ def task_worker(self): asic_id = multi_asic.get_asic_index_from_namespace(namespace) appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace) config_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) status_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_HW_MUX_CABLE_TABLE_NAME) mux_cable_command_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_MUX_CABLE_COMMAND_TABLE_NAME) - y_cable_command_tbl[asic_id] = swsscommon.Table( - appl_db[asic_id], swsscommon.APP_MUX_CABLE_COMMAND_TABLE_NAME) - state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - y_cable_tbl[asic_id] = swsscommon.Table( - state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) mux_metrics_tbl[asic_id] = swsscommon.Table( state_db[asic_id], swsscommon.STATE_MUX_METRICS_TABLE_NAME) - y_cable_tbl_keys[asic_id] = y_cable_tbl[asic_id].getKeys() + hw_mux_cable_tbl[asic_id] = swsscommon.Table( + state_db[asic_id], swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME) + # TODO add definition inside app DB + status_tbl_peer[asic_id] = swsscommon.SubscriberStateTable( + appl_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + fwd_state_command_tbl[asic_id] = swsscommon.SubscriberStateTable( + appl_db[asic_id], "FORWARDING_STATE_COMMAND") + fwd_state_response_tbl[asic_id] = swsscommon.Table( + appl_db[asic_id], "FORWARDING_STATE_RESPONSE") + hw_mux_cable_tbl_peer[asic_id] = swsscommon.Table( + state_db[asic_id], "HW_MUX_CABLE_TABLE_PEER") + hw_mux_cable_tbl_keys[asic_id] = hw_mux_cable_tbl[asic_id].getKeys() + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() sel.addSelectable(status_tbl[asic_id]) + sel.addSelectable(status_tbl_peer[asic_id]) + sel.addSelectable(fwd_state_command_tbl[asic_id]) sel.addSelectable(mux_cable_command_tbl[asic_id]) @@ -2605,46 +3262,56 @@ def task_worker(self): # This check might be redundant, to check, the presence of this Port in keys # in logical_port_list but keep for now for coherency # also skip checking in logical_port_list inside sfp_util - if port not in y_cable_tbl_keys[asic_index]: + if port not in hw_mux_cable_tbl_keys[asic_index]: continue - fvp_dict = dict(fvp) + (status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index) + + if status: + + if cable_type == 'active-standby': + fvp_dict = dict(fvp) + + if "state" in fvp_dict: + # got a state change + new_status = fvp_dict["state"] + requested_status = new_status + (status, fvs) = hw_mux_cable_tbl[asic_index].get(port) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port, hw_mux_cable_tbl[asic_index].getTableName())) + continue + mux_port_dict = dict(fvs) + old_status = mux_port_dict.get("state", None) + read_side = mux_port_dict.get("read_side", None) + # Now whatever is the state requested, toggle the mux appropriately + helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd trying to transition port {} from {} to {} read side {}".format(port, old_status, new_status, read_side)) + (active_side, read_side) = update_tor_active_side(read_side, new_status, port) + if active_side == -1: + helper_logger.log_warning("ERR: Got a change event for toggle but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format( + port, old_status, new_status)) + new_status = 'unknown' + + helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd successful to transition port {} from {} to {} and write back to the DB {}".format(port, old_status, new_status, threading.currentThread().getName())) + helper_logger.log_notice("Got a change event for toggle the mux-direction active side for port {} state requested {} from old state {} to new state {} read_side {} thread id {}".format(port, requested_status, old_status, new_status, read_side, threading.currentThread().getName())) + time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") + fvs_metrics = swsscommon.FieldValuePairs([('xcvrd_switch_{}_start'.format(new_status), str(time_start)), + ('xcvrd_switch_{}_end'.format(new_status), str(time_end))]) + mux_metrics_tbl[asic_index].set(port, fvs_metrics) + + fvs_updated = swsscommon.FieldValuePairs([('state', new_status), + ('read_side', str(read_side)), + ('active_side', str(active_side))]) + hw_mux_cable_tbl[asic_index].set(port, fvs_updated) + else: + helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( + port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) - if "state" in fvp_dict: - # got a state change - new_status = fvp_dict["state"] - requested_status = new_status - (status, fvs) = y_cable_tbl[asic_index].get(port) - if status is False: - helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( - port, y_cable_tbl[asic_index].getTableName())) - continue - mux_port_dict = dict(fvs) - old_status = mux_port_dict.get("state", None) - read_side = mux_port_dict.get("read_side", None) - # Now whatever is the state requested, toggle the mux appropriately - helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd trying to transition port {} from {} to {} read side {}".format(port, old_status, new_status, read_side)) - (active_side, read_side) = update_tor_active_side(read_side, new_status, port) - if active_side == -1: - helper_logger.log_warning("ERR: Got a change event for toggle but could not toggle the mux-direction for port {} state from {} to {}, writing unknown".format( - port, old_status, new_status)) - new_status = 'unknown' - - helper_logger.log_debug("Y_CABLE_DEBUG: xcvrd successful to transition port {} from {} to {} and write back to the DB {}".format(port, old_status, new_status, threading.currentThread().getName())) - helper_logger.log_notice("Got a change event for toggle the mux-direction active side for port {} state requested {} from old state {} to new state {} read_side {} thread id {}".format(port, requested_status, old_status, new_status, read_side, threading.currentThread().getName())) - time_end = datetime.datetime.utcnow().strftime("%Y-%b-%d %H:%M:%S.%f") - fvs_metrics = swsscommon.FieldValuePairs([('xcvrd_switch_{}_start'.format(new_status), str(time_start)), - ('xcvrd_switch_{}_end'.format(new_status), str(time_end))]) - mux_metrics_tbl[asic_index].set(port, fvs_metrics) - - fvs_updated = swsscommon.FieldValuePairs([('state', new_status), - ('read_side', str(read_side)), - ('active_side', str(active_side))]) - y_cable_tbl[asic_index].set(port, fvs_updated) - else: - helper_logger.log_info("Got a change event on port {} of table {} that does not contain state".format( - port, swsscommon.APP_HW_MUX_CABLE_TABLE_NAME)) + elif cable_type == "active-active": + if fvp: + handle_hw_mux_cable_table_grpc_notification( + fvp, hw_mux_cable_tbl, asic_index, mux_metrics_tbl, False, port) while True: (port_m, op_m, fvp_m) = mux_cable_command_tbl[asic_index].pop() @@ -2654,25 +3321,59 @@ def task_worker(self): if fvp_m: - if port_m not in y_cable_tbl_keys[asic_index]: + if port_m not in hw_mux_cable_tbl_keys[asic_index]: continue fvp_dict = dict(fvp_m) - if "command" in fvp_dict: - # check if xcvrd got a probe command - probe_identifier = fvp_dict["command"] + (status, cable_type) = check_mux_cable_port_type(port_m, port_tbl, asic_index) + + if status: - if probe_identifier == "probe": - (status, fv) = y_cable_tbl[asic_index].get(port_m) - if status is False: - helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( - port_m, y_cable_tbl[asic_index].getTableName())) - continue - mux_port_dict = dict(fv) - read_side = mux_port_dict.get("read_side") - update_appdb_port_mux_cable_response_table(port_m, asic_index, appl_db, int(read_side)) + if cable_type == 'active-standby' and "command" in fvp_dict: + # check if xcvrd got a probe command + probe_identifier = fvp_dict["command"] + + if probe_identifier == "probe": + (status, fv) = hw_mux_cable_tbl[asic_index].get(port_m) + if status is False: + helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table {}".format( + port_m, hw_mux_cable_tbl[asic_index].getTableName())) + continue + mux_port_dict = dict(fv) + read_side = mux_port_dict.get("read_side") + update_appdb_port_mux_cable_response_table(port_m, asic_index, appl_db, int(read_side)) + + while True: + (port_m, op_m, fvp_m) = fwd_state_command_tbl[asic_index].pop() + + if not port_m: + break + + helper_logger.log_debug("Y_CABLE_DEBUG: received a probe for Forwarding state using gRPC port status {} {}".format(port_m, threading.currentThread().getName())) + (status, cable_type) = check_mux_cable_port_type(port_m, port_tbl, asic_index) + + if status is False or cable_type != "active-active": + break + + if fvp_m: + handle_fwd_state_command_grpc_notification( + fvp_m, hw_mux_cable_tbl, fwd_state_response_tbl, asic_index, port_m, appl_db) + + while True: + (port_n, op_n, fvp_n) = status_tbl_peer[asic_index].pop() + if not port_n: + break + + (status, cable_type) = check_mux_cable_port_type(port_n, port_tbl, asic_index) + + if status is False or cable_type != "active-active": + break + + if fvp_n: + handle_hw_mux_cable_table_grpc_notification( + fvp_n, hw_mux_cable_tbl_peer, asic_index, mux_metrics_tbl, True, port_n) def task_cli_worker(self): @@ -2694,6 +3395,8 @@ def task_cli_worker(self): xcvrd_show_event_cmd_tbl, xcvrd_show_event_rsp_tbl , xcvrd_show_event_cmd_sts_tbl, xcvrd_show_event_res_tbl= {}, {}, {}, {} xcvrd_show_fec_cmd_tbl, xcvrd_show_fec_rsp_tbl , xcvrd_show_fec_cmd_sts_tbl, xcvrd_show_fec_res_tbl= {}, {}, {}, {} xcvrd_show_ber_cmd_tbl, xcvrd_show_ber_cmd_arg_tbl, xcvrd_show_ber_rsp_tbl , xcvrd_show_ber_cmd_sts_tbl, xcvrd_show_ber_res_tbl= {}, {}, {}, {}, {} + port_tbl, port_table_keys = {}, {} + status_app_tbl = {} y_cable_tbl, y_cable_tbl_keys = {}, {} @@ -2810,6 +3513,8 @@ def task_cli_worker(self): state_db[asic_id], "XCVRD_GET_BER_RSP") xcvrd_show_ber_res_tbl[asic_id] = swsscommon.Table( state_db[asic_id], "XCVRD_GET_BER_RES") + port_tbl[asic_id] = swsscommon.Table(config_db[asic_id], "MUX_CABLE") + port_table_keys[asic_id] = port_tbl[asic_id].getKeys() status_app_tbl[asic_id] = swsscommon.SubscriberStateTable( appl_db[asic_id], swsscommon.APP_MUX_CABLE_TABLE_NAME) y_cable_tbl[asic_id] = swsscommon.Table( @@ -2888,7 +3593,7 @@ def task_cli_worker(self): break if fvp: - handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) + handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port) break while True: