Skip to content

Commit

Permalink
Filter out events
Browse files Browse the repository at this point in the history
  • Loading branch information
prgeor committed Sep 7, 2022
1 parent c400706 commit db73f4a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 20 deletions.
5 changes: 1 addition & 4 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,10 +994,7 @@ def on_port_update_event(self, port_change_event):
self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes']
if 'host_tx_ready' in port_change_event.port_dict:
self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready']
if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict:
# At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB
# We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this
# check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB
if 'admin_status' in port_change_event.port_dict:
self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status']
if 'laser_freq' in port_change_event.port_dict:
self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq'])
Expand Down
74 changes: 58 additions & 16 deletions sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class PortChangeEvent:
PORT_REMOVE = 1
PORT_SET = 2
PORT_DEL = 3
PORT_EVENT = {}

def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None):
# Logical port name, e.g. Ethernet0
Expand Down Expand Up @@ -106,10 +107,15 @@ def subscribe_port_config_change(namespaces):
return sel, asic_context

def subscribe_port_update_event(namespaces, logger):
"""
Subscribe to a particular DB's table and listen to only interested fields
Format :
{ <DB name> : <Table name> , <field1>, <field2>, .. } where only field<n> update will be received
"""
port_tbl_map = [
{'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME},
{'STATE_DB': 'TRANSCEIVER_INFO'},
{'STATE_DB': 'PORT_TABLE'},
{'STATE_DB': 'PORT_TABLE', 'FILTER': ['host_tx_ready']},
]

sel = swsscommon.Select()
Expand All @@ -119,15 +125,18 @@ def subscribe_port_update_event(namespaces, logger):
db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace)
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0])
logger.log_notice("subscribing to table: {} in {} : {} DB".format(
port_tbl, list(d.keys())[0], list(d.values())[0]))
port_tbl.db_name = list(d.keys())[0]
port_tbl.table_name = list(d.values())[0]
port_tbl.filter = d['FILTER'] if 'FILTER' in d else None
asic_context[port_tbl] = asic_id
sel.addSelectable(port_tbl)
logger.log_warning("subscribing to port_tbl {} - {} DB of namespace {} ".format(
port_tbl, list(d.values())[0]), namespace)
return sel, asic_context

def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler):
"""
Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB
Select PORT update events, notify the observers upon a port update in CONFIG_DB
or a XCVR insertion/removal in STATE_DB
"""
if not stop_event.is_set():
Expand All @@ -137,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if state != swsscommon.Select.OBJECT:
logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT')
return

port_event_cache = {}
for port_tbl in asic_context.keys():
while True:
(key, op, fvp) = port_tbl.pop()
Expand All @@ -145,25 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
if not validate_port(key):
continue
fvp = dict(fvp) if fvp is not None else {}
logger.log_warning("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}".format(
key, op, port_tbl.db_name, port_tbl.table_name, fvp))

if 'index' not in fvp:
fvp['index'] = '-1'
port_index = int(fvp['index'])
port_change_event = None
if op == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(key,
fvp['index'] = '-1'
fvp['key'] = key
fvp['asic_id'] = asic_context[port_tbl]
fvp['op'] = op
fvp['FILTER'] = port_tbl.filter
# Soak duplicate events and consider only the last event
port_event_cache[key+port_tbl.db_name+port_tbl.table_name] = fvp

# Now apply filter over soaked events
for key, fvp in port_event_cache.items():
port_index = int(fvp['index'])
port_change_event = None
diff = {}
filter = fvp['FILTER']
del fvp['FILTER']
if key in PortChangeEvent.PORT_EVENT:
diff = dict(set(fvp.items()) - set(PortChangeEvent.PORT_EVENT[key].items()))
# Ignore duplicate events
if not diff:
PortChangeEvent.PORT_EVENT[key] = fvp
continue
# Ensure only interested field update gets through for processing
if filter is not None:
if not (set(filter) & set(diff.keys())):
PortChangeEvent.PORT_EVENT[key] = fvp
continue
PortChangeEvent.PORT_EVENT[key] = fvp

if fvp['op'] == swsscommon.SET_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_SET,
fvp)
elif op == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(key,
elif fvp['op'] == swsscommon.DEL_COMMAND:
port_change_event = PortChangeEvent(fvp['key'],
port_index,
asic_context[port_tbl],
fvp['asic_id'],
PortChangeEvent.PORT_DEL,
fvp)
logger.log_notice("handle_port_update_event() : op={} port_tbl {} fvp {}".format(op, port_tbl, fvp))
if port_change_event is not None:
port_change_event_handler(port_change_event)
# This is the final event considered for processing
logger.log_warning("*** {} handle_port_update_event() fvp {}".format(
key, fvp))
if port_change_event is not None:
port_change_event_handler(port_change_event)


def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
Expand Down

0 comments on commit db73f4a

Please # to comment.