This is a simple Kafka consumer that enables Greenplum Database (GPDB) to load data from a Kafka topic using GPDB's external web table capability. The client is written in Go. It uses Kafka's consumer group concept, under which each partition of a topic is assigned to only one consumer in the group, so data consumed in parallel is not duplicated. Also, this version stores the offsets of the consumed records in Zookeeper, using consumer.CommitUpto(msg) (see ./kafka_consumer.go). Committing once per record is likely slowing things down, and the alternative approach of storing the offsets within a Kafka topic is probably faster; there is an early version of that approach here.
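To illustrate the consumption pattern described above, here is a minimal sketch built on the wvanbergen/kafka consumergroup package and Shopify's sarama (the same dependencies resolved during the build). The group name, topic, and Zookeeper address are placeholders, and the real ./kafka_consumer.go differs in its details; treat this as a sketch rather than the actual source.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

func main() {
	// Placeholder values; the real consumer takes these from its command line.
	zookeeper := []string{"my_zk_host:2181"}
	topics := []string{"chicago_crimes"}
	group := "gpdb_loader"

	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	// Join the consumer group; Kafka assigns each partition to only one
	// member of the group, so parallel consumers do not duplicate records.
	consumer, err := consumergroup.JoinConsumerGroup(group, topics, zookeeper, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	// Print each record to stdout (where the GPDB external web table reads it),
	// then commit its offset to Zookeeper, one record at a time.
	for msg := range consumer.Messages() {
		fmt.Println(string(msg.Value))
		if err := consumer.CommitUpto(msg); err != nil {
			log.Println("commit error:", err)
		}
	}
}
```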
The ./attic directory contains some scripts that may be useful. For example, when using Zookeeper to store offsets, it's possible to rewind a topic by exporting the offsets, editing the resulting file, then importing them from that file.
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design. See the Kafka docs for a nice introduction.
GPDB is an open source massively parallel processing (MPP) relational database. Using its external tables feature, it is able to load massive amounts of data very efficiently by exploiting its parallel architecture. This data can be in files on ETL hosts, within HDFS, or in Amazon S3. This Kafka consumer will utilize GPDB external web tables, since they are able to run an executable program, in parallel, to ingest data.
- I am trying to learn Go and Kafka (maybe this is also a caveat).
- Go builds a single, statically linked binary with all its dependencies built in, which makes it simpler to install than, say, a Python or Java program.
- A running Kafka installation. I followed the Quick Start. My installation directory was ./kafka_2.11-0.10.0.0, so this path will appear in the Kafka commands below.
- A Go installation. Basically:
- Extract the Go distribution into /usr/local
- Create a working directory for Go: mkdir $HOME/go
- Set up your environment (in $HOME/.bashrc):
export GOROOT=/usr/local/go
export GOPATH=$HOME/go
export PATH=$PATH:$GOROOT/bin
- An installation of GPDB. The GPDB Sandbox VM would work just fine for trying this out.
- The Go source file for the Kafka consumer
- A Kafka producer (one is supplied here)
This example involves creating a single table, crimes, within GPDB and loading 100,000 rows into it through Kafka. The full data set is accessible here but, for this example, a small subset is stored in S3 (see below). I envision running one terminal on the local machine and two more on the GPDB master host (see the pictures below).
- Resolve the dependencies:
go get github.com/wvanbergen/kafka/consumergroup github.com/Shopify/sarama
- Build the executable:
go build kafka_consumer.go
- Install the resulting executable, kafka_consumer, into the $HOME directory of the gpadmin user on each of your GPDB segment hosts (or just onto the single host if you are using the GPDB Sandbox VM).
- Create a topic in Kafka, with two partitions:
./kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create --topic chicago_crimes --replication-factor 1 --partitions 2 --zookeeper localhost:2181
- Upon success, you should be able to see it in the output when you list the topics:
./kafka_2.11-0.10.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
- Log in to your GPDB master host, as user gpadmin.
- Ensure GPDB is running. You can start it, as the gpadmin user, by typing gpstart -a.
- To keep the external table definition constant, so there is no need to edit its SQL, determine the IP address of the host running your Kafka installation's Zookeeper server and add an entry for this IP in your /etc/hosts file, aliased to the host name my_zk_host. In my setup, it looks like this: 172.16.1.1 my_zk_host. If you are running GPDB across more than one host, ensure you copy this /etc/hosts file to each segment host.
- Using the SQL file containing the DDL for these tables, and the psql client, create the two tables (one is the table to load; the other is the external table):
psql template1 -f ./gpdb_create_tables.sql
- At this point, you should be able to run a quick query to verify this is working:
psql template1 -c "SELECT * FROM crimes_kafka"
This should succeed but return zero rows, since the Kafka topic is empty at this point.
- If all is well, you can start a periodic load. For the purposes of this demo, just run the query that loads the table every five seconds:
[gpadmin@mdw ~]$ while $(true) ; do psql template1 -c "INSERT INTO crimes SELECT * FROM crimes_kafka" ; sleep 5 ; done
(the output should be INSERT 0 0, showing that no data is being inserted yet).
- In a separate terminal window, also logged into the GPDB master host as gpadmin, start the following command so you're able to track the progress of the load from Kafka:
while $(true) ; do psql template1 -c "SELECT COUNT(*) FROM crimes" ; sleep 5 ; done
- Back at the terminal on your laptop, grab the sample data file (MD5: 6f05a6ea98576eff13ff16b0da9559ec). This contains 100,000 lines, plus a header (the first line).
- You'll need a way to produce data into the topic you created earlier, using this data set. Within this repo, there is a source file, also in Go, that you can compile and run. If you are using OS X, there is a pre-compiled binary you can use; that is the one we'll use in the next step. This program just reads its stdin and produces to the given Kafka topic, sending a batch of 5,000 rows and then waiting for five seconds before sending the next batch (a sketch of this pattern appears after this list).
- Finally, kick off the load of the data file into the Kafka topic:
bzcat ./chicago_crimes_100k_rows.csv.bz2 | tail -n +2 | ./kafka_producer.darwin.amd64 -topic chicago_crimes
(the tail -n +2 just skips the header line).
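As a reference for the producer side just described, here is a minimal sketch of reading stdin and producing to a topic in batches with sarama. The broker address flag, batching, and error handling here are assumptions; the kafka_producer source included in this repo is the authoritative version.

```go
package main

import (
	"bufio"
	"flag"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	topic := flag.String("topic", "chicago_crimes", "Kafka topic to produce to")
	brokers := flag.String("brokers", "localhost:9092", "Kafka broker address (assumed default)")
	flag.Parse()

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by the sync producer

	producer, err := sarama.NewSyncProducer([]string{*brokers}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	const batchSize = 5000
	batch := make([]*sarama.ProducerMessage, 0, batchSize)

	// Send the accumulated batch of messages, then reset the slice.
	send := func() {
		if len(batch) == 0 {
			return
		}
		if err := producer.SendMessages(batch); err != nil {
			log.Fatalln(err)
		}
		batch = batch[:0]
	}

	// Read stdin line by line, producing to the topic in batches of 5,000 rows,
	// with a five-second pause between batches.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		batch = append(batch, &sarama.ProducerMessage{
			Topic: *topic,
			Value: sarama.StringEncoder(scanner.Text()),
		})
		if len(batch) == batchSize {
			send()
			time.Sleep(5 * time.Second)
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatalln(err)
	}
	send() // flush any remaining rows
}
```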
These images show the activity within each of the three terminal sessions during this data load process. The first shows the situation shortly after the load into Kafka begins, and the second shows all 100k rows loaded into the GPDB table.