You work for the data science team at STEDI, a small startup focused on assessing balance for seniors. STEDI has an application that collects data from seniors during a small exercise. The user logs in, and then selects the customer they are working with. Then the user starts a timer, and clicks a button with each step the senior takes. When the senior has reached 30 steps, their test is finished. The data transmitted enables the application to monitor seniors’ balance risk.
- Start the docker workspace from the root of the repository folder:
cd [repositoryfolder]
docker-compose up
- Make sure the containers are all running (you should see 9 processes):
docker ps
- Log in to the STEDI application: http://localhost:4567
- Click Create New Customer, create a test customer, and submit
- Click start, then add steps until you reach 30 and the timer has stopped
- Repeat this three times, and you will receive a risk score
The STEDI data science team has configured some real-time data sources using Kafka Connect. One of those data sources is Redis. When a customer is first assessed in the STEDI application, their record is added to a sorted set called Customer in Redis. Redis is running in a Docker container on the default Redis port (6379), with no Redis password configured. Redis is configured as a Kafka source, and whenever any data is saved to Redis (including Customer information), a payload is published to the Kafka topic called redis-server.
- From a terminal, connect to the Redis instance:
docker exec -it nd029-c2-apache-spark-and-spark-streaming_redis_1 redis-cli
- Type:
zrange Customer 0 -1
- Locate the customer you created in the output
- In another terminal, run this command to start monitoring the Kafka topic:
docker exec -it nd029-c2-apache-spark-and-spark-streaming_kafka_1 kafka-console-consumer --bootstrap-server localhost:9092 --topic redis-server
- Back in the redis-cli, type:
zadd Customer 0 "{\"customerName\":\"Sam Test\",\"email\":\"sam.test@test.com\",\"phone\":\"8015551212\",\"birthDay\":\"2001-01-03\"}"
In the Kafka consumer terminal, you will see the following payload appear on the redis-server topic:
{"key":"Q3VzdG9tZXI=","existType":"NONE","ch":false,"incr":false,"zSetEntries":[{"element":"eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==","score":0.0}],"zsetEntries":[{"element":"eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==","score":0.0}]}
Formatted version of the payload:
{"key":"__Q3VzdG9tZXI=__",
"existType":"NONE",
"Ch":false,
"Incr":false,
"zSetEntries":[{
"element":"__eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==__",
"Score":0.0
}],
"zsetEntries":[{
"element":"eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ==",
"score":0.0
}]
}
Both the key and the zSetEntries element fields contain base64-encoded data. If you base64 decode the encoded data above, it looks like this:
{"key":"__Customer__",
"existType":"NONE",
"Ch":false,
"Incr":false,
"zSetEntries":[{
"element":"__{"customerName":"Sam Test","email":"sam.test@test.com","phone":"8015551212","birthDay":"2001-01-03"}",
"Score":0.0
}__],
"zsetEntries":[{
"element":"{"customerName":"Sam Test","email":"sam.test@test.com","phone":"8015551212","birthDay":"2001-01-03"}",
"score":0.0
}]
}
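If you want to sanity-check the decoding outside of Spark, here is a minimal plain-Python sketch (not one of the graded project scripts) that decodes the key and the first zSetEntries element from a payload like the one above:

```python
import base64
import json

# Example values copied from the payload above
encoded_key = "Q3VzdG9tZXI="
encoded_element = "eyJjdXN0b21lck5hbWUiOiJTYW0gVGVzdCIsImVtYWlsIjoic2FtLnRlc3RAdGVzdC5jb20iLCJwaG9uZSI6IjgwMTU1NTEyMTIiLCJiaXJ0aERheSI6IjIwMDEtMDEtMDMifQ=="

# The key decodes to the name of the Redis sorted set
print(base64.b64decode(encoded_key).decode("utf-8"))  # Customer

# Each zSetEntries element decodes to the customer record JSON
customer = json.loads(base64.b64decode(encoded_element))
print(customer["email"], customer["birthDay"])  # sam.test@test.com 2001-01-03
```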
The application development team has programmed certain business events to be published automatically to Kafka. Whenever a customer takes an assessment, their risk score is generated, as long as they have four or more completed assessments. The risk score is transmitted to a Kafka topic called stedi-events. The stedi-events Kafka topic has a String key and a String value containing a JSON object with this format:
{"customer":"Jason.Mitra@test.com",
"score":7.0,
"riskDate":"2020-09-14T07:54:06.417Z"
}
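As a rough illustration (a sketch, not the required solution), this is one way the stedi-events value could be parsed with Spark Structured Streaming; the schema field names follow the JSON shown above, kafka:19092 is the broker address used later in this project, and the variable and application names are illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Schema matching the stedi-events JSON shown above
stediEventSchema = StructType([
    StructField("customer", StringType()),
    StructField("score", FloatType()),
    StructField("riskDate", StringType()),
])

spark = SparkSession.builder.appName("stedi-events-sketch").getOrCreate()

# Subscribe to the stedi-events topic on the project's Kafka broker
rawDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:19092") \
    .option("subscribe", "stedi-events") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka values arrive as bytes; cast to string, then parse the JSON into columns
eventsDF = rawDF \
    .selectExpr("cast(value as string) as value") \
    .withColumn("value", from_json("value", stediEventSchema)) \
    .select(col("value.customer"), col("value.score"), col("value.riskDate"))
```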
The application development team was not able to complete the feature because the graph is currently not receiving any data. To fix this, you need to generate a new payload in a Kafka topic and make it available to the STEDI application to consume:
- The Spark master and worker run as part of the docker-compose configuration
- Save the Spark startup logs for submission with your solution using the commands below (use docker ps to confirm the exact container names):
docker logs nd029-c2-apache-spark-and-spark-streaming_spark_1 >& ../../spark/logs/spark-master.log
docker logs nd029-c2-apache-spark-and-spark-streaming_spark-worker_1 >& ../../spark/logs/spark-worker.log
- Create a new Kafka topic to transmit the complete risk score with the birth date, so the data can be viewed in the STEDI application graph
- Edit docker-compose.yaml and set the name of the newly created topic:
KAFKA_RISK_TOPIC: ______
- From the terminal running the docker-compose output, stop the Docker containers:
CTRL+C
- Wait until they all stop
- Start the Docker containers once again:
docker-compose up
- Log in to the STEDI application: http://localhost:4567
- From the timer page, use the toggle button in the upper right corner to activate simulated user data and see real-time customer and risk score data. Toggle it off and on to create additional customers for Redis events. Each time you activate it, STEDI creates 30 new customers and then starts generating risk scores for each one. Customer data is generated immediately, but it takes 4 minutes for each customer's risk scores to be generated.
- To monitor the progress of the generated data, from a terminal type:
docker logs -f nd029-c2-apache-spark-and-spark-streaming_stedi_1
- You are going to write 3 Spark Python scripts. Each will connect to a Kafka broker running at kafka:19092:
  - redis-server topic: Write one Spark script, sparkpyrediskafkastreamtoconsole.py, to subscribe to the redis-server topic, base64 decode the payload, and deserialize the JSON to individual fields, then print the fields to the console. The data should include the birth date and email address. You will need these.
  - stedi-events topic: Write a second Spark script, sparkpyeventskafkastreamtoconsole.py, to subscribe to the stedi-events topic and deserialize the JSON (it is not base64 encoded) to individual fields. You will need the email address and the risk score.
  - New Topic: Write a third Spark script, sparkpykafkajoin.py, to join the customer dataframe and the customer risk dataframe on the email address. Create a JSON output to the newly created Kafka topic you configured, for STEDI to subscribe to, that contains at least the fields below (a rough sketch of this step follows the example):
{"customer":"Santosh.Fibonnaci@test.com",
"score":"28.5",
"email":"Santosh.Fibonnaci@test.com",
"birthYear":"1963"
}
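As a rough sketch of that last step, assuming you have already built a customerDF (email, birthYear) from redis-server and a riskDF (customer, score) from stedi-events as described above, the join and the write back to Kafka could look something like this; the dataframe names and checkpoint path are illustrative, and the topic name is a placeholder for whatever you set in KAFKA_RISK_TOPIC:

```python
from pyspark.sql.functions import to_json, struct

# Hypothetical dataframes built by the first two scripts:
#   customerDF: email, birthYear (decoded from redis-server)
#   riskDF:     customer, score  (parsed from stedi-events)
joinedDF = riskDF.join(customerDF, riskDF.customer == customerDF.email)

# Serialize each joined row to JSON and publish it to the configured risk topic
joinedDF \
    .select(to_json(struct("customer", "score", "email", "birthYear")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:19092") \
    .option("topic", "your-risk-topic") \
    .option("checkpointLocation", "/tmp/kafkacheckpoint") \
    .start() \
    .awaitTermination()
```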
- From a new terminal, type submit-event-kafkajoin.sh or submit-event-kafkajoin.cmd to submit it to the cluster
- Once the data is populated in the configured Kafka topic, the graph should have real-time data points
- Upload at least two screenshots of the working graph to the screenshots workspace folder