-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathautomated_data_generation.py
169 lines (138 loc) · 5.35 KB
/
automated_data_generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
This file is a utility script that automates generating random taxi trip data, predicting
trip duration on that random data using the trained model in production, and uploading all of the data
to the feature store.
It exists only to generate a large volume of data, specifically for use in the monitoring section.
This file is not part of the main project files and is not needed if random data generation isn't required.
This file is run on a schedule using GitHub Actions.
To launch a streamlit web app for on-demand predictions,
please refer to the 'app.py' file located in the current directory.
"""
import argparse
import os
import random
import sys
import time
from typing import Optional, Tuple

import pandas as pd

# make the repository root importable so that `src.utils` resolves when this file is run as a script
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from src.utils import app_utils, hopsworks_utils
def generate_random_coordinates(
    min_lat: float,
    max_lat: float,
    min_lon: float,
    max_lon: float
) -> Tuple[float, float]:
    """
    Draws one (latitude, longitude) point uniformly from a rectangular bounding box.

    Args:
        min_lat (float): Lower latitude bound of the box.
        max_lat (float): Upper latitude bound of the box.
        min_lon (float): Lower longitude bound of the box.
        max_lon (float): Upper longitude bound of the box.
    Returns:
        Tuple[float, float]: The sampled (latitude, longitude) pair.
    """
    # tuple elements are evaluated left to right, so latitude is drawn before longitude
    return random.uniform(min_lat, max_lat), random.uniform(min_lon, max_lon)
def generate_trip_data(
    vendor_weights: Tuple[float, float] = (1.0, 1.0)
) -> Tuple[int, int, float, float, float, float, float]:
    """
    Generates one random taxi trip record.

    Args:
        vendor_weights (Tuple[float, float]): Relative selection weights for
            vendor_id=1 and vendor_id=2, in that order. The default (1.0, 1.0)
            makes both vendors equally likely. Pass a larger second weight to
            favour vendor_id=2, which the EDA observed to have the most records.
    Returns:
        Tuple[int, int, float, float, float, float, float]: A tuple containing vendor_id, passenger_count,
        pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, and pickup_datetime
        (the current Unix timestamp in seconds, as returned by time.time()).
    Raises:
        ValueError: If vendor_weights does not contain exactly two weights
            (raised by random.choices).
    """
    # generate 'vendor_id' according to the caller-supplied weights;
    # random.randint(1, 2) alone cannot express the skew towards 'vendor_id=2' that the EDA suggests
    vendor_id = random.choices((1, 2), weights=vendor_weights, k=1)[0]
    # generate 'passenger_count' with equal probability
    passenger_count = random.randint(1, 6)
    # generate current timestamp for 'pickup_datetime'
    pickup_datetime = time.time()
    # generate random pickup locations
    pickup_latitude, pickup_longitude = generate_random_coordinates(
        min_lat=40.5,
        max_lat=41.0,
        min_lon=-74.25,
        max_lon=-73.50
    )
    # generate random dropoff locations (a wider box than pickups)
    dropoff_latitude, dropoff_longitude = generate_random_coordinates(
        min_lat=40.5,
        max_lat=41.2,
        min_lon=-74.5,
        max_lon=-73.0
    )
    return (
        vendor_id, passenger_count, pickup_latitude,
        pickup_longitude, dropoff_latitude, dropoff_longitude,
        pickup_datetime
    )
def run(
    project: str,
    api_key: Optional[str] = None
) -> None:
    """
    Generates one random taxi trip, predicts its duration with the production
    model, and inserts the trip together with its prediction into the
    'predictions' feature group (version 1).

    Args:
        project (str): The name of the Hopsworks project to log in to.
        api_key (Optional[str]): Hopsworks API key. When None (the default), it is
            read from the '--api-key' command-line argument.
    Returns:
        None.
    Raises:
        Exception: If connecting to the feature store fails (the original error
            is chained as the cause).
    """
    # get random taxi trip data
    (vendor_id, passenger_count, pickup_latitude, pickup_longitude,
     dropoff_latitude, dropoff_longitude, pickup_datetime) = generate_trip_data()

    # get all features engineered
    df_predictor = app_utils.process_input(
        vendor_id,
        passenger_count,
        pickup_latitude,
        pickup_longitude,
        dropoff_latitude,
        dropoff_longitude,
        pickup_datetime
    )

    # only fall back to the command line when no key was passed in,
    # so run() can also be called programmatically
    if api_key is None:
        parser = argparse.ArgumentParser()
        parser.add_argument('--api-key', help='Hopsworks API key')
        api_key = parser.parse_args().api_key

    # log in to the project named by the caller; keep the name (`project`)
    # separate from the returned project handle
    hopsworks_project = hopsworks_utils.login_to_hopsworks(project=project, api_key=api_key)

    # get model
    model = app_utils.get_model(project=hopsworks_project, model_name="final_xgboost", version=1)

    # make predictions
    prediction = model.predict(df_predictor)  # in seconds
    df_prediction = pd.DataFrame.from_dict({  # create DataFrame from a prediction
        'prediction': prediction
    })

    # connect to feature store
    try:
        feature_store = hopsworks_project.get_feature_store()
    except Exception as e:
        raise Exception(f"Error connecting to Feature Store at Hopsworks {project}: {e}") from e

    # get feature group
    feature_group_name = "predictions"
    prediction_feature_group = feature_store.get_feature_group(
        name=feature_group_name,
        version=1
    )

    # create DataFrame from a pickup_datetime (epoch seconds -> 'YYYY-mm-dd HH:MM:SS' string)
    df_pickup_datetime = pd.DataFrame.from_dict({
        'pickup_datetime': [pd.to_datetime(pickup_datetime, unit='s').strftime('%Y-%m-%d %H:%M:%S')]
    })

    # concat dataframes column-wise
    df_updated = pd.concat([df_pickup_datetime, df_predictor, df_prediction], axis=1)

    # to make feature datatypes compatible with that of feature store
    # warning: this is unnecessary storage of very small number to big size dtype,
    # and should be resolved
    df_updated = df_updated.astype({
        'h2amto7am_7amto2am': 'int64',
        'holiday': 'int64'
    })

    # insert to feature group
    prediction_feature_group.insert(df_updated)
# run the job only when this file is executed as a script (e.g. by the scheduled
# GitHub Actions workflow), not when it is imported
if __name__ == "__main__":
    run(project="nyc_taxi_trip_duration")