34
34
MAX_CPE_MATCHES_PER_PAGE = 500
35
35
36
36
37
- def _result_iterator (data : JSON ) -> Iterator [CPEMatchString ]:
38
- results : list [dict [str , Any ]] = data .get ("match_strings" , []) # type: ignore
39
- return (
40
- CPEMatchString .from_dict (result ["match_string" ]) for result in results
41
- )
42
-
43
-
44
37
class CPEMatchApi (NVDApi ):
45
38
"""
46
39
API for querying the NIST NVD CPE match information.
@@ -62,6 +55,7 @@ def __init__(
62
55
token : Optional [str ] = None ,
63
56
timeout : Optional [Timeout ] = DEFAULT_TIMEOUT_CONFIG ,
64
57
rate_limit : bool = True ,
58
+ cache_cpe_matches : bool = True ,
65
59
) -> None :
66
60
"""
67
61
Create a new instance of the CPE API.
@@ -76,13 +70,22 @@ def __init__(
76
70
rolling 30 second window.
77
71
See https://nvd.nist.gov/developers/start-here#divRateLimits
78
72
Default: True.
73
+ cache_cpe_matches: If set to True (the default) the entries in the
74
+ lists of matching CPEs for each match string are cached and reused
75
+ to use less memory.
76
+ If set to False, a separate CPEMatch object is kept for each entry
77
+ to avoid possible side effects when modifying the data.
79
78
"""
80
79
super ().__init__ (
81
80
DEFAULT_NIST_NVD_CPE_MATCH_URL ,
82
81
token = token ,
83
82
timeout = timeout ,
84
83
rate_limit = rate_limit ,
85
84
)
85
+ if cache_cpe_matches :
86
+ self ._cpe_match_cache = {}
87
+ else :
88
+ self ._cpe_match_cache = None
86
89
87
90
def cpe_matches (
88
91
self ,
@@ -157,12 +160,30 @@ def cpe_matches(
157
160
return NVDResults (
158
161
self ,
159
162
params ,
160
- _result_iterator ,
163
+ self . _result_iterator ,
161
164
request_results = request_results ,
162
165
results_per_page = results_per_page ,
163
166
start_index = start_index ,
164
167
)
165
168
169
+ def _result_iterator (self , data : JSON ) -> Iterator [CPEMatchString ]:
170
+ """
171
+ Creates an iterator of all the CPEMatchStrings in given API response JSON
172
+
173
+ Args:
174
+ data: The JSON response data to get the match strings from
175
+
176
+ Returns:
177
+ An iterator over the CPEMatchStrings
178
+ """
179
+ results : list [dict [str , Any ]] = data .get ("match_strings" , []) # type: ignore
180
+ return (
181
+ CPEMatchString .from_dict_with_cache (
182
+ result ["match_string" ], self ._cpe_match_cache
183
+ )
184
+ for result in results
185
+ )
186
+
166
187
async def cpe_match (self , match_criteria_id : str ) -> CPEMatchString :
167
188
"""
168
189
Returns a single CPE match for the given match criteria id.
@@ -201,7 +222,9 @@ async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
201
222
)
202
223
203
224
match_string = match_strings [0 ]
204
- return CPEMatchString .from_dict (match_string ["match_string" ])
225
+ return CPEMatchString .from_dict_with_cache (
226
+ match_string ["match_string" ], self ._cpe_match_cache
227
+ )
205
228
206
229
async def __aenter__ (self ) -> "CPEMatchApi" :
207
230
await super ().__aenter__ ()
0 commit comments