Neo4j Kafka integration demo in Python on Ubuntu, using the confluent-kafka client.
Real-time Graph Updates using Kafka Messaging.
A recent Neo4j whitepaper describes how Monsanto is performing real-time updates on a 600M node Neo4j graph using Kafka to consume data extracted from a large Oracle Exadata instance.
This modern data architecture combines a fast, scalable messaging platform (Kafka) for low latency data provisioning and an enterprise graph database (Neo4j) for high performance, in-memory analytics & OLTP - creating new and powerful real-time graph analytics capabilities for your enterprise applications.
In this Gist, we'll see how Apache Kafka can stream 1M messages to update a large Neo4j graph.
Kafka allows you to create a messaging service that is fault-tolerant, scalable, and capable of massively parallel distributed processing.
Kafka was originally developed at LinkedIn, and is becoming widely adopted because it excels at moving large amounts of data quickly across the enterprise.
Using Kafka, LinkedIn has ingested over a trillion messages per day, while Netflix reports ingesting over 500B messages per day on AWS.
The core of Kafka is the message log, which is essentially a time-dependent data table. Messages are identified only by their offset in the log, and the log represents a running record of events published by source systems. Applications can subscribe to message streams, allowing for loosely coupled, flexible architectures.
In a production system, the Kafka producers and consumers would be running continuously, with the producers polling for changes in the source data, and the consumers polling to see if there are new messages in the Kafka log for updates.
There are many common scenarios where the lowest possible latency between source systems and Neo4j is desirable (e.g. recommendations, e-commerce, fraud detection, access control, supply chain). Kafka + Neo4j is a great fit for these use cases.
Let's imagine we're operating a service called MovieFriends that features a social network of 1M users who rent streaming movies. Our goal is to ingest an update of daily charges for each user. We have a Neo4j social graph of User nodes, and we want to append a DailyCharge node to each User node.
We'll set up Kafka & Neo4j, create a 1M node Neo4j graph, produce 1M messages in just a few seconds, and then consume these messages as batched updates to our graph.
- We'll set up Kafka and Neo4j instances on your local machine (about an hour)
- Next, we'll use a simple producer client to publish messages to Kafka, mocking an extract from a source system (5 secs)
- Finally, we'll use a simple consumer to subscribe to Kafka and send 1M batched messages as updates to Neo4j using the high-speed Bolt protocol (90 secs)
The confluent-kafka client is currently the fastest Python client available, per recent benchmarking by the Activision data science team.
I've used the Activision benchmarking script as the framework for this demo -- the main modifications I've made are to generate more realistic messages in the Kafka producer, and to integrate the Bolt driver with the Kafka consumer.
The easiest way to get started with Kafka is to follow the directions provided on the Confluent Quickstart for setting up a single instance Kafka server.
Make sure you are running the 1.8 JDK (1.7 works too)
java -version
https://docs.confluent.io/3.1.1/quickstart.html
Navigate (cd) to the destination folder, then download Confluent with:
curl -O http://packages.confluent.io/archive/5.4/confluent-community-5.4.1-2.12.zip
Unzip the package:
unzip confluent-community-5.4.1-2.12.zip
The unzipped Confluent folder contains:
bin etc lib logs README share src
Open a new terminal in the unzipped Confluent folder and start ZooKeeper:
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
Open another terminal in the same Confluent folder and start Kafka:
./bin/kafka-server-start ./etc/kafka/server.properties
Open another terminal in the same Confluent folder and start the Schema Registry:
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
Now that the services are running, you can send some Avro data to a Kafka topic using a console utility bundled with Kafka, without having to write any code.
Open a new terminal in the Confluent folder, cd into bin, and run:
kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
The console is now waiting for input; each line you type is sent to Kafka as a message. For example, enter the following lines one at a time:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, shut down the producer with Ctrl+C.
You can check the data you produced by using Kafka's console consumer to read from the topic. cd to the same Confluent bin folder and run:
kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
The last rows of the output show the values you wrote:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, press Ctrl+C to shut down the process.
To delete a topic, move to the Confluent bin folder and run:
kafka-topics --zookeeper localhost:2181 --delete --topic test
When all is done, shut down the services in the reverse order that you started them.
Install the librdkafka C library; the confluent-kafka Python client won't run without it:
sudo apt install librdkafka-dev
or install from source:
https://github.com/edenhill/librdkafka
Some useful documentation on the client configuration options (pay attention to the buffer sizes):
http://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
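For reference, those buffer-related settings are just keys in the config dict you pass to the confluent-kafka Python client (installed below). A minimal sketch; the values are illustrative, not tuned recommendations:
# illustrative confluent-kafka producer config; values are examples only
producer_conf = {
    'bootstrap.servers': 'localhost:9092',     # broker list
    'queue.buffering.max.messages': 200000,    # max messages buffered in the local producer queue
    'queue.buffering.max.kbytes': 1048576,     # max total size of the local queue, in KB
    'queue.buffering.max.ms': 100,             # how long to wait while filling a batch before sending
    'batch.num.messages': 10000                # max number of messages per batch sent to the broker
}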
There are three Python dependencies: confluent-kafka, pykafka, and neo4j-driver.
Install Python 3.5 and Jupyter if you want to run the notebook.
pip3 install confluent-kafka
pip3 install pykafka
pip3 install neo4j-driver
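To verify the three packages installed correctly, here is a quick check from Python (assuming the package names above):
# sanity check that the three client libraries import correctly
import confluent_kafka
import pykafka
from neo4j.v1 import GraphDatabase   # neo4j-driver 1.x exposes the v1 API used throughout this demo

print("confluent-kafka:", confluent_kafka.version(), "librdkafka:", confluent_kafka.libversion())
print("pykafka:", pykafka.__version__)
print("neo4j-driver imported OK")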
Installing Neo4j: https://www.digitalocean.com/community/tutorials/how-to-install-neo4j-on-an-ubuntu-vps
Download Neo4j from the repository:
wget -O - http://debian.neo4j.org/neotechnology.gpg.key | sudo apt-key add -
sudo bash -c "echo 'deb http://debian.neo4j.org/repo stable/' > /etc/apt/sources.list.d/neo4j.list"
sudo apt-get update
sudo apt-get install neo4j
List the files installed by the Neo4j package with:
dpkg -L neo4j
The neo4j command is installed in /usr/bin, so navigate there (or rely on your PATH):
cd /usr/bin/
Start neo4j with the command:
sudo neo4j start
There may be a short delay until the server is ready. Open a browser and navigate to localhost:7474/browser/
Connection URL: bolt://localhost:7687
Authentication type: Username/password
Username: neo4j
Password: neo4j
Neo4j change password:
new password:
repeat new password:
Connected to Neo4j (nice to meet you) You are connected as user neo4j to bolt://localhost:7687 Connection credentials are stored in your web browser.
Now that Neo4j is ready, shut it down:
sudo neo4j stop
Next, install APOC, a library of procedures and functions that helps with tasks such as data integration, graph algorithms, and data conversion: https://github.com/neo4j-contrib/neo4j-apoc-procedures
Navigate to Neo4j's plugins folder:
cd /var/lib/neo4j/plugins
Check your installed Neo4j version:
neo4j version
Using your Neo4j version, find the correct APOC release to download in the "Version Compatibility Matrix" of the APOC repository's README.md.
In /var/lib/neo4j/plugins, run the following command with the right APOC version:
sudo wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/3.5.0.9/apoc-3.5.0.9-all.jar
The plugins folder now contains the APOC jar. Navigate to Neo4j's configuration folder at /etc/neo4j/ and open neo4j.conf with sudo nano.
Append the following lines to allow procedures that use internal APIs and to whitelist the APOC procedures and functions:
dbms.security.procedures.unrestricted=apoc.*
dbms.security.procedures.whitelist=apoc.*
Write out (Ctrl+O) and exit (Ctrl+X).
Start Neo4j again (sudo neo4j start) and run the following command in the Neo4j browser to check the APOC version:
RETURN apoc.version();
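You can also run the same check over Bolt from Python with the neo4j-driver; a minimal sketch (use the password you set earlier if you changed it):
# quick APOC / Bolt smoke test from Python
from neo4j.v1 import GraphDatabase, basic_auth

driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "neo4j"), encrypted=False)
session = driver.session()
for record in session.run("RETURN apoc.version() AS version"):
    print(record["version"])
session.close()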
Let's start by making the MovieFriends graph. You'll need to start with a new, blank Neo4j database.
To achieve good performance, we'll set constraints and indexes on all id fields we'll use for matching update records.
#make sure apoc procedures are installed in Neo4j plugins folder
from neo4j.v1 import GraphDatabase, basic_auth, TRUST_ON_FIRST_USE, CypherError
from string import Template
nodes = 1000000
nodes_per_graph = 10000
graphs = int(nodes/nodes_per_graph)
query0 = 'MATCH (n) DETACH DELETE n'
query1 = Template('CALL apoc.generate.ba( ${nodes_per_graph}, 1, "User", "KNOWS") '
).substitute(locals())
query2 = '''
MATCH (n:User) SET n.userId = id(n)+1000000
;
'''
query3 = '''
CREATE CONSTRAINT ON (n:User) ASSERT n.userId IS UNIQUE
;
'''
query4 = '''
CREATE INDEX on :DailyCharge(userId)
;
'''
driver = GraphDatabase.driver("bolt://localhost",
                              auth=basic_auth("neo4j", "neo4j"),
                              encrypted=False,
                              trust=TRUST_ON_FIRST_USE)

try:
    session = driver.session()
    result = session.run(query0)
    summary = result.consume()
    print(summary.counters)
    session.close()

    session = driver.session()
    for i in range(graphs):
        result = session.run(query1)
        summary = result.consume()
        #print(summary.counters)
    session.close()

    session = driver.session()
    result = session.run(query2)
    summary = result.consume()
    print(summary.counters)
    session.close()

    session = driver.session()
    result = session.run(query3)
    summary = result.consume()
    print(summary.counters)
    session.close()

    session = driver.session()
    result = session.run(query4)
    summary = result.consume()
    print(summary.counters)
    session.close()

except Exception as e:
    print('*** Got exception', e)
    if not isinstance(e, CypherError):
        print('*** Rolling back')
        session.rollback()
    else:
        print('*** Not rolling back')

finally:
    print('*** Done!')
You should see this output:
{}
{'properties_set': 1000000}
{'constraints_added': 1}
{'indexes_added': 1}
*** Done!
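Before moving on, you can optionally sanity-check the graph by counting the User nodes with the same driver; this should print 1000000:
# optional sanity check on the freshly built graph
session = driver.session()
for record in session.run("MATCH (n:User) RETURN count(n) AS users"):
    print(record["users"])
session.close()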
Now that we've built the graph, let's update it. The first task is to generate the messages we'll need (pretending that these have been extracted from various source systems as noted above).
This first script initializes some of the global variables used by both the producer and consumer and also sets the format of our messages. To keep things simple, we'll just have our message be a comma-delimited string with a userId, an amount, and a timestamp.
We are also declaring our Kafka topic name and the total number of messages. Note that if you re-run the producer it will append messages to the existing topic each time. If you want a new group of 1M messages, you'll need a new topic name.
A timer wrapper is included so you can see the throughput.
# Initializations.
import random
import time
# connect to Kafka
bootstrap_servers = 'localhost:9092' # change if your brokers live elsewhere
kafka_topic = 'neo4j-1M-demo'
msg_count = 1000000
# this is the total number of messages that will be generated
# function to generate messages that will be the data for the graph update
# an example message is displayed : userId, amount, timestamp
# this simulates data from the source database
i=0
def generate_message(i):
    msg_payload = (str(i+1000000) + ',' + str(random.randrange(0,5000)/100) + ',' + str(time.time())).encode()
    return(msg_payload)
example_message = generate_message(i)
msg_bytes = len(generate_message(i))
print("Example message: " + str(example_message))
print("Message size (bytes): " + str(msg_bytes))
# we'll use a timer so you can see the throughput for both
# the producer and the consumer
# reset timer for kafka producer and consumer
producer_timings = {}
consumer_timings = {}
# function to calc throughput based on msg count and length
def calculate_thoughput(timing, n_messages=msg_count, msg_size=msg_bytes):
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))
Executing this script yields an example message:
Example message: b'1000000,5.84,1484261697.689981'
Message size (bytes): 30
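As noted above, re-running the producer appends to the existing topic. If you want a fresh 1M messages per run, one simple option (not part of the original script) is to make the topic name unique per run:
# optional: unique topic name per run, so each run starts from an empty topic
import uuid
kafka_topic = 'neo4j-1M-demo-' + uuid.uuid4().hex[:8]
print(kafka_topic)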
This defines the producer, and you can see that all we are really doing here is invoking the generate_message() function in a loop. This creates a bunch of messages that are passed to the Kafka broker.
# kafka producer function, simulates ETL data stream for graph updates
import confluent_kafka
from confluent_kafka import Producer, KafkaException, KafkaError
import random
import time

topic = kafka_topic

def confluent_kafka_producer_performance():
    # Note: if the local producer queue fills up, produce() raises BufferError and that message
    # is dropped here (we just count the overflows below), so size the queue generously
    conf = {'bootstrap.servers': bootstrap_servers,
            'queue.buffering.max.messages': 200000
            }
    producer = confluent_kafka.Producer(**conf)
    messages_overflow = 0
    producer_start = time.time()
    for i in range(msg_count):
        msg_payload = generate_message(i)
        try:
            producer.produce(topic, value=msg_payload)
        except BufferError as e:
            messages_overflow += 1
    # checking for overflow
    print('BufferErrors: ' + str(messages_overflow))
    producer.flush()
    return time.time() - producer_start
So now we can run the producer (wrapped in the timer function)
producer_timings['confluent_kafka_producer'] = confluent_kafka_producer_performance()
calculate_thoughput(producer_timings['confluent_kafka_producer'])
We see 1M messages produced in about 5 seconds, roughly 200K messages per second (on my laptop).
BufferErrors: 0
Processed 1000000 messages in 5.06 seconds
5.66 MB/s
197728.85 Msgs/s
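The benchmark above only counts BufferErrors; if you want per-message delivery confirmation, the confluent-kafka client also supports delivery callbacks. A minimal sketch, separate from the benchmark code:
# optional: per-message delivery confirmation using a delivery callback
from confluent_kafka import Producer

def delivery_report(err, msg):
    # called once per message from poll()/flush(), after delivery succeeds or fails
    if err is not None:
        print('Delivery failed:', err)

p = Producer({'bootstrap.servers': bootstrap_servers})
p.produce(kafka_topic, value=generate_message(0), callback=delivery_report)
p.poll(0)   # serve the delivery callbacks
p.flush()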
So what did we get? We can use the pykafka client to easily check the earliest and latest offsets and make sure everything looks good.
from pykafka import KafkaClient
client = KafkaClient(hosts=bootstrap_servers)
topic = client.topics[kafka_topic.encode()]
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())
We can see that the earliest offset is 0 and the latest is 1,000,000, so there are 1M messages waiting in the queue to be consumed.
{0: OffsetPartitionResponse(offset=[0], err=0)}
{0: OffsetPartitionResponse(offset=[1000000], err=0)}
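You can also compute the backlog directly from those offset responses (this demo uses a single partition, 0):
# messages waiting = latest offset - earliest offset
earliest = topic.earliest_available_offsets()[0].offset[0]
latest = topic.latest_available_offsets()[0].offset[0]
print("Messages in topic:", latest - earliest)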
Now we are ready to update our MovieFriends graph.
We'll start as before, by defining the consumer, however we're going to make a few optimizations for Neo4j.
The confluent-kafka client's consumer.poll() function polls one message at a time, so we could generate a single update for each message, but that's a lot of unnecessary I/O for Bolt. Alternatively, we could try to jam all 1M messages into a single Neo4j transaction, but this could create memory issues and a long-running query. A better approach is to poll the messages in batches, and then send each batch to Neo4j as a list in a parameterized query.
The confluent_kafka_consume_batch(consumer, batch_size)
function polls and formats a list of messages for Neo4j, per the specified batch size.
import confluent_kafka
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat
import uuid
from neo4j.v1 import GraphDatabase, basic_auth, TRUST_ON_FIRST_USE, CypherError
#import pandas as pd #uncomment if you want to write messages to a file
def confluent_kafka_consume_batch(consumer, batch_size):
    batch_list = []
    batch_msg_consumed = 0
    for m in range(batch_size):
        msg = consumer.poll()
        if msg is None:
            break
            #continue
        if msg.error():
            # Error or event
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            elif msg.error():
                # Error
                raise KafkaException(msg.error())
        else:
            datastr = str(msg.value())
            data = datastr[2:-1].split(",")
            # details you can access from message object
            # print("%s %s" % ("iterator:", m))
            # print("%s %s" % ("msg:", str(msg.value())))
            # print("%s %s" % ("length:", len(msg)))
            # print("%s %s" % ("data:", data))
            batch_list.extend([data])
            batch_msg_consumed += 1
    return(batch_list, batch_msg_consumed)
The confluent_kafka_consumer_performance()
function is a wrapper for the consumer, which iterates through the required number of batches. With each iteration, it opens a new Bolt transaction and passes the batch list as a parameter to the update query. We UNWIND the list as rows, and then use the indexed ids to efficiently MATCH the User nodes and MERGE the DailyCharge nodes and relationships. The Bolt transaction is committed and the result consumed (and in case you were wondering, passing the list as a parameter and UNWINDing it is a neat trick from Michael Hunger).
Once all the batches are consumed, the consumer is closed and the throughput computed.
def confluent_kafka_consumer_performance():
    topic = kafka_topic
    msg_consumed_count = 0
    batch_size = 50000
    batch_list = []
    nodes = 0
    rels = 0

    driver = GraphDatabase.driver("bolt://localhost",
                                  auth=basic_auth("neo4j", "neo4j"),
                                  encrypted=False,
                                  trust=TRUST_ON_FIRST_USE)

    update_query = '''
    WITH {batch_list} AS batch_list
    UNWIND batch_list AS rows
    WITH rows, toInteger(rows[0]) AS userid
    MATCH (u:User {userId: userid})
    MERGE (u)-[r:HAS_DAILY_CHARGE]->(n:DailyCharge {userId: toInteger(rows[0])})
    ON CREATE SET n.amountUSD = toFloat(rows[1]), n.createdDate = toFloat(rows[2])
    '''

    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': str(uuid.uuid1()),
            'session.timeout.ms': 60000,
            'enable.auto.commit': 'true',
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
            }

    consumer = confluent_kafka.Consumer(**conf)
    consumer_start = time.time()

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    consumer.subscribe([topic], on_assign=print_assignment)

    # consumer loop
    try:
        session = driver.session()
        while True:
            # Neo4j Graph update loop using Bolt
            try:
                batch_list, batch_msg_consumed = confluent_kafka_consume_batch(consumer, batch_size)
                msg_consumed_count += batch_msg_consumed
                # if you want to see what your message batches look like
                # df = pd.DataFrame(batch_list)
                # filename='test_' + str(msg_consumed_count) + '.csv'
                # df.to_csv(path_or_buf= filename)
                # using the Bolt explicit transaction, recommended for writes
                with session.begin_transaction() as tx:
                    result = tx.run(update_query, {"batch_list": batch_list})
                    tx.success = True
                summary = result.consume()
                nodes = summary.counters.nodes_created
                rels = summary.counters.relationships_created
                print("%s %s %s %s" % ("Messages consumed:", msg_consumed_count, "Batch size:", len(batch_list)), end=" ")
                print("%s %s %s %s" % ("Nodes created:", nodes, "Rels created:", rels))
                if msg_consumed_count >= msg_count:
                    break
            except Exception as e:
                print('*** Got exception', e)
                if not isinstance(e, CypherError):
                    print('*** Rolling back')
                    session.rollback()
                else:
                    print('*** Not rolling back')
            finally:
                batch_msg_consumed_count = 0
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        session.close()

    consumer_timing = time.time() - consumer_start
    consumer.close()
    return consumer_timing
Executing the consumer script updates the graph:
# run consumer throughput test
consumer_timings['confluent_kafka_consumer'] = confluent_kafka_consumer_performance()
calculate_thoughput(consumer_timings['confluent_kafka_consumer'])
The output below shows each batch being processed, with the Neo4j Bolt driver summary reporting the nodes and relationships created for each batch.
I can process 1M updates in about 90 seconds on my laptop, roughly 11,000 messages per second.
You can test different batch sizes and see how that affects performance.
Note that in this demo I'm running all the transactions in the same session to maximize throughput; in production it may be safer to open and close a new session with each batch transaction (i.e. inside the update loop), as sketched below.
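A minimal sketch of that per-batch-session variant of the inner update loop (not the code used for the timings below), assuming the same consumer, driver, batch_size, update_query, and confluent_kafka_consume_batch() helper defined in the function above:
# production-style variant: a fresh session and explicit transaction for every batch
msg_consumed_count = 0
while True:
    batch_list, batch_msg_consumed = confluent_kafka_consume_batch(consumer, batch_size)
    msg_consumed_count += batch_msg_consumed
    session = driver.session()
    try:
        with session.begin_transaction() as tx:
            tx.run(update_query, {"batch_list": batch_list})
            tx.success = True
    finally:
        session.close()
    if msg_consumed_count >= msg_count:
        break
The run output from the single-session version follows.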
Assignment: [TopicPartition{topic=neo4j-1M-demo,partition=0,offset=-1001,error=None}]
Messages consumed: 50000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 100000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 150000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 200000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 250000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 300000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 350000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 400000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 450000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 500000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 550000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 600000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 650000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 700000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 750000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 800000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 850000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 900000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 950000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Messages consumed: 1000000 Batch size: 50000 Nodes created: 50000 Rels created: 50000
Processed 1000000 messages in 91.31 seconds
0.31 MB/s
10951.72 Msgs/s
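As a final check, you can count the DailyCharge nodes that were attached; this should print 1000000 (one DailyCharge per User):
# optional verification that every update landed in the graph
from neo4j.v1 import GraphDatabase, basic_auth

driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "neo4j"), encrypted=False)
session = driver.session()
for record in session.run("MATCH (:User)-[:HAS_DAILY_CHARGE]->(d:DailyCharge) RETURN count(d) AS charges"):
    print(record["charges"])
session.close()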
In this GraphGist, we've seen a simple demonstration of how Kafka can be integrated with Neo4j to create a high-throughput, loosely coupled ETL pipeline, using Kafka's simple consumer and Neo4j's high-speed Bolt protocol. Neo4j has excellent OLTP capabilities, and when coupled with the Kafka distributed streaming platform, it enables new and powerful real-time graph analytics capabilities for your enterprise applications.