-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathraw_req_async.py
49 lines (41 loc) · 1.61 KB
/
raw_req_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""Using CDP.fetch.RequestPaused to filter content in real-time."""
import asyncio
import colorama
import mycdp
import sys
from seleniumbase import decorators
from seleniumbase import cdp_driver
# Console color codes used in the "BLOCKING" log line; blank on Linux.
if "linux" in sys.platform:
    c1 = c2 = cr = ""
else:
    c1 = colorama.Fore.RED + colorama.Back.LIGHTYELLOW_EX
    c2 = colorama.Fore.BLUE + colorama.Back.LIGHTCYAN_EX
    cr = colorama.Style.RESET_ALL
class RequestPausedTest:
    """Demo that filters requests through the CDP RequestPaused event."""

    async def request_paused_handler(self, event, tab):
        """Continue or fail a single paused request based on its URL.

        A request whose URL contains ".png", ".jpg" or ".gif" is logged
        and failed with the TIMED_OUT error reason; any other request is
        continued unchanged.

        Args:
            event: The paused-request event. This handler reads
                ``event.request.url``, ``event.request.method`` and
                ``event.request_id``.
            tab: The tab object whose ``feed_cdp`` receives the command.
        """
        req = event.request
        image_markers = (".png", ".jpg", ".gif")
        if any(marker in req.url for marker in image_markers):
            # Block the data (images)
            timed_out = mycdp.network.ErrorReason.TIMED_OUT
            msg = f"{c1}BLOCKING{cr} | {c2}{req.method}{cr} | {req.url}"
            print(f" >>> ------------\n{msg}")
            tab.feed_cdp(
                mycdp.fetch.fail_request(event.request_id, timed_out)
            )
            return
        # Let the data through
        tab.feed_cdp(
            mycdp.fetch.continue_request(request_id=event.request_id)
        )

    async def start_test(self):
        """Start the driver, attach the handler, and load the test page.

        The handler is registered on the tab returned for "about:blank"
        before the target URL is opened.
        """
        browser = await cdp_driver.start_async()
        page = await browser.get("about:blank")
        page.add_handler(
            mycdp.fetch.RequestPaused, self.request_paused_handler
        )
        await browser.get("https://gettyimages.com/photos/firefly-2003-nathan")
        # NOTE(review): presumably leaves time for the page's requests to
        # be processed by the handler before the coroutine returns.
        await asyncio.sleep(5)
@decorators.print_runtime("RequestPausedTest")
def main():
    """Run ``RequestPausedTest.start_test()`` on a new event loop."""
    runner = RequestPausedTest()
    asyncio.new_event_loop().run_until_complete(runner.start_test())


# Only run the demo when this file is executed as a script.
if __name__ == "__main__":
    main()