Skip to content

Commit

Permalink
Fix issue: should always check return value of a function if the func…
Browse files Browse the repository at this point in the history
…tion may return None (sonic-net#350)

Fix unit test failure
  • Loading branch information
Junchao-Mellanox authored Mar 28, 2023
1 parent 6e24d32 commit 81636a5
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 68 deletions.
165 changes: 100 additions & 65 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
from collections import defaultdict

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Expand Down Expand Up @@ -246,7 +247,10 @@ def get_transceiver_threshold_info(self):
if thresh is None:
return None
tx_bias_scale_raw = self.xcvr_eeprom.read(consts.TX_BIAS_SCALE)
tx_bias_scale = 2**tx_bias_scale_raw if tx_bias_scale_raw < 3 else 1
if tx_bias_scale_raw is not None:
tx_bias_scale = 2**tx_bias_scale_raw if tx_bias_scale_raw < 3 else 1
else:
tx_bias_scale = None
threshold_info_dict = {
"temphighalarm": float("{:.3f}".format(thresh[consts.TEMP_HIGH_ALARM_FIELD])),
"templowalarm": float("{:.3f}".format(thresh[consts.TEMP_LOW_ALARM_FIELD])),
Expand All @@ -264,16 +268,23 @@ def get_transceiver_threshold_info(self):
"txpowerlowalarm": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_LOW_ALARM_FIELD]))),
"txpowerhighwarning": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_HIGH_WARNING_FIELD]))),
"txpowerlowwarning": float("{:.3f}".format(self.mw_to_dbm(thresh[consts.TX_POWER_LOW_WARNING_FIELD]))),
"txbiashighalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_ALARM_FIELD]*tx_bias_scale)),
"txbiaslowalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_ALARM_FIELD]*tx_bias_scale)),
"txbiashighwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_WARNING_FIELD]*tx_bias_scale)),
"txbiashighalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_ALARM_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiaslowalarm": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_ALARM_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiashighwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_HIGH_WARNING_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A',
"txbiaslowwarning": float("{:.3f}".format(thresh[consts.TX_BIAS_LOW_WARNING_FIELD]*tx_bias_scale))
if tx_bias_scale is not None else 'N/A'
}
laser_temp_dict = self.get_laser_temperature()
threshold_info_dict['lasertemphighalarm'] = laser_temp_dict['high alarm']
threshold_info_dict['lasertemplowalarm'] = laser_temp_dict['low alarm']
threshold_info_dict['lasertemphighwarning'] = laser_temp_dict['high warn']
threshold_info_dict['lasertemplowwarning'] = laser_temp_dict['low warn']
try:
threshold_info_dict['lasertemphighalarm'] = laser_temp_dict['high alarm']
threshold_info_dict['lasertemplowalarm'] = laser_temp_dict['low alarm']
threshold_info_dict['lasertemphighwarning'] = laser_temp_dict['high warn']
threshold_info_dict['lasertemplowwarning'] = laser_temp_dict['low warn']
except (KeyError, TypeError):
pass
self.vdm_dict = self.get_vdm()
try:
threshold_info_dict['prefecberhighalarm'] = self.vdm_dict['Pre-FEC BER Average Media Input'][1][1]
Expand Down Expand Up @@ -759,8 +770,10 @@ def get_active_apsel_hostlane(self):
This function returns the application select code that each host lane has
'''
if (self.is_flat_memory()):
return {'{}{}'.format(consts.ACTIVE_APSEL_HOSTLANE, i) : 'N/A' for i in range(1, self.NUM_CHANNELS+1)}
return self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE)
return defaultdict(lambda: 'N/A')

active_apsel_code = self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE)
return defaultdict(lambda: 'N/A') if not active_apsel_code else active_apsel_code

def get_tx_config_power(self):
'''
Expand Down Expand Up @@ -840,20 +853,19 @@ def get_laser_temperature(self):
try:
aux1_mon_type, aux2_mon_type, aux3_mon_type = self.get_aux_mon_type()
except TypeError:
return None
LASER_TEMP_SCALE = 256.0
return laser_temp_dict
if aux2_mon_type == 0:
laser_temp = self.xcvr_eeprom.read(consts.AUX2_MON)/LASER_TEMP_SCALE
laser_temp_high_alarm = self.xcvr_eeprom.read(consts.AUX2_HIGH_ALARM)/LASER_TEMP_SCALE
laser_temp_low_alarm = self.xcvr_eeprom.read(consts.AUX2_LOW_ALARM)/LASER_TEMP_SCALE
laser_temp_high_warn = self.xcvr_eeprom.read(consts.AUX2_HIGH_WARN)/LASER_TEMP_SCALE
laser_temp_low_warn = self.xcvr_eeprom.read(consts.AUX2_LOW_WARN)/LASER_TEMP_SCALE
laser_temp = self._get_laser_temp_threshold(consts.AUX2_MON)
laser_temp_high_alarm = self._get_laser_temp_threshold(consts.AUX2_HIGH_ALARM)
laser_temp_low_alarm = self._get_laser_temp_threshold(consts.AUX2_LOW_ALARM)
laser_temp_high_warn = self._get_laser_temp_threshold(consts.AUX2_HIGH_WARN)
laser_temp_low_warn = self._get_laser_temp_threshold(consts.AUX2_LOW_WARN)
elif aux2_mon_type == 1 and aux3_mon_type == 0:
laser_temp = self.xcvr_eeprom.read(consts.AUX3_MON)/LASER_TEMP_SCALE
laser_temp_high_alarm = self.xcvr_eeprom.read(consts.AUX3_HIGH_ALARM)/LASER_TEMP_SCALE
laser_temp_low_alarm = self.xcvr_eeprom.read(consts.AUX3_LOW_ALARM)/LASER_TEMP_SCALE
laser_temp_high_warn = self.xcvr_eeprom.read(consts.AUX3_HIGH_WARN)/LASER_TEMP_SCALE
laser_temp_low_warn = self.xcvr_eeprom.read(consts.AUX3_LOW_WARN)/LASER_TEMP_SCALE
laser_temp = self._get_laser_temp_threshold(consts.AUX3_MON)
laser_temp_high_alarm = self._get_laser_temp_threshold(consts.AUX3_HIGH_ALARM)
laser_temp_low_alarm = self._get_laser_temp_threshold(consts.AUX3_LOW_ALARM)
laser_temp_high_warn = self._get_laser_temp_threshold(consts.AUX3_HIGH_WARN)
laser_temp_low_warn = self._get_laser_temp_threshold(consts.AUX3_LOW_WARN)
else:
return laser_temp_dict
laser_temp_dict = {'monitor value': laser_temp,
Expand All @@ -862,6 +874,11 @@ def get_laser_temperature(self):
'high warn': laser_temp_high_warn,
'low warn': laser_temp_low_warn}
return laser_temp_dict

