04 - Structured Streaming

Structured Streaming in Databricks

Structured Streaming is a powerful feature of Apache Spark, integrated into Databricks, that enables scalable and fault-tolerant stream processing. It allows users to process streaming data as if it were a static table, providing a seamless experience for handling continuously evolving datasets.

Data Stream

A data stream is any data source that grows over time, such as new log files, database updates captured in Change Data Capture (CDC) feeds, or events queued in messaging systems like Kafka.

  • Processing Approaches:
    • Traditional Approach: Reprocess the entire dataset on every update.
    • Incremental Approach: Capture only the data that has arrived since the last update; this is exactly what Spark Structured Streaming automates (see the sketch below).
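
As a rough sketch of the difference, assuming a hypothetical Delta table named events with an event_type column: a batch read reprocesses everything on every run, while a streaming read only picks up rows appended since the last run.

# Traditional approach: every run re-reads the full table and recomputes from scratch
full_df = spark.read.table("events")
full_counts = full_df.groupBy("event_type").count()

# Incremental approach: Structured Streaming remembers what has already been processed
# and only reads rows appended since the last run
stream_df = spark.readStream.table("events")
stream_counts = stream_df.groupBy("event_type").count()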

Spark Structured Streaming

  • Scalable Engine: Queries an infinite data source and incrementally persists the results into a data sink (e.g., files or tables).
  • Table Abstraction: Treats streaming data as an “unbounded” table, making it easy to interact with new data as if appending rows to a static table.

Integration with Delta Lake

  • Delta Tables: Delta Lake integrates tightly with Spark Structured Streaming. Using spark.readStream, you can query a Delta table as a streaming source, processing both existing data and newly arriving data seamlessly.
(spark.readStream
      .table("books")
      .createOrReplaceTempView("books_streaming_tmp_vw")
)
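
Once registered, the temporary view can be queried with Spark SQL as if it were a static table; the result of such a query is itself a streaming DataFrame. A minimal sketch, where the author column is assumed purely for illustration:

# Querying the streaming temp view; the result is another streaming DataFrame
author_counts_df = spark.sql("""
    SELECT author, count(*) AS total_books
    FROM books_streaming_tmp_vw
    GROUP BY author
""")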

Streaming Queries

  • DataFrame API: Transform streaming DataFrames using the same operations as static DataFrames.
  • Output Persistence: Use DataFrame.writeStream to write results to durable storage. Configure the output with:
    • Trigger Intervals: Specify how often new data is processed (default is every 0.5 seconds). Options include fixed intervals (e.g., processingTime="5 minutes"), a one-time trigger (once=True), or availableNow=True, which processes all available data and then stops (see the sketch after the output modes below).
(streamDF.writeStream
    .trigger(processingTime="2 minutes")
    .outputMode("append")
    .option("checkpointLocation", "/path")
    .table("Output_Table")
)
  • Output Modes:
    • Append Mode: Default mode; appends new rows to the target table.
    • Complete Mode: Recalculates and overwrites the target table with each batch.
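
As a minimal sketch of the alternative settings, an availableNow trigger combined with complete output mode might look like the following; the aggregation, table name, and checkpoint path are placeholders, and complete mode requires an aggregated streaming DataFrame:

# Hypothetical aggregation so that complete output mode is valid
author_counts = streamDF.groupBy("author").count()

(author_counts.writeStream
    .trigger(availableNow=True)              # process all available data, then stop
    .outputMode("complete")                  # rewrite the full result on each batch
    .option("checkpointLocation", "/path/to/author_counts_checkpoint")
    .table("author_counts")
)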

Checkpointing and Fault Tolerance

  • Checkpoints: Store the current state of a streaming job to cloud storage, allowing resumption from the last state in case of failure. Each streaming write requires a separate checkpoint location.
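
For instance, two streaming writes fed by the same source DataFrame each need their own checkpoint directory (a sketch with placeholder paths and table names):

# Each query tracks its own progress in a dedicated checkpoint directory
(streamDF.writeStream
    .option("checkpointLocation", "/checkpoints/orders_bronze")
    .table("orders_bronze")
)

(streamDF.writeStream
    .option("checkpointLocation", "/checkpoints/orders_silver")
    .table("orders_silver")
)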

Processing Guarantees

  • Write-ahead Logs: Record the offset range of data processed during each trigger interval, ensuring reliable tracking of stream progress.
  • Exactly Once Semantics: Achieved through repeatable data sources and idempotent sinks, ensuring no duplicates in the sink even under failure conditions.

Unsupported Operations

  • Limitations: Some operations, like sorting and deduplication, are complex or logically infeasible with streaming data. Advanced methods like windowing and watermarking can help address these challenges.
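
As a minimal sketch of those techniques, assuming a streaming DataFrame events_df with an event_time timestamp column (both hypothetical), a watermarked window aggregation looks like this:

from pyspark.sql.functions import window

windowed_counts = (events_df
    .withWatermark("event_time", "10 minutes")    # tolerate up to 10 minutes of late data
    .groupBy(window("event_time", "5 minutes"))   # 5-minute tumbling windows
    .count()
)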

Structured Streaming in Databricks provides a robust framework for processing streaming data, offering scalability, fault tolerance, and ease of use by abstracting streams as tables.


Incremental Data Ingestion From Files

  • Loads only the new data files encountered since the last ingestion
  • Avoids reprocessing files that have already been ingested

Auto Loader

Auto Loader is a feature in Databricks that simplifies the ingestion of large volumes of data files. It efficiently processes new files as they arrive in cloud storage.

  • Scalable and Efficient: Handles billions of files and supports near real-time ingestion.
  • Checkpointing: Stores metadata of discovered files to ensure exactly-once guarantees.

Example: Using Auto Loader with PySpark and SQL

# Auto Loader: incrementally ingest new Parquet files from cloud storage into a Delta table
(spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/orders_checkpoint")
        .load(f"{dataset_bookstore}/orders-raw")
      .writeStream
        .option("checkpointLocation", "dbfs:/mnt/demo/orders_checkpoint")
        .table("orders_updates")
)

-- Using Auto Loader in SQL to create a streaming table from CSV files
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", ",", "header", "true"));

COPY INTO Command

The COPY INTO command is used for idempotent and incremental loading of new data files into Delta tables.

  • Efficient for Small Volumes: Best suited for ingesting on the order of thousands of files; for millions of files or more, Auto Loader is the better choice.
  • SQL Syntax: Simple and straightforward to use.

Example: COPY INTO with Spark SQL

-- Loading data into a Delta table using COPY INTO
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = CSV
FORMAT_OPTIONS ('delimiter' = ',', 'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');