Structured Streaming is a powerful feature of Apache Spark, integrated into Databricks, that enables scalable and fault-tolerant stream processing. It allows users to process streaming data as if it were a static table, providing a seamless experience for handling continuously evolving datasets.
A data stream is any data source that grows over time, such as new log files, database updates captured in Change Data Capture (CDC) feeds, or events queued in messaging systems like Kafka.
Using spark.readStream, you can query a Delta table as a stream source, processing existing and new data seamlessly:

(spark.readStream
    .table("books")
    .createOrReplaceTempView("books_streaming_tmp_vw")
)
Use dataframe.writeStream to write results to durable storage. Configure the output with a trigger: a fixed interval (processingTime), a single batch (once=True), or availableNow=True, which processes all available data in incremental batches and then stops.
(streamDF.writeStream
    .trigger(processingTime="2 minutes")
    .outputMode("append")
    .option("checkpointLocation", "/path")
    .table("Output_Table")
)
Structured Streaming in Databricks provides a robust framework for processing streaming data, offering scalability, fault tolerance, and ease of use by abstracting streams as tables.
Auto Loader is a feature in Databricks that simplifies the ingestion of large volumes of data files. It efficiently processes new files as they arrive in cloud storage.
Examples: Using Auto Loader with PySpark and SQL
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/orders_checkpoint")
    .load(f"{dataset_bookstore}/orders-raw")
    .writeStream
    .option("checkpointLocation", "dbfs:/mnt/demo/orders_checkpoint")
    .table("orders_updates")
)
-- Using Auto Loader to create a streaming table from CSV files
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", ",", "header", "true"));
The COPY INTO command is used for idempotent and incremental loading of new data files into Delta tables.
Example: COPY INTO with SparkSQL
-- Loading data into a Delta table using COPY INTO
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');