Structured Streaming is a powerful feature of Apache Spark, integrated into Databricks, that enables scalable and fault-tolerant stream processing. It allows users to process streaming data as if it were a static table, providing a seamless experience for handling continuously evolving datasets.
A data stream is any data source that grows over time, such as new log files, database updates captured in Change Data Capture (CDC) feeds, or events queued in messaging systems like Kafka.
Using spark.readStream, you can query a Delta table as a stream source, processing existing and new data seamlessly:

(spark.readStream
    .table("books")
    .createOrReplaceTempView("books_streaming_tmp_vw")
)
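Once registered, the temporary view can be queried with Spark SQL; because it is backed by a streaming source, the result is itself a streaming DataFrame. A minimal sketch, assuming (hypothetically) that the books table has an author column:

# Streaming aggregation over the temp view; "author" is a hypothetical column
author_counts_df = spark.sql("""
    SELECT author, count(*) AS total_books
    FROM books_streaming_tmp_vw
    GROUP BY author
""")
# Writing this aggregate out would require outputMode("complete")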
Use dataframe.writeStream to write results to durable storage. Configure the output with a trigger (a processingTime interval, a one-time trigger with once=True, or availableNow=True), an output mode, and a checkpoint location:

(streamDF.writeStream
    .trigger(processingTime="2 minutes")
    .outputMode("append")
    .option("checkpointLocation", "/path")
    .table("Output_Table")
)
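For incremental batch processing, the same write can instead use an availableNow trigger, which processes all data available since the last checkpoint and then stops on its own. A minimal sketch, reusing the placeholder names above:

# Process everything available at start-up, then stop (incremental batch)
(streamDF.writeStream
    .trigger(availableNow=True)
    .outputMode("append")
    .option("checkpointLocation", "/path")
    .table("Output_Table")
)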
Structured Streaming in Databricks provides a robust framework for processing streaming data, offering scalability, fault tolerance, and ease of use by abstracting streams as tables.
Auto Loader is a feature in Databricks that simplifies the ingestion of large volumes of data files. It efficiently processes new files as they arrive in cloud storage.
Example: Using Auto Loader with PySpark
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/orders_checkpoint")
    .load(f"{dataset_bookstore}/orders-raw")
    .writeStream
    .option("checkpointLocation", "dbfs:/mnt/demo/orders_checkpoint")
    .table("orders_updates")
)
Example: Using Auto Loader with SparkSQL

-- Using Auto Loader to create a streaming table from CSV files
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", ",", "header", "true"));
The COPY INTO command is used for idempotent and incremental loading of new data files into Delta tables.
Example: COPY INTO with SparkSQL
-- Loading data into a Delta table using COPY INTO
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
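Because COPY INTO tracks the files it has already loaded, the same statement can be re-run (for example, on a schedule) and only newly arrived files are ingested. A minimal sketch of issuing it from PySpark, reusing the hypothetical table and path above:

# Re-running the same COPY INTO is idempotent: previously loaded files are skipped
spark.sql("""
    COPY INTO my_table
    FROM '/path/to/files'
    FILEFORMAT = CSV
    FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true')
""")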
Here is a table summarizing the main differences between Auto Loader and the COPY INTO command:
| Feature/Aspect | Auto Loader | COPY INTO |
|---|---|---|
| Use Case | Designed for continuous, scalable, and efficient ingestion of large volumes of files. | Ideal for batch ingestion of smaller datasets or incremental appends. |
| Performance | Supports high scalability with features like file notification mode for large directories/files. | Best suited for smaller numbers of files (e.g., thousands or fewer) in directory listing mode. |
| Incremental Processing | Automatically tracks and processes new files incrementally using Structured Streaming. | Tracks ingested files to ensure idempotency but does not natively support continuous streaming. |
| Schema Management | Requires a schema location to store inferred schemas for faster restarts and schema stability. | Can infer the schema directly during ingestion and does not require a schema location. |
| Transformation Support | Limited to transformations supported by Structured Streaming. | Supports simple transformations such as column reordering, omission, and type casting during load (see the sketch after this table). |
| Error Handling | Advanced error handling with automatic retries and recovery mechanisms. | Basic error handling with validation options for previewing data before ingestion. |
| Latency | Suitable for near real-time streaming ingestion with low latency. | Primarily used for batch ingestion; not optimized for low-latency use cases. |
| Configuration Complexity | Requires configuration of triggers, schema location, and optional file notification services. | Simple SQL-based command; minimal configuration required. |
| File Format Support | Supports multiple file formats via the cloudFiles source (e.g., CSV, JSON, Parquet). | Also supports various formats (e.g., CSV, JSON, Parquet) with explicit format options in the command. |
| Cost Efficiency | More cost-effective for large-scale streaming pipelines due to optimized resource usage. | Cost-effective for scheduled batch processing or smaller datasets with predictable loads. |
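As noted in the table, COPY INTO supports simple transformations during load by selecting from the source files inside the statement. A minimal sketch via PySpark, with hypothetical column names and the same path as the earlier example:

# Cast and reorder columns while loading; the column names are hypothetical
spark.sql("""
    COPY INTO my_table
    FROM (
        SELECT CAST(order_id AS INT) AS order_id, customer_name
        FROM '/path/to/files'
    )
    FILEFORMAT = CSV
    FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true')
""")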
In summary: Auto Loader is the better fit for continuous, large-scale, low-latency file ingestion built on Structured Streaming, while COPY INTO is a simpler, SQL-based choice for batch or scheduled incremental loads of smaller datasets.