"""
Many thanks to: https://www.scrapingbee.com/blog/web-scraping-twitter/
With minor adjustments - ProgrammingIncluded
"""
import re
import os
import json
import argparse
import shutil
import time

from random import randint
from dataclasses import dataclass

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import WebDriverException

SCRAPE_N_TWEETS = 20
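
# Hedged sketch, not part of the original script: the error message further down mentions a
# headless toggle, so a headless run would presumably build the driver with Chrome options
# instead of the bare constructor below, e.g.:
#   options = webdriver.ChromeOptions()
#   options.add_argument("--headless=new")
#   driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)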
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

@dataclass(init=True, repr=True, unsafe_hash=True)
class Tweet:
    id: str
    tag_text: str
    name: str
    tweet_text: str
    retweet_count: str
    handle: str
    timestamp: str
    like_count: str
    reply_count: str
    potential_boost: bool

def ensures_or(f, otherwise="NULL"):
    """Call f() and return its result, falling back to `otherwise` if it raises."""
    try:
        return f()
    except Exception as e:
        print("Could not obtain value, using {} instead. Error: {}".format(otherwise, str(e)))

    return otherwise

def remove_elements(driver, elements):
    """Delete the DOM nodes whose data-testid matches any of the given names (e.g. overlays that would cover screenshots)."""
    elements = ["'{}'".format(v) for v in elements]
    driver.execute_script("""
    const values = [{}];
    for (let i = 0; i < values.length; ++i) {{
        var element = document.querySelector(`[data-testid='${{values[i]}}']`);
        if (element)
            element.parentNode.removeChild(element);
    }}
    """.format(",".join(elements)))

def fetch_html(url, fpath, force=False, number_posts_to_cap=SCRAPE_N_TWEETS, bio_only=False):
    if not force and os.path.exists(fpath):
        return
    elif force:
        # ignore_errors so a forced run does not fail when the folder does not exist yet
        shutil.rmtree(fpath, ignore_errors=True)

    os.makedirs(fpath)

    driver.get(url)
    state = ""
    while state != "complete":
        print("Page loading not complete, waiting...")
        time.sleep(randint(3, 5))
        state = driver.execute_script("return document.readyState")

    try:
        WebDriverWait(driver, 10).until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, '[data-testid="tweet"]')))
    except WebDriverException:
        print("Tweets did not appear! Try setting headless=False to see what is happening.")

    # Click the element at this absolute XPath; this is layout-dependent and may break when the page markup changes.
    driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div[2]/div[1]").click()

    # Delete the bottom bar so it does not cover the tweets
    remove_elements(driver, ["BottomBar"])

    metadata = {}
    metadata["bio"] = ensures_or(lambda: driver.find_element(By.CSS_SELECTOR, 'div[data-testid="UserDescription"]').text)
    metadata["name"], metadata["username"] = ensures_or(lambda: driver.find_element(By.CSS_SELECTOR, 'div[data-testid="UserName"]').text.split('\n'), ("NULL", "NULL"))
    metadata["location"] = ensures_or(lambda: driver.find_element(By.CSS_SELECTOR, 'span[data-testid="UserLocation"]').text)
    metadata["website"] = ensures_or(lambda: driver.find_element(By.CSS_SELECTOR, 'a[data-testid="UserUrl"]').text)
    metadata["join_date"] = ensures_or(lambda: driver.find_element(By.CSS_SELECTOR, 'span[data-testid="UserJoinDate"]').text)
    metadata["following"] = ensures_or(lambda: driver.find_element(By.XPATH, "//span[contains(text(), 'Following')]/ancestor::a/span").text)
    metadata["followers"] = ensures_or(lambda: driver.find_element(By.XPATH, "//span[contains(text(), 'Followers')]/ancestor::a/span").text)

    # Force utf-8 so non-ASCII characters survive the dump
    # Save a copy of the metadata
    with open(os.path.join(fpath, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False)

    # Save a screenshot of the bio
    driver.save_screenshot(os.path.join(fpath, "profile.png"))

    if bio_only:
        return

    # Create tweets folder
    tweets_path = os.path.join(fpath, "tweets")
    os.makedirs(tweets_path)

    tweets_metadata = []
    id_tracker = 0
    last_id = id_tracker
    last_id_count = 0
    tweets_tracker = set()
    boosted_tracker = set()
    last_height = 0
    new_height = 0
    try:
        while True:
            if id_tracker >= number_posts_to_cap:
                break
            elif last_id_count > 5:
                print("No more data to load?")
                break

            if last_id == id_tracker:
                last_id_count += 1
            else:
                last_id = id_tracker
                last_id_count = 0
            tweets = driver.find_elements(By.CSS_SELECTOR, '[data-testid="tweet"]')
            for tweet in tweets:
                # Try to scroll there first.
                driver.execute_script("return arguments[0].scrollIntoView();", tweet)
                time.sleep(1)
                driver.execute_script("window.scrollTo(0, window.pageYOffset - 50);")

                tm = {"id": id_tracker}
                tm["tag_text"] = ensures_or(lambda: tweet.find_element(By.CSS_SELECTOR, 'div[data-testid="User-Names"]').text)
                try:
                    tm["name"], tm["handle"], _, tm["timestamp"] = ensures_or(lambda: tm["tag_text"].split('\n'), tuple(["UNKNOWN" for _ in range(4)]))
                except Exception as e:
                    print("Unable to unpack name values. {}".format(e))
                    tm["name"], tm["handle"], tm["timestamp"] = tm["tag_text"], "ERR", "ERR"

                tm["tweet_text"] = ensures_or(lambda: tweet.find_element(By.CSS_SELECTOR, 'div[data-testid="tweetText"]').text)
                tm["retweet_count"] = ensures_or(lambda: tweet.find_element(By.CSS_SELECTOR, 'div[data-testid="retweet"]').text)
                tm["like_count"] = ensures_or(lambda: tweet.find_element(By.CSS_SELECTOR, 'div[data-testid="like"]').text)
                tm["reply_count"] = ensures_or(lambda: tweet.find_element(By.CSS_SELECTOR, 'div[data-testid="reply"]').text)

                if tm["tweet_text"] != "NULL":
                    if tm["tweet_text"] in boosted_tracker:
                        # We need to go back in time to find the boosted post!
                        for t in tweets_metadata:
                            if t["tweet_text"] == tm["tweet_text"]:
                                t["potential_boost"] = True
                                break

                    tm["potential_boost"] = False
                    boosted_tracker.add(tm["tweet_text"])
                else:
                    tm["potential_boost"] = False

                dtm = Tweet(**tm)
                if dtm in tweets_tracker:
                    continue

                try:
                    # Try to remove overlay elements before taking the screenshot
                    remove_elements(driver, ["sheetDialog", "mask"])
                    tweet.screenshot(os.path.join(tweets_path, "{}.png".format(id_tracker)))
                except Exception:
                    # A failure to screenshot may be because the tweet element has gone stale; skip it for now.
                    continue

                id_tracker += 1
                tweets_metadata.append(tm)
                tweets_tracker.add(dtm)

                if id_tracker >= number_posts_to_cap:
                    break

            # Scroll down to the bottom to trigger loading more tweets
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait for the page to load
            time.sleep(randint(2, 4))

            # Calculate new scroll height and compare with last scroll height
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height
    finally:
        # Dump all tweet metadata collected so far
        with open(os.path.join(tweets_path, "tweets.json"), "w", encoding="utf-8") as f:
            json.dump(tweets_metadata, f, ensure_ascii=False)

def parse_args():
    parser = argparse.ArgumentParser(description="Process Twitter Account Metadata")
    parser.add_argument("--input-json", "-i", help="Input json file", default="input.json")
    parser.add_argument("--force", "-f", help="Force re-download everything. WARNING: will delete existing outputs.", action="store_true")
    parser.add_argument("--posts", "-p", help="Max number of posts to screenshot.", type=int, default=SCRAPE_N_TWEETS)
    parser.add_argument("--bio-only", "-b", help="Only store the bio, no snapshots or tweets.", action="store_true")
    return parser.parse_args()
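
# Example invocation (a hedged sketch; the script filename is hypothetical, the flags are those
# defined in parse_args above):
#   python scrape_profiles.py --input-json following.js --posts 10
# Each account is written to snapshots/<accountId>/ with metadata.json, profile.png,
# and a tweets/ folder of per-tweet screenshots plus tweets.json.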

def main():
    args = parse_args()
    output_folder = "snapshots"
    os.makedirs(output_folder, exist_ok=True)

    data = []
    # Some exports are prefixed with "window.<name> = "; capture everything from the opening bracket.
    weird_opening = r"window\..* = (\[[\S\s]*)"
    with open(args.input_json, encoding="utf-8") as f:
        txt = f.read()
        match = re.match(weird_opening, txt)
        if match:
            # Remove the first line metadata so the remainder parses as JSON
            txt = match.group(1)
        data = json.loads(txt)
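
    # Hedged example of the assumed input shape (hypothetical values, inferred from the keys
    # accessed below): after stripping the "window.<name> = " prefix, the file is a JSON array like
    #   [{"following": {"accountId": "12345", "userLink": "https://twitter.com/example"}}, ...]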

    for d in data:
        account = d["following"]
        fetch_html(account["userLink"],
                   fpath=os.path.join(output_folder, account["accountId"]),
                   force=args.force,
                   number_posts_to_cap=args.posts,
                   bio_only=args.bio_only)

if __name__ == "__main__":
    main()