From 79aaaf4df0e28050401ec07c77e064ce87936728 Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Tue, 3 Nov 2020 10:25:21 -0800 Subject: [PATCH 1/7] Initial cptlib checkin --- cptlib/__init__.py | 0 cptlib/parcels.py | 115 +++++++++++++++++++++++++++++++++++++++++++++ cptlib/parsers.py | 60 +++++++++++++++++++++++ cptlib/scrapers.py | 65 +++++++++++++++++++++++++ 4 files changed, 240 insertions(+) create mode 100644 cptlib/__init__.py create mode 100644 cptlib/parcels.py create mode 100644 cptlib/parsers.py create mode 100644 cptlib/scrapers.py diff --git a/cptlib/__init__.py b/cptlib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cptlib/parcels.py b/cptlib/parcels.py new file mode 100644 index 0000000..32ee6e7 --- /dev/null +++ b/cptlib/parcels.py @@ -0,0 +1,115 @@ +def centroid_from_latlng(lat_field_name, lng_field_name): + def centroid_fn(record): + return (record[lat_field_name], record[lng_field_name]) + + return centroid_fn + + + +def centroid_from_shape(shape_field = 'points'): + def centroid_fn(record): + list(Polygon(coords).centroid.coords)[0] + poly = Poly(record[shape_field]) + return (poly.lat, poly.lng) + + return centroid_fn + + +import os +import re + +class Parcel: + def __init__(self, address, county_code, apn, centroid = None, tax = None): + self.address = address + self.county_code = county_code + self.apn = apn + + self.centroid = centroid + self.tax = tax + + @property + def csv_row(self): + return { + 'address': self.address, + 'apn': self.apn, + 'latitude': self.centroid[0], + 'longitude': self.centroid[1], + 'tax': self.tax, + 'county': self.county_code, + } + + @property + def html_file_path(self): + apn = re.sub(r'[^A-Za-z0-9]', self.apn, '') + return os.path.join('data', apn[0:3], apn[3:6], '{}.html.gz'.format(apn)) + +import os + +class Parcels(): + def __init__(self, county_code, apn_column, address_column, centroid_fn): + self.county_code = county_code + self.apn_column = apn_column + self.address_column = address_column + self.centroid_fn = centroid_fn + + def _get_address(self, row): + return row[self.address_column] + + def _make_parcel(self, row): + return Parcel(self._get_address(row), self.county_code, + row[self.apn_column], self.centroid_fn(row)) + + def __iter__(self): + return self + + + +import csv + +class ParcelsCSV(Parcels): + def __init__(county_code, apn_column, address_column, centroid_fn, + csv_file_path): + super().__init__(self, county_code, apn_column, address_column, centroid_fn) + + self.csv_reader = dictreader(csv_file_path) + + def __next__(self): + row = next(self.csv_reader) + + return self._make_parcel(row) + + + +import shapefile + +class ParcelsShapeFile(Parcels): + def __init__(county_code, apn_column, address_column, centroid_fn, + shape_file_path): + super().__init__(self, county_code, apn_column, address_column, centroid_fn) + + self.sf = shapefile.Reader(shape_file_path) + self.idx = 0 + + # we only know how to deal with polygons + assert self.sf.shapeType == shapefile.POLYGON + + def __next__(self): + if self.idx < len(self.sf): + record = self.sf.shapeRecord(self.idx) + self.idx += 1 + + dct = record.record.as_dict() + dct['points'] = record.shape.points + return self._make_parcel(dct) + + raise StopIteration + + row = next(self.csv_reader) + + return self._make_parcel(row) + + + + + + diff --git a/cptlib/parsers.py b/cptlib/parsers.py new file mode 100644 index 0000000..643847a --- /dev/null +++ b/cptlib/parsers.py @@ -0,0 +1,60 @@ +def Parser(): + def __init__(self, 
parcels_generator, data_dir): + self.parcels = parcels_generator + self.data_dir = data_dir + + fieldnames = ['address', 'apn', 'longitude', 'latitude', 'tax', 'county'] + self.csv_writer = csv.DictWriter(f_out, fieldnames=fieldnames) + + def parse(self): + count = 0 + + for parcel in self.parcels: + count += 1 + + html_path = self.data_dir + parcel.html_file_path + + with gzip.open(html_path): + if parse_html(parcel, html): + self.csv_writer.write_row(parcel.csv_row) + else: + pass + + + def _parse_html(self, html): + raise NotImplementedError + +class ParserMegabyte(Parser): + def _parse_html(self, html): + soup = BeautifulSoup(html, 'html.parser') + + #extract payment info + tab1 = soup.find('div', {'id':'h2tab1'}) + total_tax = -1 + if tab1 != None: + bills = tab1.find_all('dt',text='Total Due') + + if len(bills) == 3: + #grab the total annual payment, not the 6-month one + #no need to double value later on + total_tax_str = bills[2].findNext('dd').string.replace('$', '').replace(',', '') + try: + total_tax = float(total_tax_str) + except: + print('--> Could not parse float', amount_str) + else: + print("bad tax records on parcel ",apn) + + else: + print(apn,"Tax data not available.") + + + #extract address + tab2 = soup.find('div', {'id':'h2tab2'}) + if tab2 is not None: + address = tab2.find('dt',text='Address').findNext('dd').string + + if address is None: + address = "UNKNOWN" + + print(address,total_tax) diff --git a/cptlib/scrapers.py b/cptlib/scrapers.py new file mode 100644 index 0000000..cafe0a4 --- /dev/null +++ b/cptlib/scrapers.py @@ -0,0 +1,65 @@ +import time + +class Scraper(): + def __init__(self, parcels_generator, data_dir, url_tpl): + self.parcels = parcels_generator + self.data_dir = data_dir + self.url_tpl = url_tpl + + + self.request_type = 'GET' + self.request_params = {} + # Be kind to the servers running on 20 year old hardware + # Minimum delay is 0.1 seconds which is an absolute max of 10 QPS + self.request_qps = 3 + + self.request_error_retries = 6 + self.request_error_backoff_secs = 2 + + def scrape(self): + count = 0 + delay_secs = 1 / self.request_qps + + for parcel in self.parcels: + count =+ 1 + + url = self._scrape_url(parcel) + path = PATH + parcel.html_file_path + + # create directory + # check if file exists + + request_tries = 0 + start_time = time.time() + + while True: + try: + request_tries += 1 + resp = self._req_make_request(url) + + # Request was successful + break + except: + # Catches network failures + time.sleep(pow(self.request_error_backoff_secs, request_tries)) + pass + + + if self._req_is_success(resp): + with gzip.open(path, 'wt') as f_out: + f_out.write(resp.text) + + time.sleep(max(delay_secs - (time.time() - start_time), 0.1)) + + + def _scrape_url(self, parcel): + return self.url_tpl.format(apn=parcel.apn) + + def _req_make_request(self, url): + if self.request_type == 'GET': + return requests.get(url, **self.request_params) + else: + return requests.post(url, **self.request_params) + + def _req_is_success(self, response): + return response.status_code == 200 From ccc8c13882161bf1d8a4bf89f8707648b3e1f46e Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Tue, 3 Nov 2020 19:28:57 -0800 Subject: [PATCH 2/7] cptlib has shared libraries for scraping and parsing --- .gitignore | 1 + cptlib/README | 76 +++++++++++++++++ cptlib/parcels.py | 205 +++++++++++++++++++++++++++++++++++++-------- cptlib/parsers.py | 111 ++++++++++++++++-------- cptlib/scrapers.py | 95 ++++++++++++++++++--- 5 files changed, 410 
insertions(+), 78 deletions(-)
 create mode 100644 cptlib/README

diff --git a/.gitignore b/.gitignore
index 52ac9dd..7dc422e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 *.geojson
 *.geojson.gz
 scrapers/*/scrape_output/*.html
+scrapers/*/data/
 venv
 
 ### Allow example data ###
diff --git a/cptlib/README b/cptlib/README
new file mode 100644
index 0000000..da05700
--- /dev/null
+++ b/cptlib/README
@@ -0,0 +1,76 @@
+These classes should provide a lot of the heavy lifting to scrape and parse
+online property tax data with a minimum amount of code.
+
+See [placer/scraper.py](../scrapers/placer/scraper.py) and
+[placer/parser.py](../scrapers/placer/parser.py) for an example on how to use
+these.
+
+1. **Configure and instantiate the `Parcels` iterator**
+Every scraper and parser will need to iterate through a list of Parcels, which
+is often loaded from a CSV or a shapefile. The `ParcelsCSV` and
+`ParcelsShapefile` class instances will provide the scraper and parser with
+`Parcel` objects which will be duly scraped or parsed.
+The `Parcels` subclasses are configurable by passing, e.g., the key for the APN
+field and the CSV or Shapefile path. Gettng the lat/long from the CSV or
+shapefile can be tricky because of the multiple formats that this may take so
+for the centroid value you need to pass a function which will take the *row*
+and return a centroid tuple. There are two functions (`centroidfn_from_latlng`
+and `centroidfn_from_shape`) which will create the necessary functions for you.
+Creating a `Parcels` instance looks like this:
+
+```
+parcels_gen = parcels.ParcelsShapefile('PL', 'APN', 'ADR1',
+                                       parcels.centroidfn_from_shape(),
+                                       os.path.join(DATA_DIR, 'Parcels.shp'))
+parcels_gen.valid_apn_pattern = r'^\d{3}-\d{3}-\d{3}-\d{3}$'
+```
+
+In the above example we're:
+* Loading a shapefile named `data/Parcels.shp`
+* for Placer county
+* where the APN is in the record with key `APN` and address has key `ADR1`
+* and the location data exists as a polygon in the record, so we create a function
+which will read it from the `points` key (the default) and return the centroid
+* and APNs must match a particular regexp or be considered invalid and skipped
+
+2. **Instantiate a `Scraper` with the `Parcels` instance**
+The scraper will loop through each `Parcel` generated by the `Parcels` instance,
+make a web request, and save the HTML to the data directory. You provide the
+data directory path and a URL template for the web request. There are a number
+of properties which can customize the web request behavior.
+
+Creating a `Scraper` and running it looks like this:
+
+```
+scraper = scrapers.Scraper(parcels_gen, DATA_DIR,
+    'https://common3.mptsweb.com/MBC/placer/tax/main/{apn_clean}/2020/0000')
+scraper.request_unsuccessful_string = '<title>ERROR</title>'
+
+scraper.scrape()
+```
+
+In the above example we're:
+* Creating a `Scraper` using the `ParcelsShapefile` iterator we just created
+* which will write to a directory structure in `data/`
+* and request from a Placer-county website with the APN loaded into it
+* and consider any responses with the HTML title 'ERROR' to be invalid
+
+3. **Instantiate a `Parser`**
+The parser will loop through each `Parcel` generated by the `Parcels` instance,
+look for the local HTML file, parse it for tax info, and -- if found -- write
+the `Parcel` information to the `output.csv` file.
+
+Unlike `Scraper`s, parsing requires custom code so `Parser` is meant to be
+overridden with a custom parsing function.
However, the parent `Parser` handles +the looping and saving. The appropriate `Parser` subclass may already exist +for the flavor of HTML which you need to parse -- look in the [parsers.py] file. +For example, `ParserMegabyte` will parse mptsweb.com pages. If one doesn't exist +then you need to override the `_parse_html()` function, parse the `html` argument +and update the `parcel` instance. + +Creating a `ParserMegabyte` instance and running it looks like this: + +``` +parser = parsers.ParserMegabyte(parcels_gen, DATA_DIR) +parser.parse() +``` diff --git a/cptlib/parcels.py b/cptlib/parcels.py index 32ee6e7..90f5ba8 100644 --- a/cptlib/parcels.py +++ b/cptlib/parcels.py @@ -1,34 +1,62 @@ -def centroid_from_latlng(lat_field_name, lng_field_name): - def centroid_fn(record): - return (record[lat_field_name], record[lng_field_name]) +import csv +import os +import re - return centroid_fn +import shapefile +from shapely.geometry import Polygon +def centroidfn_from_latlng(lat_field_name, lng_field_name): + """Return a centroid function to get lat/long values from a record. + The function returned will extract the lat/long values (using the keys). -def centroid_from_shape(shape_field = 'points'): + Args: + lat_field_name (str): Key (field) name for latitude field + lng_field_name (str): Key (field) name for latitude field + """ def centroid_fn(record): - list(Polygon(coords).centroid.coords)[0] - poly = Poly(record[shape_field]) - return (poly.lat, poly.lng) + return (record[lat_field_name], record[lng_field_name]) return centroid_fn +def centroidfn_from_shape(shape_field = 'points'): + """Return a centroid function to get a polygon's centroid from a record. + The function returned will extract the polygon (using the key) and then + calculate a centroid. -import os -import re + Args: + shape_field (str, optional): Key (field) name for points list. Should be + 'points', which is pulled from the shape file and added to the record. + Defaults to 'points'. 
+ """ + def centroid_fn(record): + points = record[shape_field] + if not points: + return None -class Parcel: + centroid = list(Polygon(points).centroid.coords)[0] + return (centroid[0], centroid[1]) + + return centroid_fn + +class Parcel(): + """Represents a single parcel + """ def __init__(self, address, county_code, apn, centroid = None, tax = None): self.address = address self.county_code = county_code - self.apn = apn + self.apn = apn.strip() self.centroid = centroid self.tax = tax @property def csv_row(self): + """Generate a dict representation suitable for being written to CSV + + Returns: + dict: Parcel as dict + """ return { 'address': self.address, 'apn': self.apn, @@ -40,52 +68,152 @@ def csv_row(self): @property def html_file_path(self): - apn = re.sub(r'[^A-Za-z0-9]', self.apn, '') - return os.path.join('data', apn[0:3], apn[3:6], '{}.html.gz'.format(apn)) + """Generate a relative file path for the parcel's HTML file -import os + Returns: + str: Relative file path and name + """ + apn = self.apn_clean + + # Lots of files in a single directory can cause performance to suffer + # Create a multi-level directory structure based on APN values + #return os.path.join(apn[0:3], apn[3:6], '{}.htm.gz'.format(apn)) + return os.path.join(apn[0:3], '{}.html.gz'.format(apn)) + + @property + def apn_clean(self): + """Generate a cleaned APN string (only alphanumeric) + + Returns: + str: Cleaned APN string + """ + return re.sub(r'[^A-Za-z0-9]', '', self.apn) class Parcels(): + """Abstract class to generate a Parcels based on a file + """ def __init__(self, county_code, apn_column, address_column, centroid_fn): + """Create a Parcels iterator. + + Args: + county_code (str): County code + apn_column (str): Field key for the APN column + address_column (str): Field key for the address column + centroid_fn (callable): Function (incl lambda) which will get the row + (as a dict) and return the centroid as `(lat, lng)` + """ self.county_code = county_code self.apn_column = apn_column self.address_column = address_column self.centroid_fn = centroid_fn + self.valid_apn_pattern = None + def _get_address(self, row): + """Return address given row and the address_column value + + Override me if getting the address is more complex + + Args: + row (dict): Row + + Returns: + str: Address + """ return row[self.address_column] + def _record_is_valid_parcel(self, row): + """Check if the row/record is valid + + In some cases the record doesn't have APN and/or geo info, which + isn't of use to us, and can cause problems. + + This method can be overridden, but should still be called. + + Args: + row (dict): Record + + Returns: + bool: True if record is a valid parcel and should be scraped / parsed + """ + return (row[self.apn_column] + and bool(not self.valid_apn_pattern + or re.search(self.valid_apn_pattern, row[self.apn_column]))) + def _make_parcel(self, row): + """Return a Parcel from the row and helper methods + + Args: + row (dict): Row + + Returns: + Parcel: Parcel + """ return Parcel(self._get_address(row), self.county_code, row[self.apn_column], self.centroid_fn(row)) def __iter__(self): + """I'm an interator + """ return self -import csv - class ParcelsCSV(Parcels): - def __init__(county_code, apn_column, address_column, centroid_fn, + """Class which generates Parcels from a CSV file. 
+ """ + def __init__(self, county_code, apn_column, address_column, centroid_fn, csv_file_path): - super().__init__(self, county_code, apn_column, address_column, centroid_fn) + """Create a Parcels iterator which loops through a CSV file. + + Args: + county_code (str): County code + apn_column (str): Field key for the APN column + address_column (str): Field key for the address column + centroid_fn (callable): Function (incl lambda) which will get the row + (as a dict) and return the centroid as `(lat, lng)` + csv_file_path (str): CSV file path + """ + super().__init__( county_code, apn_column, address_column, centroid_fn) - self.csv_reader = dictreader(csv_file_path) + self.csv_file = open(csv_file_path, encoding='utf-8-sig') + self.csv_reader = csv.DictReader(self.csv_file) def __next__(self): - row = next(self.csv_reader) + while True: + row = next(self.csv_reader) - return self._make_parcel(row) + if self._record_is_valid_parcel(row): + # If not a valid parcel then keep iterating until we get one + return self._make_parcel(row) + print('-> Skipping invalid record') -import shapefile -class ParcelsShapeFile(Parcels): - def __init__(county_code, apn_column, address_column, centroid_fn, +"""Generate Parcels from a Shapefile. + +Pass the APN column key, address column key, a function which returns a +centroid, and the path to the CSV file. + +Records must have a polygon shape. +""" +class ParcelsShapefile(Parcels): + """Class which generates Parcels from a Shapefile + """ + def __init__(self, county_code, apn_column, address_column, centroid_fn, shape_file_path): - super().__init__(self, county_code, apn_column, address_column, centroid_fn) + """Create a Parcels iterator which loops through a CSV file. + + Args: + county_code (str): County code + apn_column (str): Field key for the APN column + address_column (str): Field key for the address column + centroid_fn (callable): Function (incl lambda) which will get the row + (as a dict) and return the centroid as `(lat, lng)` + shape_file_path (str): Shapefile path + """ + super().__init__(county_code, apn_column, address_column, centroid_fn) self.sf = shapefile.Reader(shape_file_path) self.idx = 0 @@ -94,22 +222,33 @@ def __init__(county_code, apn_column, address_column, centroid_fn, assert self.sf.shapeType == shapefile.POLYGON def __next__(self): - if self.idx < len(self.sf): + while self.idx < len(self.sf): record = self.sf.shapeRecord(self.idx) self.idx += 1 + # Create a dict from the record and add the polygon points to the dict + # with the key 'points' dct = record.record.as_dict() dct['points'] = record.shape.points - return self._make_parcel(dct) - - raise StopIteration - row = next(self.csv_reader) - - return self._make_parcel(row) + if self._record_is_valid_parcel(dct): + # If not a valid parcel then keep iterating until we get one + return self._make_parcel(dct) + print('-> Skipping invalid record') + raise StopIteration + def _record_is_valid_parcel(self, row): + """Check if the shapefile record (as a dict) is valid + In some cases the record doesn't have polygon points + Args: + row (dict): Record + Returns: + bool: True if record is a valid parcel and should be scraped / parsed + """ + return bool(row['points'] + and super()._record_is_valid_parcel(row)) diff --git a/cptlib/parsers.py b/cptlib/parsers.py index 643847a..b12b31f 100644 --- a/cptlib/parsers.py +++ b/cptlib/parsers.py @@ -1,60 +1,105 @@ -def Parser(): +import csv +import gzip +import os + +from bs4 import BeautifulSoup + + +class Parser(): + """Abstract Parser 
class. + + Should be overridden to implement specific tax amount parsing routine + """ def __init__(self, parcels_generator, data_dir): + """Generate a Parser instance + + Args: + parcels_generator (Parcels): Parcels iterator + data_dir (str): Directory to read HTML files from and write output CSV + """ self.parcels = parcels_generator self.data_dir = data_dir - fieldnames = ['address', 'apn', 'longitude', 'latitude', 'tax', 'county'] - self.csv_writer = csv.DictWriter(f_out, fieldnames=fieldnames) + csv_file_path = os.path.join(data_dir, 'output.csv') + self.csv_file = open(csv_file_path, 'w') + csv_fieldnames = ['address', 'apn', 'longitude', 'latitude', 'tax', + 'county'] + self.csv_writer = csv.DictWriter(self.csv_file, fieldnames=csv_fieldnames) def parse(self): + """Execute the parser. Loop through Parcels and parse local HTML files. + """ count = 0 for parcel in self.parcels: count += 1 - html_path = self.data_dir + parcel.html_file_path + path = os.path.join(self.data_dir, parcel.html_file_path) + + print(count) + + try: + with gzip.open(path, 'rt') as f_in: + html = f_in.read() + except FileNotFoundError: + print('-> HTML file not found') + continue - with gzip.open(html_path): - if parse_html(parcel, html): - self.csv_writer.write_row(parcel.csv_row) - else: - pass + if self._parse_html(parcel, html): + self.csv_writer.writerow(parcel.csv_row) + if count % 500 == 0: + # Flush to filesystem every 500 rows + self.csv_file.flush() - def _parse_html(self, html): + continue + + print('-> Could not parse file') + + def _parse_html(self, parcel, html): + """Should be overridden with specific parsing logic + """ raise NotImplementedError class ParserMegabyte(Parser): - def _parse_html(self, html): + """Parser class that parses property tax pages hosted by + Megabyte (mptsweb.com) + """ + def _parse_html(self, parcel, html): + """Parse HTML from Megabyte and update the Parcel with tax amount + + Args: + parcel (Parcel): Parcel associated with HTML text + html (str): Property tax page HTML + + Returns: + bool: True if parsing was successful + """ soup = BeautifulSoup(html, 'html.parser') #extract payment info - tab1 = soup.find('div', {'id':'h2tab1'}) + tab1 = soup.find('div', {'id': 'h2tab1'}) total_tax = -1 - if tab1 != None: - bills = tab1.find_all('dt',text='Total Due') - if len(bills) == 3: - #grab the total annual payment, not the 6-month one - #no need to double value later on - total_tax_str = bills[2].findNext('dd').string.replace('$', '').replace(',', '') - try: - total_tax = float(total_tax_str) - except: - print('--> Could not parse float', amount_str) - else: - print("bad tax records on parcel ",apn) + if tab1 != None: + bills = tab1.find_all('dt', text='Total Due') - else: - print(apn,"Tax data not available.") + if len(bills) == 3: + #grab the total annual payment, not the 6-month one + #no need to double value later on + total_tax_str = bills[2].findNext('dd').string\ + .replace('$', '').replace(',', '') + try: + total_tax = float(total_tax_str) + # set tax amount on parcel + parcel.tax = round(total_tax, 2) + return True + except: + print('--> Could not parse float') + else: + print("--> bad tax records on parcel") - #extract address - tab2 = soup.find('div', {'id':'h2tab2'}) - if tab2 is not None: - address = tab2.find('dt',text='Address').findNext('dd').string - if address is None: - address = "UNKNOWN" - print(address,total_tax) + return False diff --git a/cptlib/scrapers.py b/cptlib/scrapers.py index cafe0a4..7354c9b 100644 --- a/cptlib/scrapers.py +++ 
b/cptlib/scrapers.py @@ -1,33 +1,65 @@ +import gzip +import os import time +import requests + class Scraper(): + """Scraper class + + This is fairly configurable and probably won't need to be overridden. + """ def __init__(self, parcels_generator, data_dir, url_tpl): + """Generate a scraper instance + + Args: + parcels_generator (Parcels): Parcels iterator + data_dir (str): Directory to write HTML files to + url_tpl (str): URL template. {apn_clean} is replaced at runtime + """ self.parcels = parcels_generator self.data_dir = data_dir self.url_tpl = url_tpl - + # You can change these properties to configure behavior of requests self.request_type = 'GET' - self.request_params = {} + self.request_params = {'headers': + {'User-Agent': ('CA Property Tax Scraper ' + '(https://github.com/typpo/ca-property-tax)')}} + # Be kind to the servers running on 20 year old hardware # Minimum delay is 0.1 seconds which is an absolute max of 10 QPS self.request_qps = 3 self.request_error_retries = 6 + # Exponential backoff starting with this number of seconds self.request_error_backoff_secs = 2 + self.request_unsuccessful_string = None + def scrape(self): + """Execute the scraper. Loop through Parcels and download HTML files. + """ count = 0 delay_secs = 1 / self.request_qps for parcel in self.parcels: - count =+ 1 + count += 1 + + url = self._get_scrape_url(parcel) + path = os.path.join(self.data_dir, parcel.html_file_path) + print(count, parcel.apn, path) - url = self._scrape_url(parcel) - path = PATH + parcel.html_file_path + # Check if the file already exists + if os.path.exists(path): + print('-> File exists. Skipping') + continue - # create directory - # check if file exists + # create the directory + try: + os.mkdir(os.path.dirname(path)) + except FileExistsError: + pass request_tries = 0 start_time = time.time() @@ -39,27 +71,66 @@ def scrape(self): # Request was successful break - except: + except (requests.ConnectionError, requests.Timeout) as exc: # Catches network failures + if request_tries >= self.request_error_retries: + print('Reached max number of retries') + raise exc + time.sleep(pow(self.request_error_backoff_secs, request_tries)) pass - if self._req_is_success(resp): with gzip.open(path, 'wt') as f_out: f_out.write(resp.text) + else: + print('-> Request not successful: {}'.format(resp.status_code)) + time.sleep(max(delay_secs - (time.time() - start_time), 0.1)) + def _get_scrape_url(self, parcel): + """Generate the URL to scrape based on the URL template and Parcel + + Override this if URL generation is more complex. + + Args: + parcel (Parcel): Current Parcel + + Returns: + str: Request URL + """ + return self.url_tpl.format(apn_clean=parcel.apn_clean) - def _scrape_url(self, parcel): - return self.url_tpl.format(apn=parcel.apn) def _req_make_request(self, url): + """Make the request given the URL. Uses self.request_type and + self.request_params. + + Override this if request is more complex. + + Args: + url (str): URL + + Returns: + Response: Response object + """ if self.request_type == 'GET': return requests.get(url, **self.request_params) else: return requests.post(url, **self.request_params) def _req_is_success(self, response): - return response.status_code == 200 + """Test if request was successful. + + Override this if it makes sense to check more than response code. 
+ + Args: + response (Response): Response object + + Returns: + bool: True if request was "successful" + """ + return (response.status_code == 200 and + (not self.request_unsuccessful_string + or self.request_unsuccessful_string not in response.text)) From b38cce0ff9d6dc24587522a5c3074990093806cc Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Tue, 3 Nov 2020 19:30:13 -0800 Subject: [PATCH 3/7] Added .md extension to README --- cptlib/{README => README.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cptlib/{README => README.md} (100%) diff --git a/cptlib/README b/cptlib/README.md similarity index 100% rename from cptlib/README rename to cptlib/README.md From d0679c4f256854a49f9b46d872cad160af786100 Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Tue, 3 Nov 2020 19:31:58 -0800 Subject: [PATCH 4/7] Update README.md --- cptlib/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cptlib/README.md b/cptlib/README.md index da05700..a073f69 100644 --- a/cptlib/README.md +++ b/cptlib/README.md @@ -6,10 +6,12 @@ See [placer/scraper.py](../scrapers/placer/scraper.py) and these. 1. **Configure and instantiate the `Parcels` iterator** + Every scraper and parser will need to iterate through a list of Parcels, which is often loaded from a CSV or a shapefile. The `ParcelsCSV` and `ParcelsShapefile` class instances will provide the scraper and parser with `Parcel` objects which will be duly scraped or parsed. + The `Parcels` subclasses are configurable by passing, e.g., the key for the APN field and the CSV or Shapefile path. Gettng the lat/long from the CSV or shapefile can be tricky because of the multiple formats that this may take so @@ -34,6 +36,7 @@ which will read it from the `points` key (the default) and return the centroid * and APNs must match a particular regexp or be considered invalid and skipped 2. **Instantiate a `Scraper` with the `Parcels` instance** + The scraper will loop through each `Parcel` generated by the `Parcels` instance, make a web request, and save the HTML to the data directory. You provide the data directory path and a URL template for the web request. There are a number @@ -56,6 +59,7 @@ In the above example we're: * and consider any responses with the HTML title 'ERROR' to be invalid 3. **Instantiate a `Parser`** + The parser will loop through each `Parcel` generated by the `Parcels` instance, look for the local HTML file, parse it for tax info, and -- if found -- write the `Parcel` information to the `output.csv` file. 
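The README's third step says that a county whose pages aren't handled by an existing parser (such as `ParserMegabyte` for mptsweb.com-hosted pages) needs a `Parser` subclass that overrides `_parse_html()`. A minimal sketch of that contract is below; the county name and the `totalDue` element id are hypothetical and not part of cptlib, while the `(parcel, html)` signature, setting `parcel.tax`, and returning `True` on success follow the `Parser` class added in this patch series.

```
from bs4 import BeautifulSoup

from cptlib import parsers


class ParserExampleCounty(parsers.Parser):
  """Hypothetical parser for a county whose pages aren't Megabyte-hosted."""

  def _parse_html(self, parcel, html):
    # Receives the Parcel and the decoded HTML of its saved page; returns True
    # only when a tax amount was found, so Parser.parse() writes the CSV row.
    soup = BeautifulSoup(html, 'html.parser')

    # 'totalDue' is an invented element id -- inspect the county's real markup.
    amount_el = soup.find('span', {'id': 'totalDue'})
    if amount_el is None or not amount_el.string:
      return False

    try:
      parcel.tax = round(
          float(amount_el.string.replace('$', '').replace(',', '')), 2)
    except ValueError:
      return False

    return True
```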
From 7bdbb73aa4ec085a2e8b1e2b3be8d92ff129a228 Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Wed, 4 Nov 2020 12:25:12 -0800 Subject: [PATCH 5/7] cptlib updates * Support parsed files without addresses * Scraper threading for higher throuput --- cptlib/parcels.py | 7 ++-- cptlib/parsers.py | 9 ++-- cptlib/scrapers.py | 101 +++++++++++++++++++++++++++++---------------- 3 files changed, 71 insertions(+), 46 deletions(-) diff --git a/cptlib/parcels.py b/cptlib/parcels.py index 90f5ba8..f29c51b 100644 --- a/cptlib/parcels.py +++ b/cptlib/parcels.py @@ -77,8 +77,7 @@ def html_file_path(self): # Lots of files in a single directory can cause performance to suffer # Create a multi-level directory structure based on APN values - #return os.path.join(apn[0:3], apn[3:6], '{}.htm.gz'.format(apn)) - return os.path.join(apn[0:3], '{}.html.gz'.format(apn)) + return os.path.join(apn[0:3], apn[3:6], '{}.htm.gz'.format(apn)) @property def apn_clean(self): @@ -120,7 +119,7 @@ def _get_address(self, row): Returns: str: Address """ - return row[self.address_column] + return row[self.address_column] if self.address_column else None def _record_is_valid_parcel(self, row): """Check if the row/record is valid @@ -174,7 +173,7 @@ def __init__(self, county_code, apn_column, address_column, centroid_fn, (as a dict) and return the centroid as `(lat, lng)` csv_file_path (str): CSV file path """ - super().__init__( county_code, apn_column, address_column, centroid_fn) + super().__init__(county_code, apn_column, address_column, centroid_fn) self.csv_file = open(csv_file_path, encoding='utf-8-sig') self.csv_reader = csv.DictReader(self.csv_file) diff --git a/cptlib/parsers.py b/cptlib/parsers.py index b12b31f..fddf28b 100644 --- a/cptlib/parsers.py +++ b/cptlib/parsers.py @@ -33,16 +33,15 @@ def parse(self): for parcel in self.parcels: count += 1 - path = os.path.join(self.data_dir, parcel.html_file_path) - print(count) + print(count, path) try: with gzip.open(path, 'rt') as f_in: html = f_in.read() except FileNotFoundError: - print('-> HTML file not found') + print(count, '-> HTML file not found') continue if self._parse_html(parcel, html): @@ -54,7 +53,7 @@ def parse(self): continue - print('-> Could not parse file') + print(count, '-> Could not parse file') def _parse_html(self, parcel, html): """Should be overridden with specific parsing logic @@ -100,6 +99,4 @@ def _parse_html(self, parcel, html): else: print("--> bad tax records on parcel") - - return False diff --git a/cptlib/scrapers.py b/cptlib/scrapers.py index 7354c9b..a15f130 100644 --- a/cptlib/scrapers.py +++ b/cptlib/scrapers.py @@ -1,3 +1,5 @@ +from collections import deque +import concurrent.futures import gzip import os import time @@ -29,7 +31,8 @@ def __init__(self, parcels_generator, data_dir, url_tpl): # Be kind to the servers running on 20 year old hardware # Minimum delay is 0.1 seconds which is an absolute max of 10 QPS - self.request_qps = 3 + self.request_concurrency = 5 + self.request_avg_qps = 2.5 self.request_error_retries = 6 # Exponential backoff starting with this number of seconds @@ -41,41 +44,67 @@ def scrape(self): """Execute the scraper. Loop through Parcels and download HTML files. """ count = 0 - delay_secs = 1 / self.request_qps - - for parcel in self.parcels: - count += 1 - - url = self._get_scrape_url(parcel) - path = os.path.join(self.data_dir, parcel.html_file_path) - print(count, parcel.apn, path) - - # Check if the file already exists - if os.path.exists(path): - print('-> File exists. 
Skipping') - continue - - # create the directory - try: - os.mkdir(os.path.dirname(path)) - except FileExistsError: - pass - - request_tries = 0 - start_time = time.time() - - while True: - try: - request_tries += 1 - resp = self._req_make_request(url) - - # Request was successful - break - except (requests.ConnectionError, requests.Timeout) as exc: - # Catches network failures - if request_tries >= self.request_error_retries: - print('Reached max number of retries') - raise exc + + futures = set() + concur = self.request_concurrency + start_times = deque(maxlen=concur) + expected_time = concur / max(self.request_avg_qps, 15) + + with concurrent.futures.ThreadPoolExecutor(max_workers=concur) as executor: + for parcel in self.parcels: + if len(futures) > concur: + # wait for at least one thread to be completed. second item is a + # set of not_done threads, so should return with a size of MAX - 1 + (done, futures) = concurrent.futures.wait(futures, + return_when=concurrent.futures.FIRST_COMPLETED) + + for future in done: + # This will raise any exception that the called function did + future.result() + + # rough approximation of how long it has taken for the [#] of tasks + # to complete + elapsed_time = time.time() - start_times[0] + time.sleep(max(expected_time - elapsed_time, 0)) + + count += 1 + + # There's a lot of overhead to starting a thread and then waiting for + # completion so looping through already-created files is extremely + # slow. Figure out the path and check for existence of the file + # before launching the thread. + path = os.path.join(self.data_dir, parcel.html_file_path) + + print(count, parcel.apn, path) + + # Check if the file already exists + if os.path.exists(path): + print(count, '-> File exists. Skipping') + continue + + futures.add(executor.submit(self._execute_scrape, count, parcel, path)) + start_times.append(time.time()) + + def _execute_scrape(self, count, parcel, path): + """Do the scraping with retries and backoff + + Args: + count (int): Iteration number (for logging) + parcel (Parcel): Parcel we're scraping + path (str): Path to write HTML file to + + Raises: + exc: Connection or Timeout exception from requests + """ + url = self._get_scrape_url(parcel) + + # create the directory(s) + try: + os.makedirs(os.path.dirname(path)) + except FileExistsError: + pass + + request_tries = 0 time.sleep(pow(self.request_error_backoff_secs, request_tries)) pass From f520ee573fb393a60c7b68d415291f5860d476ea Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Wed, 4 Nov 2020 13:29:54 -0800 Subject: [PATCH 6/7] FIxex * Mixed up lat/long based on x/y from centroid * Write CSV header --- cptlib/parcels.py | 5 +++-- cptlib/parsers.py | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cptlib/parcels.py b/cptlib/parcels.py index f29c51b..72b4c61 100644 --- a/cptlib/parcels.py +++ b/cptlib/parcels.py @@ -34,8 +34,9 @@ def centroid_fn(record): if not points: return None - centroid = list(Polygon(points).centroid.coords)[0] - return (centroid[0], centroid[1]) + centroid_xy = list(Polygon(points).centroid.coords)[0] + # latitude is y, longitude is x + return (centroid_xy[1], centroid_xy[0]) return centroid_fn diff --git a/cptlib/parsers.py b/cptlib/parsers.py index fddf28b..bfa7c01 100644 --- a/cptlib/parsers.py +++ b/cptlib/parsers.py @@ -25,6 +25,7 @@ def __init__(self, parcels_generator, data_dir): csv_fieldnames = ['address', 'apn', 'longitude', 'latitude', 'tax', 'county'] self.csv_writer = csv.DictWriter(self.csv_file, 
fieldnames=csv_fieldnames) + self.csv_writer.writeheader() def parse(self): """Execute the parser. Loop through Parcels and parse local HTML files. From decab813aff7672eeca9d46e9ad0413af94d91a2 Mon Sep 17 00:00:00 2001 From: James Shannon <james@jamesshannon.com> Date: Fri, 6 Nov 2020 09:41:19 -0800 Subject: [PATCH 7/7] Stuff * Added length property to each Parcels generator * Added % status to parser method * Added test_limit to parser method for testing --- cptlib/parcels.py | 16 +++++++++++++++- cptlib/parsers.py | 17 ++++++++++++++++- cptlib/scrapers.py | 4 +++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/cptlib/parcels.py b/cptlib/parcels.py index 72b4c61..9e60319 100644 --- a/cptlib/parcels.py +++ b/cptlib/parcels.py @@ -107,6 +107,8 @@ def __init__(self, county_code, apn_column, address_column, centroid_fn): self.address_column = address_column self.centroid_fn = centroid_fn + self.length = 0 + self.valid_apn_pattern = None def _get_address(self, row): @@ -153,10 +155,16 @@ def _make_parcel(self, row): row[self.apn_column], self.centroid_fn(row)) def __iter__(self): - """I'm an interator + """I'm an interator! """ return self + def __next__(self): + raise NotImplementedError + + def __len__(self): + return self.length + class ParcelsCSV(Parcels): @@ -177,6 +185,11 @@ def __init__(self, county_code, apn_column, address_column, centroid_fn, super().__init__(county_code, apn_column, address_column, centroid_fn) self.csv_file = open(csv_file_path, encoding='utf-8-sig') + # length is # of rows minus header row + self.length = sum(1 for line in self.csv_file) - 1 + + # reset the file before creating dictreader + self.csv_file.seek(0) self.csv_reader = csv.DictReader(self.csv_file) def __next__(self): @@ -216,6 +229,7 @@ def __init__(self, county_code, apn_column, address_column, centroid_fn, super().__init__(county_code, apn_column, address_column, centroid_fn) self.sf = shapefile.Reader(shape_file_path) + self.length = len(self.sf) self.idx = 0 # we only know how to deal with polygons diff --git a/cptlib/parsers.py b/cptlib/parsers.py index bfa7c01..6870806 100644 --- a/cptlib/parsers.py +++ b/cptlib/parsers.py @@ -20,6 +20,9 @@ def __init__(self, parcels_generator, data_dir): self.parcels = parcels_generator self.data_dir = data_dir + # set this to a value to test parsing on only x files + self.test_limit = None + csv_file_path = os.path.join(data_dir, 'output.csv') self.csv_file = open(csv_file_path, 'w') csv_fieldnames = ['address', 'apn', 'longitude', 'latitude', 'tax', @@ -32,11 +35,19 @@ def parse(self): """ count = 0 + print('Scraping {} parcels'.format(len(self.parcels))) + for parcel in self.parcels: count += 1 + + # Break out of loop after a specific number for testing purposes + if self.test_limit and count > self.test_limit: + print('*** Exiting after test limit of {}'.format(self.test_limit)) + break + path = os.path.join(self.data_dir, parcel.html_file_path) - print(count, path) + #print(count, path) try: with gzip.open(path, 'rt') as f_in: @@ -45,6 +56,9 @@ def parse(self): print(count, '-> HTML file not found') continue + if count % 500 == 0: + print('Parsed {} records ({:.0%})'.format(count, count / len(self.parcels))) + if self._parse_html(parcel, html): self.csv_writer.writerow(parcel.csv_row) @@ -56,6 +70,7 @@ def parse(self): print(count, '-> Could not parse file') + def _parse_html(self, parcel, html): """Should be overridden with specific parsing logic """ diff --git a/cptlib/scrapers.py b/cptlib/scrapers.py index a15f130..7867656 100644 --- 
a/cptlib/scrapers.py +++ b/cptlib/scrapers.py @@ -50,6 +50,8 @@ def scrape(self): start_times = deque(maxlen=concur) expected_time = concur / max(self.request_avg_qps, 15) + print('Scraping {} parcels'.format(len(self.parcels))) + with concurrent.futures.ThreadPoolExecutor(max_workers=concur) as executor: for parcel in self.parcels: if len(futures) > concur: @@ -75,7 +77,7 @@ def scrape(self): # before launching the thread. path = os.path.join(self.data_dir, parcel.html_file_path) - print(count, parcel.apn, path) + print(count, parcel.apn) # Check if the file already exists if os.path.exists(path):
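Taken together, the three steps from the cptlib README line up into a short driver script. The sketch below is only an assumed wiring for a CSV-backed county: the county code, column names, file paths, and URL template are placeholders, and `ParserMegabyte` is appropriate only if the county's pages really are hosted on mptsweb.com.

```
import os

from cptlib import parcels, parsers, scrapers

DATA_DIR = 'data/'  # hypothetical data directory


def make_parcels():
  # Parcels generators are single-pass iterators, so the scrape pass and the
  # parse pass each get their own instance.
  return parcels.ParcelsCSV(
      'XX',             # hypothetical county code
      'APN',            # hypothetical APN column
      'SITUS_ADDRESS',  # hypothetical address column
      parcels.centroidfn_from_latlng('LATITUDE', 'LONGITUDE'),
      os.path.join(DATA_DIR, 'parcels.csv'))


# Step 2: download each parcel's tax page into DATA_DIR
scraper = scrapers.Scraper(make_parcels(), DATA_DIR,
    'https://example.invalid/tax/{apn_clean}')  # placeholder URL template
scraper.scrape()

# Step 3: parse the saved HTML and write DATA_DIR/output.csv
# (swap in a custom Parser subclass if the pages aren't Megabyte-hosted)
parser = parsers.ParserMegabyte(make_parcels(), DATA_DIR)
parser.parse()
```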