def _get_laser_temp_threshold(self, field):
LASER_TEMP_SCALE = 256.0
value = self.xcvr_eeprom.read(field)
return value/LASER_TEMP_SCALE if value is not None else 'N/A'

def get_laser_TEC_current(self):
'''
Expand Down Expand Up @@ -1683,14 +1700,17 @@ def get_transceiver_status(self):
except TypeError:
pass
module_flag = self.get_module_level_flag()
trans_status['temphighalarm_flag'] = module_flag['case_temp_flags']['case_temp_high_alarm_flag']
trans_status['templowalarm_flag'] = module_flag['case_temp_flags']['case_temp_low_alarm_flag']
trans_status['temphighwarning_flag'] = module_flag['case_temp_flags']['case_temp_high_warn_flag']
trans_status['templowwarning_flag'] = module_flag['case_temp_flags']['case_temp_low_warn_flag']
trans_status['vcchighalarm_flag'] = module_flag['voltage_flags']['voltage_high_alarm_flag']
trans_status['vcclowalarm_flag'] = module_flag['voltage_flags']['voltage_low_alarm_flag']
trans_status['vcchighwarning_flag'] = module_flag['voltage_flags']['voltage_high_warn_flag']
trans_status['vcclowwarning_flag'] = module_flag['voltage_flags']['voltage_low_warn_flag']
try:
trans_status['temphighalarm_flag'] = module_flag['case_temp_flags']['case_temp_high_alarm_flag']
trans_status['templowalarm_flag'] = module_flag['case_temp_flags']['case_temp_low_alarm_flag']
trans_status['temphighwarning_flag'] = module_flag['case_temp_flags']['case_temp_high_warn_flag']
trans_status['templowwarning_flag'] = module_flag['case_temp_flags']['case_temp_low_warn_flag']
trans_status['vcchighalarm_flag'] = module_flag['voltage_flags']['voltage_high_alarm_flag']
trans_status['vcclowalarm_flag'] = module_flag['voltage_flags']['voltage_low_alarm_flag']
trans_status['vcchighwarning_flag'] = module_flag['voltage_flags']['voltage_high_warn_flag']
trans_status['vcclowwarning_flag'] = module_flag['voltage_flags']['voltage_low_warn_flag']
except TypeError:
pass
try:
aux1_mon_type, aux2_mon_type, aux3_mon_type = self.get_aux_mon_type()
if aux2_mon_type == 0:
Expand All @@ -1707,53 +1727,66 @@ def get_transceiver_status(self):
pass
if not self.is_flat_memory():
dp_state_dict = self.get_datapath_state()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['DP%dState' % lane] = dp_state_dict['DP%dState' % lane]
if dp_state_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['DP%dState' % lane] = dp_state_dict.get('DP%dState' % lane)
tx_output_status_dict = self.get_tx_output_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txoutput_status%d' % lane] = tx_output_status_dict['TxOutputStatus%d' % lane]
if tx_output_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txoutput_status%d' % lane] = tx_output_status_dict.get('TxOutputStatus%d' % lane)
rx_output_status_dict = self.get_rx_output_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxoutput_status_hostlane%d' % lane] = rx_output_status_dict['RxOutputStatus%d' % lane]
if rx_output_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxoutput_status_hostlane%d' % lane] = rx_output_status_dict.get('RxOutputStatus%d' % lane)
tx_fault = self.get_tx_fault()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txfault%d' % lane] = tx_fault[lane - 1]
if tx_fault:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txfault%d' % lane] = tx_fault[lane - 1]
tx_los = self.get_tx_los()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txlos_hostlane%d' % lane] = tx_los[lane - 1]
if tx_los:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txlos_hostlane%d' % lane] = tx_los[lane - 1]
tx_lol = self.get_tx_cdr_lol()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txcdrlol_hostlane%d' % lane] = tx_lol[lane - 1]
if tx_lol:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txcdrlol_hostlane%d' % lane] = tx_lol[lane - 1]
rx_los = self.get_rx_los()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxlos%d' % lane] = rx_los[lane - 1]
if rx_los:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxlos%d' % lane] = rx_los[lane - 1]
rx_lol = self.get_rx_cdr_lol()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1]
if rx_lol:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxcdrlol%d' % lane] = rx_lol[lane - 1]
config_status_dict = self.get_config_datapath_hostlane_status()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['config_state_hostlane%d' % lane] = config_status_dict['ConfigStatusLane%d' % lane]
if config_status_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['config_state_hostlane%d' % lane] = config_status_dict.get('ConfigStatusLane%d' % lane)
dpinit_pending_dict = self.get_dpinit_pending()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['dpinit_pending_hostlane%d' % lane] = dpinit_pending_dict['DPInitPending%d' % lane]
if dpinit_pending_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['dpinit_pending_hostlane%d' % lane] = dpinit_pending_dict.get('DPInitPending%d' % lane)
tx_power_flag_dict = self.get_tx_power_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txpowerhighalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_high_alarm']['TxPowerHighAlarmFlag%d' % lane]
trans_status['txpowerlowalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_low_alarm']['TxPowerLowAlarmFlag%d' % lane]
trans_status['txpowerhighwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_high_warn']['TxPowerHighWarnFlag%d' % lane]
trans_status['txpowerlowwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_low_warn']['TxPowerLowWarnFlag%d' % lane]
if tx_power_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txpowerhighalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_high_alarm']['TxPowerHighAlarmFlag%d' % lane]
trans_status['txpowerlowalarm_flag%d' % lane] = tx_power_flag_dict['tx_power_low_alarm']['TxPowerLowAlarmFlag%d' % lane]
trans_status['txpowerhighwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_high_warn']['TxPowerHighWarnFlag%d' % lane]
trans_status['txpowerlowwarning_flag%d' % lane] = tx_power_flag_dict['tx_power_low_warn']['TxPowerLowWarnFlag%d' % lane]
rx_power_flag_dict = self.get_rx_power_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxpowerhighalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_high_alarm']['RxPowerHighAlarmFlag%d' % lane]
trans_status['rxpowerlowalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_low_alarm']['RxPowerLowAlarmFlag%d' % lane]
trans_status['rxpowerhighwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_high_warn']['RxPowerHighWarnFlag%d' % lane]
trans_status['rxpowerlowwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_low_warn']['RxPowerLowWarnFlag%d' % lane]
if rx_power_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['rxpowerhighalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_high_alarm']['RxPowerHighAlarmFlag%d' % lane]
trans_status['rxpowerlowalarm_flag%d' % lane] = rx_power_flag_dict['rx_power_low_alarm']['RxPowerLowAlarmFlag%d' % lane]
trans_status['rxpowerhighwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_high_warn']['RxPowerHighWarnFlag%d' % lane]
trans_status['rxpowerlowwarning_flag%d' % lane] = rx_power_flag_dict['rx_power_low_warn']['RxPowerLowWarnFlag%d' % lane]
tx_bias_flag_dict = self.get_tx_bias_flag()
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txbiashighalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_alarm']['TxBiasHighAlarmFlag%d' % lane]
trans_status['txbiaslowalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_alarm']['TxBiasLowAlarmFlag%d' % lane]
trans_status['txbiashighwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_warn']['TxBiasHighWarnFlag%d' % lane]
trans_status['txbiaslowwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_warn']['TxBiasLowWarnFlag%d' % lane]
if tx_bias_flag_dict:
for lane in range(1, self.NUM_CHANNELS+1):
trans_status['txbiashighalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_alarm']['TxBiasHighAlarmFlag%d' % lane]
trans_status['txbiaslowalarm_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_alarm']['TxBiasLowAlarmFlag%d' % lane]
trans_status['txbiashighwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_high_warn']['TxBiasHighWarnFlag%d' % lane]
trans_status['txbiaslowwarning_flag%d' % lane] = tx_bias_flag_dict['tx_bias_low_warn']['TxBiasLowWarnFlag%d' % lane]
self.vdm_dict = self.get_vdm()
try:
trans_status['prefecberhighalarm_flag'] = self.vdm_dict['Pre-FEC BER Average Media Input'][1][5]
Expand Down Expand Up @@ -1909,6 +1942,8 @@ def get_application_advertisement(self):
ret = {}
# Read the application advertisment in lower memory
dic = self.xcvr_eeprom.read(consts.APPLS_ADVT_FIELD)
if not dic:
return ret

if not self.is_flat_memory():
# Read the application advertisement in page01
Expand Down
22 changes: 22 additions & 0 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import patch
from mock import MagicMock
import pytest
import traceback
import random
from sonic_platform_base.sonic_xcvr.api.public.cmis import CmisApi
from sonic_platform_base.sonic_xcvr.mem_maps.public.cmis import CmisMemMap
from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom
Expand All @@ -14,6 +16,7 @@ class TestCmis(object):
reader = MagicMock(return_value=None)
writer = MagicMock()
eeprom = XcvrEeprom(reader, writer, mem_map)
old_read_func = eeprom.read
api = CmisApi(eeprom)

@pytest.mark.parametrize("mock_response, expected", [
Expand Down Expand Up @@ -2174,3 +2177,22 @@ def test_get_error_description(self):

result = self.api.get_error_description()
assert result is None

def test_random_read_fail(self):
def mock_read_raw(offset, size):
i = random.randint(0, 1)
return None if i == 0 else b'0' * size

self.api.xcvr_eeprom.read = self.old_read_func
self.api.xcvr_eeprom.reader = mock_read_raw

run_num = 5
while run_num > 0:
try:
self.api.get_transceiver_bulk_status()
self.api.get_transceiver_info()
self.api.get_transceiver_threshold_info()
self.api.get_transceiver_status()
except:
assert 0, traceback.format_exc()
run_num -= 1
22 changes: 21 additions & 1 deletion tests/sonic_xcvr/test_sff8436.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from mock import MagicMock, patch
import pytest

import traceback
import random

from sonic_platform_base.sonic_xcvr.api.public.sff8436 import Sff8436Api
from sonic_platform_base.sonic_xcvr.codes.public.sff8436 import Sff8436Codes
from sonic_platform_base.sonic_xcvr.mem_maps.public.sff8436 import Sff8436MemMap
from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom
from sonic_platform_base.sonic_xcvr.fields import consts


class TestSff8436(object):
codes = Sff8436Codes
mem_map = Sff8436MemMap(codes)
Expand Down Expand Up @@ -106,6 +108,23 @@ def test_simulate_copper(self):
assert not self.api.get_temperature_support()
assert not self.api.get_voltage_support()

def test_random_read_fail(self):
def mock_read_raw(offset, size):
i = random.randint(0, 1)
return None if i == 0 else b'0' * size

self.api.xcvr_eeprom.reader = mock_read_raw

run_num = 5
while run_num > 0:
try:
self.api.get_transceiver_bulk_status()
self.api.get_transceiver_info()
self.api.get_transceiver_threshold_info()
except:
assert 0, traceback.format_exc()
run_num -= 1

def test_get_lpmode(self):
self.api.get_lpmode_support = MagicMock()
self.api.get_lpmode_support.return_value = True
Expand Down Expand Up @@ -135,3 +154,4 @@ def test_set_lpmode(self):
self.api.get_lpmode_support.return_value = False
self.api.get_power_override_support.return_value = False
assert not self.api.set_lpmode(True)

Loading

0 comments on commit 81636a5

Please # to comment.