-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.py
83 lines (67 loc) · 2 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python
import hashlib
import md5
import os
import sys
import time

import pika

from utils import *
class Reader:
    """Queue worker that answers screenshot file-path lookups.

    Consumes request messages from the 'reads' queue, checks whether the
    requested screenshot file is present in any of the directories named in
    the request, and publishes the outcome to the 'responses' queue.
    """

    def __init__(self):
        """Open a blocking pika connection to localhost and declare both queues."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # Queue for receiving filepath requests.
        self.channel.queue_declare(queue='reads', durable=True)
        # Queue for sending filepath responses.
        self.channel.queue_declare(queue='responses', durable=True)

    def fileExists(self, dbname, filename):
        """Return True if '<dbname>/<filename>' is an existing regular file."""
        return os.path.isfile('%s/%s' % (dbname, filename))

    def openEntry(self, ch, method, properties, msgbody):
        """Handle one message from the 'reads' queue.

        The message body is evaluated into a mapping. The file name looked
        for is the hex MD5 digest of msg[URL]; every entry of msg[WORKERS] is
        probed as a directory. One positive response is sent per directory
        that contains the file; if none does, a single negative response is
        sent. The message is acknowledged once all responses are published.

        Parameters follow the pika consumer-callback convention:
        ch is the channel, method carries the delivery tag, properties is
        unused here, msgbody is the raw message payload.
        """
        # SECURITY: eval() executes arbitrary expressions found in the message
        # body. Anyone able to publish to 'reads' can run code in this
        # process. Kept as-is because the wire format is defined by the
        # producers; consider ast.literal_eval or JSON once they are confirmed
        # to emit plain literals.
        msg = eval(msgbody)
        print(" [x] Received:")
        printmsg(msg)

        url = msg[URL]
        # hashlib.md5 replaces the long-deprecated md5 module and yields the
        # same digest. It needs bytes, so text URLs are encoded as UTF-8
        # (identical to the old implicit ASCII encoding for ASCII URLs).
        url_bytes = url if isinstance(url, bytes) else url.encode('utf-8')
        filename = hashlib.md5(url_bytes).hexdigest()

        found = False
        for db in msg[WORKERS]:
            if self.fileExists(db, filename):
                self.respond(url, db, filename)
                found = True
        if not found:
            self.respond(url, '', '', False)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def respond(self, url, db, filename, exists=True):
        """Push a message to the 'responses' queue for a 'reads' request.

        The message always carries TYPE, EXISTS and URL; FILEPATH
        ('<db>/<filename>') is added only when exists is true. The message is
        serialised with str() and published with delivery_mode=2.
        """
        mymsg = {TYPE: RES_SCREENSHOT_FILEPATH,
                 EXISTS: exists,
                 URL: url}
        if exists:
            mymsg[FILEPATH] = db + '/' + filename
        print(" [x] Push to queue = 'responses': ")
        printmsg(mymsg)
        self.channel.basic_publish(
            exchange='',
            routing_key='responses',
            body=str(mymsg),
            properties=pika.BasicProperties(delivery_mode=2,))

    def work(self):
        """Consume messages from the 'reads' queue until consuming stops."""
        # prefetch_count=1: take one unacknowledged message at a time.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.openEntry, queue='reads')
        self.channel.start_consuming()
# Module-level handle so the interrupt handler below can reach the connection.
reader = None


def __main__():
    """Create the Reader, publish it in the module-level `reader`, and start consuming."""
    global reader
    reader = Reader()
    reader.work()


try:
    __main__()
except KeyboardInterrupt:
    # Ctrl-C can arrive while Reader() is still being constructed, in which
    # case `reader` is still None and there is no connection to close.
    if reader is not None:
        reader.connection.close()