05 - Production Pipelines

Delta Live Tables (DLT)

Delta Live Tables is a framework designed to simplify the creation and management of data processing pipelines. It ensures data quality and maintains table dependencies, making it easier to build large-scale ETL processes.

Delta Live Tables addresses the following challenges when building ETL processes:

  1. Complexities of large-scale ETL

    1. Hard to build and maintain table dependencies
    2. Difficult to switch between batch and streaming processing
  2. Data quality and governance

    1. Difficult to monitor and enforce data quality
    2. Hard to trace data lineage
  3. Difficult pipeline operations

    1. Poor observability at a granular data level
    2. Error handling and recovery are laborious

Multi-hop Architecture:

  • Bronze Tables: These tables store raw data in its original form. For example, the orders_raw table ingests Parquet data incrementally using Auto Loader.
  • Silver Tables: These tables contain refined and cleaned data. Operations such as data cleansing and enrichment are performed at this level. For example, orders_cleaned enriches order data with customer information.
  • Gold Tables: These tables provide aggregated or business-level insights. For instance, daily_customer_books calculates daily book counts per customer.


Databricks Notebooks:

  • DLT pipelines are implemented in Databricks notebooks using SQL syntax.
  • The LIVE keyword is used both to declare DLT tables (e.g., CREATE LIVE TABLE) and to reference them from other queries in the pipeline (e.g., LIVE.table_name).

Auto Loader:

  • Used for incremental data ingestion; declaring the table with the STREAMING keyword makes it a streaming table.
  • The cloud_files method takes the source location, the file format (e.g., Parquet), and a map of reader options, as in the sketch below.
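
As a minimal sketch of this pattern (the table name, source path, and reader option here are illustrative, not taken from the course dataset):

    CREATE STREAMING LIVE TABLE customers_raw
    COMMENT "Raw customer data ingested incrementally with Auto Loader"
    AS SELECT * FROM cloud_files(
      "/path/to/customers",                        -- source location
      "json",                                      -- file format
      map("cloudFiles.inferColumnTypes", "true")   -- reader options
    );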

Data Quality Constraints:

  • Constraints ensure data quality by specifying the action taken when records violate them:
    • Default (no action specified): Keeps violating records in the target and reports the violations in the pipeline metrics (a warning).
    • DROP ROW: Discards violating records.
    • FAIL UPDATE: Fails the pipeline update on violations.

Setting Constraint for Data Quality Check

-- Drop Row on Violation
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

-- Fail Pipeline on Violation
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
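
When no ON VIOLATION action is specified, the default behavior applies: violating rows are kept in the target and the violations are reported in the pipeline metrics (the constraint name and expression below are illustrative):

-- Default (no action): keep violating rows, report violations in metrics
CONSTRAINT valid_timestamp EXPECT (order_timestamp IS NOT NULL)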

Creating a DLT Pipeline

  • Navigate to the Workflows tab and select Delta Live Tables.
  • Create a pipeline by providing configurations such as the dataset path and the storage location (a parameterization sketch follows this list).
  • Choose between Triggered mode (processes available data once and stops) and Continuous mode (runs continuously, ingesting new data as it arrives).
  • Use Development mode for interactive development; it reuses the cluster between runs and disables automatic retries so errors can be fixed quickly.
  • The execution flow is visualized as a Directed Acyclic Graph (DAG), showing entities and their relationships.
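
Configuration values entered in the pipeline settings can be referenced from the notebook's SQL using ${key} substitution. As a minimal sketch, assuming a configuration key named datasets_path was added to the pipeline settings, the bronze declaration could read its source path from it:

    CREATE STREAMING LIVE TABLE orders_raw
    COMMENT "Raw orders data; source path supplied via pipeline configuration"
    AS SELECT * FROM cloud_files("${datasets_path}/orders-raw", "parquet", map("inferSchema", "true"));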

In Databricks Delta Live Tables (DLT), Development Mode and Production Mode serve different purposes, primarily tailored to the needs of testing and production environments. Here are the key differences between the two modes:

Development Mode

  • Cluster Reuse: In development mode, clusters are not terminated immediately after a pipeline update. This allows for faster iterations because the same cluster can be reused for multiple updates without waiting for it to restart. This is particularly useful when you’re testing and debugging frequently[1][2].
  • No Automatic Retries: Pipelines do not automatically retry in the event of task failures. This allows you to quickly detect and fix logical or syntactic errors in your pipeline without unnecessary delays caused by retries[1][2].
  • Faster Feedback: Development mode is designed for rapid iteration during testing. You can validate your pipeline’s source code (e.g., syntax checks) without actually updating tables, which helps you identify issues faster[1].
  • Cluster Shutdown Delay: By default, clusters remain running for two hours in development mode, but this can be customized using the pipelines.clusterShutdown.delay setting[2].

Production Mode

  • Automatic Retries: In production mode, pipelines are configured to automatically retry in case of specific recoverable errors, such as memory leaks or stale credentials. This ensures higher reliability and resilience in a production environment where uptime is critical[1][2].
  • Cluster Restarts: The system will restart clusters when necessary to recover from certain errors, ensuring that pipelines continue running smoothly without manual intervention[2].
  • Optimized for Stability: Production mode is designed to handle larger workloads and ensure stability, making it ideal for running pipelines that are deployed to process live data in a production environment[1].

When to Use Each Mode

  • Use Development Mode during the development and testing phases when you need faster feedback and don’t require automatic retries.
  • Switch to Production Mode when deploying your pipeline for live data processing, where stability, error recovery, and retries are essential.

Example SQL Commands

  • Bronze Table Declaration:

    CREATE STREAMING LIVE TABLE orders_raw
    COMMENT "Raw orders data"
    AS SELECT * FROM cloud_files("/path/to/data", "parquet", map("inferSchema", "true"));
    
  • Silver Table Declaration with Constraints:

    CREATE LIVE TABLE orders_cleaned (
      -- expectation drops rows with a NULL order_id and records them in the pipeline metrics
      CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
    )
    AS SELECT * FROM LIVE.orders_raw;
    
  • Gold Table Example:

    -- assumes an order_timestamp column in orders_cleaned for the daily grouping
    CREATE LIVE TABLE daily_customer_books
    AS SELECT customer_id, date_trunc("DD", order_timestamp) AS order_date, COUNT(*) AS books_count
    FROM LIVE.orders_cleaned GROUP BY customer_id, date_trunc("DD", order_timestamp);
    

Change Data Capture (CDC)

Change Data Capture refers to capturing changes made to source data and applying them to target tables. This process involves handling inserts, updates, and deletes efficiently.

Key Concepts of CDC

  1. CDC Events:

    • Changes are logged as events containing both data and metadata (e.g., operation type, timestamp).
  2. Apply Changes Into Command:

    • Used in DLT to apply CDC events to target tables.
  3. Command Syntax:

    APPLY CHANGES INTO LIVE.target_table
    FROM STREAM(LIVE.source_cdc_table)
    KEYS (primary_key_field)
    APPLY AS DELETE WHEN operation = 'Delete'
    SEQUENCE BY timestamp_field
    COLUMNS * EXCEPT (columns_to_ignore);
    
  4. Slowly Changing Dimensions (SCD):

    • Type 1 SCD (the default) updates records in place, so no history is kept.
    • Type 2 SCD keeps history by adding a new row for each change; it is enabled with STORED AS SCD TYPE 2 in the APPLY CHANGES INTO statement.
  5. Limitations:

    • Because APPLY CHANGES INTO performs updates and deletes, the target table is no longer append-only and therefore cannot be used as a source for downstream streaming reads.

Example Use Case

  • Suppose you have a CDC feed with operations like insert, update, and delete.
  • Use Apply Changes Into to ensure your target table reflects these changes accurately while maintaining historical context if needed (see the sketch below).
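
As a minimal sketch of this use case (table and column names such as books_silver, books_bronze, book_id, row_status, and row_time are hypothetical):

    -- The target streaming table must exist before changes can be applied into it
    CREATE OR REFRESH STREAMING LIVE TABLE books_silver;

    APPLY CHANGES INTO LIVE.books_silver
      FROM STREAM(LIVE.books_bronze)
      KEYS (book_id)                                -- match source and target rows on the primary key
      APPLY AS DELETE WHEN row_status = "DELETE"    -- delete events remove the matching target row
      SEQUENCE BY row_time                          -- orders events so the most recent change wins
      COLUMNS * EXCEPT (row_status, row_time);      -- exclude the CDC metadata columns from the target

SEQUENCE BY is what allows out-of-order CDC events to be applied correctly: for each key, the record with the highest sequencing value wins.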

Citations:

[1] https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/testing
[2] https://learn.microsoft.com/el-gr/azure/databricks/delta-live-tables/updates
[3] https://stackoverflow.com/questions/74524713/how-to-separate-delta-live-tables-production-and-development-targets-and-repo-br