Many organizations benefit from collecting and analyzing massive amounts of data on customers, products, market trends, and other factors pertinent to their business. After-the-fact analysis of those data provides many benefits, but as more companies begin to take advantage of Big Data analytics, competitive factors are driving many organizations to seek actionable insights in near-real-time to stay ahead. The ability to react instantly to customer actions or changing market conditions can result in greatly improved business outcomes. Imagine, for example, the ability to stream weblogs through a machine learning model to generate product recommendations for users based on their demographic profile and past purchase history, and then to display those targeted recommendations back to the user, in near-real-time, while they are actively browsing your site.
Structured Streaming on Azure Databricks provides the ability to do just that. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It leverages the Apache Spark API that lets you process streaming data in the same manner you process static data. The Spark SQL engine performs computations on the streaming data incrementally and continuously updates the result as new data arrives.
Structured Streaming treats all the data arriving as an unbounded input table, with each new item in the stream treated like a row appended to the input table.
You, as the developer, can then define a query on this input table, as if it were a static table. The results of your computations are written to an output sink. Spark automatically converts this batch-like query to a streaming execution plan. Triggers can be used to control when to update the results. Each time a trigger fires, Spark checks for new data (new rows in the input table) and incrementally updates the result.
For a deeper explanation of how Structured Streaming works, and for detailed programming guidance, see the Apache Spark Structured Streaming Programming Guide.
Streaming DataFrames are created through the DataStreamReader interface returned by SparkSession.readStream(). Just as with the read method for creating static DataFrames from files, such as CSV or TXT, the details of the source, such as format, schema, and options, can be specified inline.
from pyspark.sql.functions import explode, split

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()
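To actually produce output, the query must be started against an output sink; a trigger can control how often the result table is updated. The following is a minimal sketch that completes the word-count example above by writing the running counts to the console sink (which, as noted later, is intended for testing only), using an illustrative ten-second processing-time trigger.

# Start the streaming query, writing the full result table to the console every 10 seconds
# (console sink and 10-second trigger chosen purely for illustration)
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

query.awaitTermination()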
To determine whether a DataFrame is a streaming DataFrame, you can use the isStreaming property, which returns True or False.
df.isStreaming
Details about the built-in input sources can be found in the Structured Streaming Programming Guide.
Additional sources we will discuss in this article are:
- Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.
- Event Hubs source - Reads data from Azure Event Hubs.
It is important to note that some input sources and output sinks, such as "socket", "memory", and "console" are designed for testing only, and should not be used in production environments.
Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it gives you low latency and configurable time retention, which enables you to ingress massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.
Below, we provide a step-by-step example of how to stream events from Azure Event Hubs using Structured Streaming in Azure Databricks.
Before beginning, make sure you have completed the following:
- Create an Azure Databricks workspace, and add a cluster running Databricks Runtime 3.5 or higher
- Add the Azure Event Hubs Spark Connector to your Databricks workspace:
  - Create a library in your Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.1.
  - Attach the created library to your Databricks cluster.
- Create an Event Hubs Namespace in your Azure subscription
- Create an Event Hub within the namespace named databricks-demo
In order to reach Event Hubs, you will need to obtain the connection string for the databricks-demo Event Hub from the Azure portal.
- On the Event Hubs Namespace blade, select Event Hubs from the left-hand menu, then select the databricks-demo Event Hub from the list.
- Select Shared access policies on the databricks-demo blade.
- On the Shared access policies blade that appears, select + Add, then add a SAS Policy named DatabricksDemo with Send and Listen access, and select Create.
- Now, select the newly created DatabricksDemo SAS Policy from the list, and copy the Connection string-primary key value.
- Next, open your Azure Databricks workspace, create a new notebook, and attach it to your cluster.
- First, you need to import some support modules that will help in creating a DataFrame that has the schema expected by Event Hubs. In the new notebook, add the following to the first cell.
from pyspark.sql.types import StructField, StructType, StringType, Row
import json
- Insert a new cell and add the following code to create an Event Hubs configuration dictionary, replacing <your-event-hubs-connection-string> with the value you copied from the DatabricksDemo SAS Policy. The eventhubs.startingPosition parameter tells the connector where to start reading in the Event Hubs event stream (setting the offset to -1 tells it to start from the beginning of the stream). See this document for more details on the configuration options.

# Set Event Hub configuration dictionary
eventHubsConfig = {
    'eventhubs.connectionString' : "<your-event-hubs-connection-string>",
    'eventhubs.startingPosition' : json.dumps({"offset": "-1", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})
}
- In a new cell, add the following code to create a schema definition representing the structure expected by Event Hubs.

eventHubsSchema = StructType([
    StructField("body", StringType(), False),
    StructField("partitionId", StringType(), True),
    StructField("partitionKey", StringType(), True),
])
- The next thing you will do is use Azure Databricks to send a few events to your Event Hub, so you have data to read from your streaming query. Insert a new cell, and add the following code, which adds new rows to a DataFrame.

# Add rows to send to Event Hub
newRows = [
    Row("This is new message #1!", None, None),
    Row("This is new message #2!", None, None),
    Row("This is new message #3!", None, None),
    Row("This is new message #4!", None, None),
    Row("This is new message #5!", None, None)
]
parallelizeRows = spark.sparkContext.parallelize(newRows)
new_messages = spark.createDataFrame(parallelizeRows, eventHubsSchema)
- Now, insert another cell, and add the code to send the rows created above into your configured Event Hub instance. NOTE: You will rerun this cell multiple times as you proceed, to add new events into Event Hubs and observe the results.

# Write body data from a DataFrame to Event Hubs. Events are distributed across partitions using a round-robin model.
ds = new_messages \
    .select("body") \
    .write \
    .format("eventhubs") \
    .options(**eventHubsConfig) \
    .option("checkpointLocation", "///checkpoint.txt") \
    .save()
- You are now ready to set up a Structured Streaming query, which will be represented by a DataFrame that reads events from Event Hubs. In a new cell, add the following, which defines a streaming read that starts from the beginning of the Event Hubs event stream.

streaming_df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**eventHubsConfig) \
    .load()
- You may notice that even though the streaming query has been defined, you aren't seeing any output. To view the streaming output, you can simply pass the streaming DataFrame to the display method, using the following code:

display(streaming_df.withColumn("body", streaming_df["body"].cast("string")))

NOTE: The body from Event Hubs is always provided as a byte array, so you will use cast("string") to explicitly deserialize the body column for readable output.
- You should see the five rows you added appear in the results pane. Now, go back and rerun the cell containing ds = new_messages... to add more events into Event Hubs, and observe that the streaming query results update automatically as the new events are received by Event Hubs.
- Now, register the streaming DataFrame as a temporary view, which will enable you to query the streaming data using SQL. Add the following code into two new cells.

streaming_df.withColumn("body", streaming_df["body"].cast("string")).createOrReplaceTempView("events")

%sql
SELECT Count(body) FROM events
- You will see a count of the number of events displayed. Rerun the cell that sends events (ds = new_messages...) to add more events to Event Hubs, and observe that the row count updates automatically.
- Finally, use SQL to query the enqueuedTime, and then use the built-in notebook visualizations to produce a bar chart with the count of events that were enqueued at the same time (the horizontal axis is enqueuedTime and the vertical axis is the count).

%sql
SELECT 1 as count, enqueuedTime FROM events
- You can stop the streaming query by selecting Stop Execution on the notebook toolbar.
Apache Kafka is a distributed streaming platform, providing the ability to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system. Apache Kafka on HDInsight provides this as a managed high-throughput, low-latency service for real-time data.
To connect to Kafka on HDInsight with Azure Databricks, do the following:
The Apache Kafka connectors for Structured Streaming are packaged in Databricks Runtime. You use the kafka connector to connect to Kafka 0.10+ and the kafka08 connector to connect to Kafka 0.8+.
- Create an HDInsight Kafka cluster, and configure your Azure Virtual Network.
- Configure the Kafka brokers to advertise the correct address by enabling IP advertising.
- Peer the Kafka cluster to the Azure Databricks cluster, using the instructions provided in Virtual Network Peering.
- Obtain two pieces of information from your Kafka cluster prior to connecting to it from your notebook:
  - List of Kafka brokers
  - List of topics from Kafka
- Once your Kafka connection is established, you can create a streaming DataFrame in your Azure Databricks notebook, similar to how it was done with Azure Event Hubs, but using the kafka format. In the options, you need to specify a comma-delimited list of your Kafka brokers.

# Set up connection to Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1,topic2") \
    .option("startingOffsets", "latest") \
    .load()
The Azure Databricks documentation provides a subset of configuration settings for connecting to Kafka.
- To read the data from the stream, use a query like the following:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

NOTE: The key and the value from Kafka are always provided as byte arrays, so you will use cast(x AS STRING) to explicitly deserialize the keys and values.
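If your Kafka message values carry JSON payloads, you can take this one step further and parse the value into typed columns with DataFrame operations. The following is a minimal sketch, not part of the original walkthrough; the payload fields (id and amount) and their schema are hypothetical and would need to match your actual messages.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema describing the JSON carried in the Kafka message value
payloadSchema = StructType([
    StructField("id", StringType(), True),
    StructField("amount", DoubleType(), True)
])

# Cast the raw bytes to a string, then parse the JSON into typed columns
parsed_df = df \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), payloadSchema).alias("data")) \
    .select("data.*")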
Important things to note around Kafka parameters:

You should not set the following Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception:

- group.id: Setting this parameter is not allowed for Spark versions below 2.2.
- auto.offset.reset: Instead, set the source option startingOffsets to specify where to start. To maintain consistency, Structured Streaming (as opposed to the Kafka Consumer) manages the consumption of offsets internally. This ensures that you don't miss any data after dynamically subscribing to new topics/partitions. Note that startingOffsets applies only when you start a new streaming query; resuming from a checkpoint always picks up from where the query left off.
- key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
- value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
- enable.auto.commit: Setting this parameter is not allowed. Spark keeps track of Kafka offsets internally and doesn't commit any offset.
- interceptor.classes: The Kafka source always reads keys and values as byte arrays. It's not safe to use ConsumerInterceptor, as it may break the query.
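To make the auto.offset.reset guidance above concrete, here is a small, hedged sketch of the per-partition form of the startingOffsets option (the simpler "latest" string form was already shown in the connection example). The topic name and offset values are placeholders; in the JSON, -2 means earliest and -1 means latest.

# Start topic1 partition 0 at offset 23 and partition 1 at the earliest available offset
df_offsets = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .option("startingOffsets", """{"topic1": {"0": 23, "1": -2}}""") \
    .load()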
Using SSL with Kafka connections requires additional configuration, instructions for which can be found in the Confluent documentation Encryption and Authentication with SSL. You can provide the configurations described there, prefixed with kafka., as options. It is recommended that certificates are stored in Azure Blob storage and accessed through a DBFS mount point. Combined with cluster and job ACLs, this lets you restrict access to the certificates to only those clusters that can access Kafka.
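As a hedged sketch of that recommendation, the following shows one way to mount such a Blob storage container in DBFS; the storage account name, container name, secret scope, and key used here are illustrative placeholders rather than values from this article.

# Mount a Blob storage container holding the Kafka SSL certificates at /mnt/kafka-private
# (the account, container, secret scope, and key names are placeholders)
dbutils.fs.mount(
    source="wasbs://kafka-private@mystorageaccount.blob.core.windows.net",
    mount_point="/mnt/kafka-private",
    extra_configs={
        "fs.azure.account.key.mystorageaccount.blob.core.windows.net":
            dbutils.secrets.get(scope="kafka", key="storage-account-key")
    }
)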
Once these paths are mounted, you can use the DBFS path to the certificates in your options. For example:
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ...) \
.option("kafka.ssl.truststore.location", "/mnt/kafka-private/ssl/kafka.client.truststore.jks") \
.option("kafka.ssl.keystore.location", "/mnt/kafka-private/ssl/kafka.client.keystore.jks") \
...
When running notebooks containing streaming queries in a production environment, you are likely to want resiliency and uptime guarantees. This can be accomplished by running your notebooks with Azure Databricks Jobs.
Production-grade streaming applications require robust failure handling. In Structured Streaming, enabling checkpointing for a streaming query allows you to restart the query after a failure, and continue the query from where the failed one left off. This provides fault tolerance and data consistency guarantees.
To make your queries fault tolerant, you must enable query checkpointing and configure Databricks jobs to restart your queries automatically after a failure. To enable checkpointing, set the checkpointLocation option on your query to a location in DBFS or cloud storage prior to starting the query, as follows:
streamingDataFrame.writeStream \
    .format("parquet") \
    .option("path", "dbfs:/outputPath/") \
    .option("checkpointLocation", "dbfs:/checkpointPath") \
    .start()
This checkpoint location preserves all of the essential information that uniquely identifies a query, so every query must have a unique checkpoint location.
When creating the Databricks job with the notebook or JAR that has your streaming queries, be sure to configure it to:
- Always use a new cluster.
- Always retry on failure.
Jobs have tight integration with Structured Streaming APIs and can monitor all streaming queries active in a run. This configuration ensures that if any part of the query fails, jobs automatically terminate the run (along with all the other queries) and start a new run in a new cluster. The new run re-executes the notebook or JAR code and restarts all of the queries. This is the safest way to ensure that you get back into a good state.
- Structured Streaming In Apache Spark
- Tutorial: Stream data into Azure Databricks using Event Hubs
- Structured Streaming + Kafka integration guide
Read next: Advanced settings & configuration