Skip to content

Commit

Permalink
Merge pull request #10 from MarkoHaralovic/lab2
Browse files Browse the repository at this point in the history
Lab2
  • Loading branch information
MarkoHaralovic authored Nov 26, 2023
2 parents b0c8e48 + a2fbf4f commit 001576e
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 17 deletions.
4 changes: 2 additions & 2 deletions drugi_zadatak/ASA_job1/ASA_job1.asaql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
SELECT
*
INTO
[YourOutputAlias]
DataLakeStorageGen2BlobStorage1
FROM
[YourInputAlias]
EventHub1
21 changes: 21 additions & 0 deletions drugi_zadatak/ASA_job1/Inputs/EventHub1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Name": "EventHub1",
"Type": "Data Stream",
"DataSourceType": "Event Hub",
"EventHubProperties": {
"ServiceBusNamespace": "labos1",
"EventHubName": "labos1eventhub",
"SharedAccessPolicyName": "RootManageSharedAccessKey",
"SharedAccessPolicyKey": "********",
"ConsumerGroupName": "$Default",
"AuthenticationMode": "ConnectionString"
},
"DataSourceCredentialDomain": "ef5fcfaa-5b94-4290-8c6f-5db0990128c5.StreamAnalystics",
"Serialization": {
"Type": "Json",
"Encoding": "UTF8"
},
"PartitionKey": null,
"CompressionType": "None",
"ScriptType": "Input"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"Name": "DataLakeStorageGen2BlobStorage1",
"DataSourceType": "Blob Storage",
"BlobStorageProperties": {
"StorageAccounts": [{
"AccountName": "tpioudatalake",
"AccountKey": "********"
}],
"Container": "asa1container",
"PathPattern": "{date}/{time}",
"DateFormat": "yyyy/MM/dd",
"TimeFormat": "HH/mm",
"AuthenticationMode": "ConnectionString"
},
"DataSourceCredentialDomain": "b205bdbe-df60-49ed-a6b6-4ca9a547517e.StreamAnalystics",
"Serialization": {
"Type": "Json",
"Encoding": "UTF8",
"Format": "LineSeparated"
},
"MinimumRows": null,
"MaximumTime": null,
"ScriptType": "Output"
}
4 changes: 0 additions & 4 deletions drugi_zadatak/ASA_job1/asaproj.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
"filePath": "JobConfig.json",
"subType": "JobConfig"
},
{
"filePath": "Inputs/undefined",
"subType": "Input"
},
{
"filePath": "Inputs/EventHub1.json",
"subType": "Input"
Expand Down
40 changes: 29 additions & 11 deletions drugi_zadatak/ASA_job2/ASA_job2.asaql
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@

-- Before you begin
-- 1. Add input: right-click the Inputs folder and select "ASA: Add Input" to prepare your input data.
-- 2. Add output: right-click the Outputs folder and select "ASA: Add Output" to choose your sink type.
-- 3. Edit your query below and start testing your ASA job locally.
-- For more information, please visit: https://docs.microsoft.com/en-us/azure/stream-analytics/quick-create-visual-studio-code
-- Before you begin
-- 1. Add input: right-click the Inputs folder and select "ASA: Add Input" to prepare your input data.
-- 2. Add output: right-click the Outputs folder and select "ASA: Add Output" to choose your sink type.
-- 3. Edit your query below and start testing your ASA job locally.
-- For more information, please visit: https://docs.microsoft.com/en-us/azure/stream-analytics/quick-create-visual-studio-code

SELECT
*
INTO
[YourOutputAlias]
FROM
[YourInputAlias]
-- ASA job: filter and reshape Reddit posts from the Event Hub input and
-- write them to the Cosmos DB output, partitioned by author.
SELECT
   -- Pass-through / renamed post fields (selftext -> content, etc.).
   author_fullname AS author_fullname,
   title AS title,
   selftext AS content,
   ups AS ups,
   downs AS downs,
   upvote_ratio AS upvoteRatio,
   score AS score,
   created_utc AS creationDate,
   num_comments AS num_comments,
   -- Derive a coarse post type: video beats image beats plain text.
   -- 'self'/'default'/'' thumbnails mean "no real image" in the Reddit payload.
   CASE
      WHEN is_video = true THEN 'video'
      WHEN thumbnail != 'self' AND thumbnail != 'default' AND thumbnail != '' THEN 'image'
      ELSE 'text'
   END AS postType
INTO
   CosmosDB1
FROM
   EventHub1
AS post
PARTITION BY author_fullname
-- Keep only posts that received at least one comment.
WHERE
   num_comments > 0

21 changes: 21 additions & 0 deletions drugi_zadatak/ASA_job2/Inputs/EventHub1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Name": "EventHub1",
"Type": "Data Stream",
"DataSourceType": "Event Hub",
"EventHubProperties": {
"ServiceBusNamespace": "labos1",
"EventHubName": "labos1eventhub",
"SharedAccessPolicyName": "RootManageSharedAccessKey",
"SharedAccessPolicyKey": "********",
"ConsumerGroupName": "$Default",
"AuthenticationMode": "ConnectionString"
},
"DataSourceCredentialDomain": "ef5fcfaa-5b94-4290-8c6f-5db0990128c5.StreamAnalystics",
"Serialization": {
"Type": "Json",
"Encoding": "UTF8"
},
"PartitionKey": null,
"CompressionType": "None",
"ScriptType": "Input"
}
14 changes: 14 additions & 0 deletions drugi_zadatak/ASA_job2/Outputs/CosmosDB1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"Name": "CosmosDB1",
"DataSourceType": "DocumentDB",
"DocumentDbProperties": {
"AccountId": "fervjestinadb",
"AccountKey": "********",
"Database": "FilteredRedditPosts",
"ContainerName": "FilteredPostsContainer",
"DocumentId": "",
"AuthenticationMode": "ConnectionString"
},
"DataSourceCredentialDomain": "0aa1f090-78b0-429d-a03d-a55d9f3290c8.StreamAnalystics",
"ScriptType": "Output"
}
8 changes: 8 additions & 0 deletions drugi_zadatak/ASA_job2/asaproj.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
{
"filePath": "JobConfig.json",
"subType": "JobConfig"
},
{
"filePath": "Inputs/EventHub1.json",
"subType": "Input"
},
{
"filePath": "Outputs/CosmosDB1.json",
"subType": "Output"
}
]
}
28 changes: 28 additions & 0 deletions drugi_zadatak/new_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

