forked from Chapoly1305/FindMy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrequest_reports.py
executable file
·155 lines (127 loc) · 6.59 KB
/
request_reports.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
import argparse
import base64
import datetime
import glob
import hashlib
import json
import os
import sqlite3
import struct
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cores.pypush_gsa_icloud import icloud_login_mobileme, generate_anisette_headers
def sha256(data):
    """Return the raw SHA-256 digest of ``data``.

    Args:
        data: bytes-like object to hash.

    Returns:
        The 32-byte digest as ``bytes`` (not hex-encoded).
    """
    # The named constructor is the form the hashlib docs recommend over
    # hashlib.new("sha256"); it also hashes in a single call.
    return hashlib.sha256(data).digest()
def decrypt(enc_data, algorithm_dkey, mode):
    """Decrypt ``enc_data`` with a cipher built from ``algorithm_dkey`` and ``mode``.

    Args:
        enc_data: ciphertext bytes.
        algorithm_dkey: algorithm object already carrying the key
            (the script passes ``algorithms.AES(key)``).
        mode: cipher mode object (the script passes ``modes.GCM(iv, tag)``).

    Returns:
        The bytes produced by the decryptor's ``update`` followed by
        ``finalize``.
    """
    cipher = Cipher(algorithm_dkey, mode, default_backend())
    context = cipher.decryptor()
    plaintext = context.update(enc_data)
    # finalize() must run after update(); its output is appended to the result.
    plaintext += context.finalize()
    return plaintext
def decode_tag(data):
    """Decode the first 10 bytes of a decrypted report into a location dict.

    Layout read from ``data`` (big-endian): a signed 32-bit latitude, a signed
    32-bit longitude, one unsigned confidence byte and one unsigned status
    byte. Latitude and longitude are divided by 10,000,000 (presumably giving
    degrees -- confirm against the report format). Bytes past the tenth are
    ignored.

    Args:
        data: bytes-like object of at least 10 bytes.

    Returns:
        ``{'lat': float, 'lon': float, 'conf': int, 'status': int}``.

    Raises:
        struct.error: if ``data`` is shorter than 10 bytes. Unpacking all four
            fields at once makes a truncated payload fail loudly instead of
            silently reporting ``conf``/``status`` as 0.
    """
    lat_raw, lon_raw, confidence, status = struct.unpack_from(">iiBB", data)
    return {
        'lat': lat_raw / 10000000.0,
        'lon': lon_raw / 10000000.0,
        'conf': confidence,
        'status': status,
    }
def getAuth(regenerate=False, second_factor='sms'):
    """Return the ``(dsid, searchPartyToken)`` pair used as request credentials.

    The pair is cached in ``keys/auth.json`` next to this script. When the
    cache file exists and ``regenerate`` is false it is read back; otherwise a
    new login is performed via ``icloud_login_mobileme`` and the result is
    written to the cache file.

    Args:
        regenerate: when true, ignore any cached file and log in again.
        second_factor: forwarded unchanged to ``icloud_login_mobileme``
            (the script passes ``'sms'`` or ``'trusted_device'``).

    Returns:
        Tuple ``(dsid, searchPartyToken)``.
    """
    keys_dir = os.path.dirname(os.path.realpath(__file__)) + "/keys"
    config_path = keys_dir + "/auth.json"
    if os.path.exists(config_path) and not regenerate:
        with open(config_path, "r") as f:
            j = json.load(f)
    else:
        # Make sure the cache directory exists *before* logging in, so a
        # missing keys/ folder cannot throw away a freshly obtained token.
        os.makedirs(keys_dir, exist_ok=True)
        mobileme = icloud_login_mobileme(second_factor=second_factor)
        tokens = mobileme['delegates']['com.apple.mobileme']['service-data']['tokens']
        j = {'dsid': mobileme['dsid'],
             'searchPartyToken': tokens['searchPartyToken']}
        with open(config_path, "w") as f:
            json.dump(j, f)
    return (j['dsid'], j['searchPartyToken'])
