-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRealTimeDemo-1.scala
54 lines (35 loc) · 1.43 KB
/
RealTimeDemo-1.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Databricks notebook source
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
// To connect to an Event Hub, EntityPath is required as part of the connection string.
// Here, we assume that the connection string from the Azure portal does not have the EntityPath part.
import org.apache.spark.sql.functions._
// COMMAND ----------
// The Event Hubs connector needs EntityPath (the Event Hub name) in the connection string.
// The portal connection string is assumed NOT to contain it, so the name is supplied
// explicitly with setEventHubName before building the final connection string.
val connectionString = ConnectionStringBuilder("<EVENT HUBS CONNECTION STRING>")
  .setEventHubName("<EVENT HUB NAME>")
  .build

// Begin at the end of the stream (EventPosition.fromEndOfStream) rather than replaying
// history, so the demo only sees events that arrive after the query starts.
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

// Streaming source of raw Event Hubs records; the event payload is in the `body` column.
// Declared as a val: it is never reassigned.
val eventhubs =
  spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load()
// COMMAND ----------
// Continuously archive every raw Event Hubs record as JSON files on DBFS.
// start(path) is the shorthand for option("path", path).start(); the checkpoint
// directory is where Structured Streaming keeps this query's progress.
// The returned handle can be used later to inspect or stop the query.
val rawArchiveQuery = eventhubs.writeStream
  .format("json")
  .outputMode("append")
  .option("checkpointLocation", "dbfs:/<checkpointLocation>")
  .start("dbfs:/<storeLocation>")
// COMMAND ----------
// Project only the payload, cast to text. Casting a named column keeps its name,
// so the result still exposes a single `body` column for the JSON parsing below.
val bodyAsText = col("body").cast("string")
val df = eventhubs.select(bodyAsText)
display(df)
// COMMAND ----------
// Look up one top-level field of the JSON text in `body` by JSONPath ($ = document root).
def jsonField(path: String) = get_json_object(col("body"), path)

// One row per event: the timestamp is cast to a Spark timestamp; the other fields
// are left as the strings get_json_object returns.
val jsDF = df.select(
  jsonField("$.timestamp").cast("timestamp").as("time"),
  jsonField("$.temperature").as("temp"),
  jsonField("$.humidity").as("humidity"),
  jsonField("$.city").as("location")
)
// COMMAND ----------
// Show the parsed stream (columns: time, temp, humidity, location) in the notebook.
// NOTE(review): jsDF is a streaming DataFrame, so this display runs its own streaming query.
display(jsDF)
// COMMAND ----------
// Second display of the same parsed stream. Presumably this cell is configured with a
// different visualization (e.g. a chart) in the notebook UI, since that setting is not
// part of the exported source. TODO confirm; otherwise this cell is redundant and only
// starts an extra streaming query.
display(jsDF)
// COMMAND ----------