# SECURITY: this connection string embeds a live SharedAccessKey in source
# control -- rotate the key and load the string from an environment variable.
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://labos1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=6QvWYyyNRIDg8FCqPOxoufaNl++n5EiGt+AEhLHqXuY=;EntityPath=labos1eventhub"
EVENT_HUB_NAME = "labos1eventhub"
CONSUMER_GROUP = "$Default"  # built-in default consumer group of the Event Hub


async def on_event(partition_context, event):
    """Print each received event's UTF-8 body together with its partition ID."""
    body = event.body_as_str(encoding="UTF-8")
    partition_id = partition_context.partition_id
    message = 'Received the event: "{}" from the partition with ID: "{}"'.format(
        body, partition_id
    )
    print(message)


consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENT_HUB_NAME,
)


async def _receive_forever():
    """Pump events to on_event until interrupted, then close the client.

    BUG FIX: the original called consumer_client.close() in a plain finally
    block without awaiting it -- close() is a coroutine on the aio client, so
    the connection was never actually closed.  The async context manager
    awaits the close properly on both normal exit and cancellation.
    """
    async with consumer_client:
        await consumer_client.receive(on_event=on_event)


try:
    asyncio.run(_receive_forever())
except KeyboardInterrupt:
    print("Receiving has stopped.")
55 changes: 55 additions & 0 deletions drugi_zadatak/new_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import asyncio
import requests
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
import json
# SECURITY: this connection string embeds a live SharedAccessKey in source
# control -- rotate the key and load the string from an environment variable.
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://labos1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=6QvWYyyNRIDg8FCqPOxoufaNl++n5EiGt+AEhLHqXuY=;EntityPath=labos1eventhub"

EVENT_HUB_NAME = "labos1eventhub"
# Top posts of all time from r/dataengineering, 10 per page.
REDDIT_BASE_URL = "https://www.reddit.com/r/dataengineering/top/.json?t=all&limit=10"
# NOTE(review): the User-Agent claims urllib but the requests library is used
# below; Reddit only asks for a unique descriptive UA, so it still works.
HEADERS = {
    "User-Agent": "Python/urllib"
}  # gotten from https://github.com/reddit-archive/reddit/wiki/API#rules


async def fetch_reddit_top_posts(after=None):
    """Fetch one page of top posts from r/dataengineering.

    Parameters
    ----------
    after : str | None
        Reddit pagination cursor; when given, the page after it is requested.

    Returns
    -------
    tuple
        (list of post objects for this page, cursor for the next page --
        None when Reddit reports no further pages).
    """
    url = REDDIT_BASE_URL
    if after:
        url += f"&after={after}"

    # BUG FIX: requests.get is blocking, which stalled the event loop (and the
    # Event Hubs producer) inside this coroutine; run it in a worker thread.
    response = await asyncio.to_thread(requests.get, url, headers=HEADERS)
    response.raise_for_status()
    data = response.json().get("data", {})
    return data.get("children", []), data.get("after")


async def run():
    """Stream pages of top Reddit posts into the Event Hub, one batch per page.

    Builds the producer from the connection string, then repeatedly fetches a
    page of posts, sends each post's "data" payload as a JSON-serialized event
    in a single batch, and sleeps 10 s between pages.  Stops when a page comes
    back empty.  Based on:
    https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send?tabs=connection-string%2Croles-azure-portal
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        after = None
        while True:
            top_posts, after = await fetch_reddit_top_posts(after)
            if not top_posts:
                break
            # NOTE(review): if Reddit returns after=None before an empty page,
            # the next iteration re-requests page one and re-streams forever --
            # confirm whether that endless restart is intended.

            event_data_batch = await producer.create_batch()

            for post in top_posts:
                # Debug print of post["data"].keys() removed: it spammed stdout
                # and, unlike the .get() below, raised KeyError on "data"-less items.
                post_data = post.get("data", {})
                event_data_batch.add(EventData(json.dumps(post_data)))

            await producer.send_batch(event_data_batch)
            await asyncio.sleep(10)


# Run the producer to completion.  The `while True: pass` busy-wait that
# followed here spun one CPU core at 100% forever after run() returned, for
# no effect -- removed so the script simply exits when streaming is done.
asyncio.run(run())

0 comments on commit 001576e

Please # to comment.