def main():
    """Fetch, decrypt, print and store location reports for the local key files.

    Steps, in order:
      1. Parse the command-line options.
      2. Open ``keys/reports.db`` next to this script.
      3. Read every ``keys/<prefix>*.keys`` file and collect its private key
         and hashed advertisement key.
      4. POST a search request for those hashed keys to the fetch endpoint.
      5. Decrypt each returned report inside the requested time window,
         print it and insert it into the ``reports`` table.

    Any exception is caught and printed rather than propagated, so the script
    reports failures as a message instead of a traceback.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    sq3db = None
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument('-H', '--hours', help='only show reports not older than these hours', type=int, default=24)
        parser.add_argument('-p', '--prefix', help='only use keyfiles starting with this prefix', default='')
        parser.add_argument('-r', '--regen', help='regenerate search-party-token', action='store_true')
        parser.add_argument('-t', '--trusteddevice', help='use trusted device for 2FA instead of SMS',
                            action='store_true')
        args = parser.parse_args()

        sq3db = sqlite3.connect(script_dir + '/keys/reports.db')
        sq3 = sq3db.cursor()

        privkeys = {}  # hashed adv key -> base64 private key
        names = {}     # hashed adv key -> key file name (prefix and ".keys" stripped)
        for keyfile in glob.glob(script_dir + '/keys/' + args.prefix + '*.keys'):
            # read key files generated with generate_keys.py
            with open(keyfile) as f:
                hashed_adv = priv = ''
                name = os.path.basename(keyfile)[len(args.prefix):-5]
                for line in f:
                    key = line.rstrip('\n').split(': ')
                    if key[0] == 'Private key':
                        priv = key[1]
                    elif key[0] == 'Hashed adv key':
                        hashed_adv = key[1]

                if priv and hashed_adv:
                    privkeys[hashed_adv] = priv
                    names[hashed_adv] = name
                else:
                    print(f"Couldn't find key pair in {keyfile}")

        # strftime('%s') is a platform-specific extension (unavailable on some
        # systems); timestamp() yields the same epoch value portably.
        unix_epoch = int(datetime.datetime.now().timestamp())
        startdate = unix_epoch - (60 * 60 * args.hours)
        # The request carries the time window in milliseconds.
        search_request = {"search": [{"startDate": startdate * 1000, "endDate": unix_epoch * 1000,
                                      "ids": list(names.keys())}]}

        r = requests.post("https://gateway.icloud.com/acsnservice/fetch",
                          auth=getAuth(regenerate=args.regen,
                                       second_factor='trusted_device' if args.trusteddevice else 'sms'),
                          headers=generate_anisette_headers(),
                          json=search_request)
        res = json.loads(r.content.decode())['results']
        print(f'{r.status_code}: {len(res)} reports received.')

        ordered = []
        found = set()

        # SQL query to create a table named 'reports' if it does not exist
        create_table_query = '''CREATE TABLE IF NOT EXISTS reports (
        id_short TEXT, timestamp INTEGER, datePublished INTEGER, payload TEXT,
        id TEXT, statusCode INTEGER, lat TEXT, lon TEXT, conf INTEGER, PRIMARY KEY(id_short,timestamp));'''
        sq3.execute(create_table_query)

        for report in res:
            priv = int.from_bytes(base64.b64decode(privkeys[report['id']]), byteorder='big')
            payload = base64.b64decode(report['payload'])

            # the following is all copied from https://github.com/hatomist/openhaystack-python, thanks @hatomist!
            # 978307200 is the Unix timestamp of 2001-01-01T00:00:00Z, so the
            # first four payload bytes appear to count seconds from that date.
            timestamp = int.from_bytes(payload[0:4], 'big') + 978307200
            if timestamp < startdate:
                continue

            # Payload slices used below: [5:62] ephemeral public key,
            # [62:72] ciphertext, [72:] GCM authentication tag.
            eph_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP224R1(), payload[5:62])
            shared_key = ec.derive_private_key(priv, ec.SECP224R1(), default_backend()).exchange(ec.ECDH(), eph_key)
            symmetric_key = sha256(shared_key + b'\x00\x00\x00\x01' + payload[5:62])
            decryption_key = symmetric_key[:16]
            iv = symmetric_key[16:]
            enc_data = payload[62:72]
            auth_tag = payload[72:]

            decrypted = decrypt(enc_data, algorithms.AES(decryption_key), modes.GCM(iv, auth_tag))
            tag = decode_tag(decrypted)
            tag['timestamp'] = timestamp
            tag['isodatetime'] = datetime.datetime.fromtimestamp(timestamp).isoformat()
            tag['key'] = names[report['id']]
            tag['goog'] = 'https://maps.google.com/maps?q=' + str(tag['lat']) + ',' + str(tag['lon'])
            found.add(tag['key'])
            ordered.append(tag)

            # Parameterized statement: values are bound, never interpolated into the SQL text.
            query = "INSERT OR REPLACE INTO reports VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
            parameters = (names[report['id']], timestamp, report['datePublished'], report['payload'], report['id'],
                          report['statusCode'], str(tag['lat']), str(tag['lon']), tag['conf'])
            sq3.execute(query, parameters)

        print(f'{len(ordered)} reports used.')
        ordered.sort(key=lambda item: item.get('timestamp'))
        for rep in ordered:
            print(rep)
        print(f'found: {list(found)}')
        print(f'missing: {[key for key in names.values() if key not in found]}')
        sq3.close()
        sq3db.commit()
    except Exception as e:
        # An exception object never equals a plain string, so match on the
        # exception's class name or its message instead.
        if type(e).__name__ == "AuthenticationError" or str(e) == "AuthenticationError":
            print("Authentication failed. Please check your Apple ID credentials.")
        else:
            print(e)
    finally:
        # Release the database handle on failure as well as on success;
        # uncommitted inserts are discarded when an error occurred.
        if sq3db is not None:
            sq3db.close()


if __name__ == "__main__":
    main()