01 February 2024

🏭🗒️Microsoft Fabric: Delta Tables [Notes]

Disclaimer: This is work in progress intended to consolidate information from various sources for learning purposes. For the latest information please consult the documentation (see the links below)! 

Last updated: 18-Apr-2024

Delta Table Structure

[Delta Lake] delta table
  • {definition} table that stores data as a directory of files in the delta lake (DL) and registers table metadata to the metastore within a catalog and schema [1]
    • ⇐ represents a schema abstraction over data files that are stored in Delta format [2]
    • for each table, the lakehouse stores 
      • a folder containing its Parquet data files
      • a _delta_log folder in which transaction details are logged in JSON format
    • all Microsoft Fabric (MF) experiences generate and consume delta tables [10]
      • provides interoperability and a unified product experience [10]
      • some experiences can only write to delta tables, while others can read from them [10]
    • ⇒ all data files for a table in a database are grouped under a common data path [12]
  • {feature} support ACID transactions
    • modifications made to a table are logged in its transaction log
      • enforces serializable isolation for concurrent operations [2]
      • the logged transactions can be used to retrieve the history of changes made [2]
      • each transaction creates new Parquet files
      • deleting a row doesn't physically delete it from the Parquet file [9]
        • {feature} [DL] Deletion Vectors (aka delete vectors)
          • are read as part of the table and indicate which rows to ignore [9]
          • make it faster to perform deletions because there's no need to rewrite the existing Parquet files [9]
          • a file with many deleted rows takes more resources to read [9]
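A minimal sketch of the deletion-vector idea, written in plain Python rather than against any Delta Lake API. The `DataFile` class and its fields are hypothetical stand-ins for an immutable Parquet file and its deletion vector; the point is only that a delete records row positions and that every read has to filter against them.

```python
from dataclasses import dataclass, field


@dataclass
class DataFile:
    """Hypothetical stand-in for an immutable Parquet file plus its deletion vector."""
    rows: tuple                                  # row data is never rewritten
    deleted: set = field(default_factory=set)    # positions of logically deleted rows

    def delete_where(self, predicate):
        # A delete only records row positions; the stored rows stay untouched.
        self.deleted |= {i for i, row in enumerate(self.rows) if predicate(row)}

    def read(self):
        # Every read checks the deletion vector, which is why a file with
        # many deleted rows costs more to read.
        return [row for i, row in enumerate(self.rows) if i not in self.deleted]


data_file = DataFile(rows=({"id": 1}, {"id": 2}, {"id": 3}))
data_file.delete_where(lambda row: row["id"] == 2)
print(data_file.read())       # the rows with id 1 and 3
print(len(data_file.rows))    # still 3: nothing was physically removed
```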
  • {feature} support DML
    • only tables created while the feature is available will have all DML published [..]
  • {feature} support data versioning 
    • multiple versions of each table row can be retrieved from the transaction log [2]
  • {feature} support time travel
  • {feature} support for batch and streaming data
    • delta tables can be used as both sources and destinations for streaming data [2]
  • {feature} support standard formats and interoperability
  • table consumption
    • [lakehouse] SQL Endpoint 
      • provides a read-only experience [4]
      • can be used to query only delta tables via T-SQL [4]
        • other file formats cannot be queried using the SQL endpoint [4]
          • ⇐ the files need to be converted to the delta format [4]
      • {limitation} doesn't support the full T-SQL surface area of a transactional data warehouse [4]
  • [Spark] managed table
    •  the table definition in the metastore and the underlying data files are both managed by the Spark runtime for the Fabric lakehouse [4] 
  • [Spark] external table
    • the relational table definition in the metastore is mapped to an alternative file storage location [4]
      • the Parquet data files and JSON log files for the table are stored in the Files storage location [4]
  • [Spark] allows greater control over the creation and management of delta tables [4]
  • {operation} create (aka create delta table)
    • defines the table in the metastore for the lakehouse
    • its data is stored in the underlying Parquet files for the table
      • the details of mapping the table definition in the metastore to the underlying files are abstracted [4]
      • ⇐ internally, there is also a log file that keeps track of which parquet files, when combined, make up the data that is in the table [4]
        • the log files are internal and cannot be used directly by other engines [4]
          • ⇐ DF automatically publishes the right log files so that other engines can directly access the right Parquet files [..]
        • after every 10 transactions, a new log file (aka checkpoint) is created automatically and asynchronously [4]
          • ⇐ the file is a summary of all the previous log files [4]
          • when querying the table, the system needs to read only the latest checkpoint and any log files created after it*
            • ⇐ instead of having to read 105,120 log files, 10 or fewer files will be read*
        • the Delta Lake logs are automatically published so that other engines can directly access the right Parquet files*
    • [Apache Spark in a lakehouse] allows greater control of the creation and management of delta tables [4]
      • via saving a dataframe 
        • {method} save a dataframe as a delta table [4]
          • ⇐ the easiest way to create a delta table 
          • creates both the table schema definition in the metastore and the data files in delta format [4]
        • {method} create the table definition [4]
          • creates the table schema in the metastore without saving any data files [4]
        • {method} save data in delta format without creating a table definition in the metastore [4]
          • {scenario} persist the results of data transformations performed in Spark in a file format over which a table definition is overlaid later, or which is processed directly by using the delta lake API [4]
          • modifications made to the data through the delta lake API or in an external table that is subsequently created on the folder will be recorded in the transaction logs [4]
          • {mode} overwrite
            • replace the contents of an existing folder with the data in a dataframe [4]
          • {mode} append
            • adds rows from a dataframe to an existing folder [4]
          • Fabric uses an automatic table discovery capability to create the corresponding table metadata in the metastore [4]
      • via DeltaTableBuilder API
        • enables writing Spark code that creates a table based on specifications
      • via Spark SQL
        • [managed table] via CREATE TABLE <table_definition> USING DELTA
        • [external table] via CREATE TABLE <table_name> USING DELTA LOCATION
          • the schema of the table is determined by the Parquet files containing the data in the specified location
          • {scenario} create a table definition 
            • that references data that has already been saved in delta format [4]
            • based on a folder where data are ingested in the delta format [4]
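The two Spark SQL forms above differ mainly in who owns the schema and the files. A small sketch that only composes the statements as strings, so the difference is visible side by side; the helper names, table names, column list and the `Files/products_delta` path are hypothetical.

```python
def create_managed_table_sql(table_name, columns):
    """Build CREATE TABLE ... USING DELTA for a managed table.

    columns: mapping of column name -> Spark SQL type.
    """
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns.items())
    return f"CREATE TABLE {table_name} ({column_list}) USING DELTA"


def create_external_table_sql(table_name, location):
    """Build CREATE TABLE ... USING DELTA LOCATION for an external table.

    No column list is given: the schema comes from the delta files at the location.
    """
    return f"CREATE TABLE {table_name} USING DELTA LOCATION '{location}'"


managed = create_managed_table_sql("products", {"ProductId": "INT", "ProductName": "STRING"})
external = create_external_table_sql("products_ext", "Files/products_delta")
print(managed)
print(external)
```

In a notebook with an active Spark session (assumed here to be named `spark`), each string could be passed to `spark.sql(...)`. The dataframe route follows the same split: `df.write.format("delta").saveAsTable(...)` registers a table in the metastore, while `df.write.format("delta").mode("overwrite").save(path)` or `.mode("append")` writes only the delta files to a folder.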
  • {operation} update (aka update delta table)
  • {operation} delete (aka delete delta table)
    • [managed table] deleting the table deletes the underlying files from the Tables storage location for the lakehouse [4]
    • [external table] deleting a table from the lakehouse metastore does not delete the associated data files [4]
  • performance and storage cost efficiency tend to degrade over time
    • {reason} new data added to the table might skew the data [3]
    • {reason} batch and streaming data ingestion rates might bring in many small files
    • {reason} update and delete operations eventually create read overhead
      • Parquet files are immutable by design, so Delta tables add new Parquet files with the changeset, further amplifying the issues imposed by the first two items [3]
    • {reason} data files and log files that are no longer needed remain in storage
  • {recommendation} don’t allow special characters in column names (incl. spaces)
  • {recommendation} make table and column names business-friendly
  • {feature} table partitions
    • {recommendation} use a partitioned folder structure wherever applicable
      • helps to improve data manageability and query performance
      • results in faster search for specific data entries thanks to partition pruning/elimination
      • {best practice} partition data to align with the query patterns [15]
      • it can dramatically speed up query performance, especially when combined with other performance optimizations [15]
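Partition pruning can be pictured as a filter on folder names that runs before any file is opened. The sketch below assumes Hive-style `column=value` folders and hypothetical file paths; it is an illustration of the idea, not how a query engine is implemented.

```python
def parse_partition(path):
    """Extract partition values from a path like 'year=2024/month=01/part-0.parquet'."""
    folders = path.split("/")[:-1]            # drop the file name
    return dict(folder.split("=", 1) for folder in folders)


def prune(paths, **wanted):
    """Keep only the files whose partition folders match every requested value."""
    return [
        path for path in paths
        if all(parse_partition(path).get(col) == value for col, value in wanted.items())
    ]


files = [
    "year=2023/month=12/part-0.parquet",
    "year=2024/month=01/part-0.parquet",
    "year=2024/month=01/part-1.parquet",
    "year=2024/month=02/part-0.parquet",
]
to_scan = prune(files, year="2024", month="01")
print(to_scan)    # only the two files under year=2024/month=01
```

A query that filters on a column that is not part of the folder structure gets no benefit, which is why the partitioning should follow the query patterns.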
  • {command} MERGE 
    • allows updating a delta table with advanced conditions [3]
      • from a source table, view or DataFrame [3]
    • {limitation} the current algorithm in the open source distribution of Delta Lake isn't fully optimized for handling unmodified rows [3]
    • [Microsoft Spark Delta] implemented a custom Low Shuffle Merge optimization
      • unmodified rows are excluded from an expensive shuffling operation that is needed for updating matched rows [3]
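The MERGE semantics can be sketched on plain Python rows. The function below is a hypothetical illustration of "update when matched, insert when not matched"; the `untouched` list it returns corresponds to the unmodified rows that a Low Shuffle Merge aims to keep out of the shuffle. It says nothing about how either Delta Lake or Microsoft's runtime implements the operation.

```python
def merge(target, source, key):
    """Upsert source rows into target; returns (merged_rows, untouched_keys).

    target, source: lists of dict rows; key: name of the join column.
    """
    incoming = {row[key]: row for row in source}
    merged, untouched = [], []
    for row in target:
        if row[key] in incoming:
            merged.append(incoming.pop(row[key]))   # WHEN MATCHED: update
        else:
            merged.append(row)                      # unmodified row, carried over as is
            untouched.append(row[key])
    merged.extend(incoming.values())                # WHEN NOT MATCHED: insert
    return merged, untouched


target = [{"id": 1, "qty": 5}, {"id": 2, "qty": 7}, {"id": 3, "qty": 1}]
source = [{"id": 2, "qty": 9}, {"id": 4, "qty": 3}]
result, untouched = merge(target, source, "id")
print(result)
print(untouched)    # keys of the rows the merge did not need to change
```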
  • {command} OPTIMIZE
    • consolidates multiple small Parquet files into larger files [8]
    • should be run whenever there are enough small files to justify running the compaction operation [6]
      • {best practice} run optimization after loading large tables [8]
    • benefits greatly from the ACID transactions supported [6]
    • [Delta Lake] predicate filtering
      • specify predicates to only compact a subset of your data [6]
      • {scenario} running a compaction job on the same dataset daily [6]
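Compaction is essentially a bin-packing problem: small files are grouped until a target size is reached and each group is rewritten as one file. The greedy planner below is a simplified assumption for illustration, not the algorithm used by OPTIMIZE; the function name and parameters are hypothetical, with defaults taken from the 128 MB and 1 GB figures mentioned in these notes.

```python
def plan_compaction(file_sizes_mb, target_mb=1024, small_mb=128):
    """Group files smaller than small_mb into bins of at most target_mb each."""
    small = sorted(size for size in file_sizes_mb if size < small_mb)
    bins, current, total = [], [], 0
    for size in small:
        if current and total + size > target_mb:
            bins.append(current)          # the current bin is full, start a new one
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        bins.append(current)
    # A bin holding a single file gains nothing from being rewritten.
    return [group for group in bins if len(group) > 1]


# A small target keeps the example readable; 500 and 900 are not "small" files.
plan = plan_compaction([10, 20, 30, 40, 500, 900], target_mb=64)
print(plan)    # one group of three files; the 40 MB file is left alone
```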
  • {command} VACUUM
    • removes old files no longer referenced by a Delta table log [8]
      • files need to be older than the retention threshold [8]
        • the default file retention threshold is seven days [8]
          • shorter retention period impacts Delta's time travel capabilities [8]
          • {default} historical data can't be deleted within the retention threshold [2]
            • ⇐ that's to maintain the consistency in data [2]
        • {best practice} set a retention interval to at least seven days [8]
          • ⇐ because old snapshots and uncommitted files can still be in use by the concurrent table readers and writers [8]
        • important to optimize storage cost [8]
    • {warning} cleaning up active files might lead to reader failures or even table corruption if the uncommitted files are removed [8]
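The two conditions VACUUM applies, "no longer referenced" and "older than the retention threshold", can be sketched as a simple filter. The file names, timestamps and the `vacuum_candidates` helper are hypothetical; the seven-day default comes from the notes above.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)    # default threshold cited in the notes


def vacuum_candidates(files, referenced, now, retention=RETENTION):
    """Return files that are unreferenced by the log AND older than the threshold.

    files: mapping of file name -> last-modified datetime.
    referenced: set of file names still referenced by the current table log.
    """
    cutoff = now - retention
    return sorted(
        name for name, modified in files.items()
        if name not in referenced and modified < cutoff
    )


now = datetime(2024, 4, 18)
files = {
    "part-0.parquet": datetime(2024, 4, 1),     # old, but still referenced
    "part-1.parquet": datetime(2024, 4, 2),     # old and unreferenced
    "part-2.parquet": datetime(2024, 4, 16),    # unreferenced, but too recent
}
removable = vacuum_candidates(files, referenced={"part-0.parquet"}, now=now)
print(removable)    # only part-1.parquet qualifies
```

Shortening the retention makes `part-2.parquet` removable as well, which is exactly the case where a concurrent reader or a time-travel query could still need the file.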
  • {issue} small files
    • create large metadata transaction logs which cause planning time slowness [6]
    • result from
      • big repartition value [6]
      • if the dataset is partitioned on a high-cardinality column or if there are deeply nested partitions, then more small files will be created [6]
      • tables that are incrementally updated frequently [6]
    • files of sizes above 128 MB, and optimally close to 1 GB, improve compression and data distribution across the cluster nodes [8]
  • {feature} auto compaction 
    • combines small files within Delta table partitions to automatically reduce small file problems [7]
    • occurs after a write to a table has succeeded and runs synchronously on the cluster that has performed the write [7]
    • only compacts files that haven’t been compacted previously [7]
    • only triggered for partitions or tables that have at least a certain number of small files [7]
    • enabled at the table or session level [7]
  • {feature} [Delta Lake 1.2] data skipping
    • the engine takes advantage of minimum and maximum values metadata to provide faster queries
      • ⇐ requires the respective metadata
    •  its effectiveness depends on data's layout [7]
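A minimal sketch of min/max data skipping, assuming per-file statistics are available as a plain dictionary (the file names, the `order_id` column and the `files_to_read` helper are hypothetical). A file is read only if its value range overlaps the range the query asks for.

```python
def files_to_read(stats, column, low, high):
    """Return the files whose [min, max] range for `column` overlaps [low, high]."""
    selected = []
    for name, columns in stats.items():
        col_min, col_max = columns[column]
        if col_max >= low and col_min <= high:
            selected.append(name)
    return selected


# Well-clustered layout: each file covers a distinct range of order_id.
stats = {
    "part-0.parquet": {"order_id": (1, 1000)},
    "part-1.parquet": {"order_id": (1001, 2000)},
    "part-2.parquet": {"order_id": (2001, 3000)},
}
hit = files_to_read(stats, "order_id", 1500, 1600)
print(hit)    # one file out of three

# Scattered layout: every file spans the full range, so nothing can be skipped.
scattered = {name: {"order_id": (1, 3000)} for name in stats}
print(len(files_to_read(scattered, "order_id", 1500, 1600)))
```

The second case shows why the effectiveness depends on the data layout: the statistics exist, but they exclude nothing.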
  • {feature} [Delta Lake 2.0] z-ordering (aka multi-dimensional clustering)
    • technique to collocate related information in the same set of files [7]
    • automatically used in data-skipping algorithms [7]
    • dramatically reduces the amount of data to read [7]
    • aims to produce evenly-balanced data files with respect to the number of tuples
      • ⇐ but not necessarily data size on disk [7]
        • ⇐ the two measures are most often correlated [7]
          • ⇐ but there can be situations when that is not the case, leading to skew in optimize task times [7]
    • via ZORDER BY clause 
      • applicable to columns with high cardinality commonly used in query predicates [7]
      • multiple columns can be specified as a comma-separated list
        • {warning} the effectiveness of the locality drops with each extra column [7]
        • {warning} using columns that do not have statistics collected on them is ineffective and wastes resources [7]
          • statistics collection can be configured on certain columns by reordering columns in the schema, or by increasing the number of columns to collect statistics on [7]
    • {characteristic} not idempotent
      • every time it is executed, it will try to create a new clustering of data in all files in a partition [7]
        • it includes new and existing files that were part of previous Z-Ordering [7]
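The textbook way to picture Z-ordering is a Morton code: the bits of two column values are interleaved, and rows are sorted by the resulting number so that rows close in both dimensions end up in the same file. The sketch below shows that idea only; it is an assumption for illustration and does not reproduce how Delta Lake computes its clustering.

```python
def interleave_bits(x, y, bits=8):
    """Morton (Z-order) code: x occupies the even bit positions, y the odd ones."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code


# Sort a 4x4 grid of (x, y) pairs by Morton code and cut it into "files" of 4 rows.
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))
files = [ordered[i:i + 4] for i in range(0, len(ordered), 4)]

for rows in files:
    xs = [x for x, _ in rows]
    ys = [y for _, y in rows]
    # Each file covers a narrow range in BOTH columns, so min/max skipping works on either.
    print(rows, "x range:", (min(xs), max(xs)), "y range:", (min(ys), max(ys)))
```

Sorting by `x` alone would give files with a narrow `x` range but the full `y` range, so only predicates on `x` could skip files. Adding more columns to the interleaving spreads the same number of bits over more dimensions, which is one way to read the warning that locality drops with each extra column.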
  • {feature} checkpointing
    • allows read queries to quickly reconstruct the current state of the table without reading too many files having incremental updates [7]
    • {default} each checkpoint is written as a single Parquet file [7]
    • {alternative} [Delta Lake 2.0] multi-part checkpointing
      • allows splitting the checkpoint into multiple Parquet files [7]
        • ⇒ parallelizes and speeds up writing the checkpoint [7]
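A minimal sketch of why checkpoints keep reads cheap, using an in-memory log instead of real files. The commit contents and helper names are hypothetical, and the interval of 10 follows the figure cited earlier in these notes. The reader starts from the latest checkpoint and replays only the commits written after it.

```python
CHECKPOINT_INTERVAL = 10    # the notes cite a checkpoint after every 10 transactions


def apply(state, actions):
    """Apply one commit's add/remove actions to the set of live data files."""
    for op, path in actions:
        if op == "add":
            state.add(path)
        else:                   # "remove"
            state.discard(path)
    return state


def write_log(commits):
    """Return (log, checkpoints): per-version actions plus a snapshot at each interval."""
    log, checkpoints, state = {}, {}, set()
    for version, actions in enumerate(commits):
        log[version] = actions
        apply(state, actions)
        if version > 0 and version % CHECKPOINT_INTERVAL == 0:
            checkpoints[version] = set(state)
    return log, checkpoints


def read_table(log, checkpoints):
    """Rebuild the live file set from the latest checkpoint plus the later commits."""
    latest = max(log)
    start = max(checkpoints, default=-1)
    state = set(checkpoints.get(start, set()))
    tail = range(start + 1, latest + 1)
    for version in tail:
        apply(state, log[version])
    files_read = len(tail) + (1 if start >= 0 else 0)
    return state, files_read


# 35 commits: each adds a file, and every odd commit removes the previous file.
commits = [
    [("add", f"f{i}")] + ([("remove", f"f{i - 1}")] if i % 2 else [])
    for i in range(35)
]
log, checkpoints = write_log(commits)
state, files_read = read_table(log, checkpoints)

full_replay = set()
for actions in commits:
    apply(full_replay, actions)

print(state == full_replay)    # same table state either way
print(files_read, "log files read instead of", len(commits))
```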
  • {feature} [Delta Lake 3.0] log compaction
    • reduces the need for frequent checkpoints and minimizes the latency spikes caused by them [7]
    • allows new log compaction files with the format <x>.<y>.compact.json
      • the files contain the aggregated actions for commit range [x, y] 
    • read support is enabled by default [7]
    • write support not available yet [7]
      •  will be added in a future version of Delta [7]
  • {feature} Delta Lake transaction log (aka DeltaLog)
    • a sequential record of every transaction performed on a table since its creation [15]
    • central to DL functionality because it is at the core of its important features [15]
      • incl. ACID transactions, scalable metadata handling, time travel
    • {goal} enable multiple readers and writers to operate on a given version of a dataset file simultaneously [15]
    • {goal} provide additional information to the execution engine for more performant operations [15]
      • e.g. data skipping indexes
    • always shows the user a consistent view of the data
      • ⇒ serves as a single source of truth
    • for each write operation, the data file is always written first, and only when that operation succeeds, a transaction log file is added to the _delta_log folder
      • ⇐ the transaction is only considered complete when the transaction log entry is written successfully [15] 
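The write order described above (data file first, log entry second) can be sketched with ordinary files in a temporary folder. Everything here is a simplified assumption: JSON stands in for Parquet, the log entry holds a single hypothetical `add` action, and exclusive file creation (`"x"` mode) stands in for whatever mechanism the storage layer uses to prevent two writers from claiming the same version. Only the zero-padded, 20-digit log file names follow the actual Delta log naming.

```python
import json
import os
import tempfile


def commit(table_dir, version, rows):
    """Write the data file first, then publish it by adding a log entry."""
    data_name = f"part-{version:05d}.json"       # JSON stands in for Parquet here
    with open(os.path.join(table_dir, data_name), "w") as f:
        json.dump(rows, f)
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    # Mode "x" fails if the entry already exists, so a version cannot be claimed twice.
    with open(os.path.join(log_dir, f"{version:020d}.json"), "x") as f:
        json.dump({"add": {"path": data_name}}, f)


def visible_files(table_dir):
    """Readers trust only the log: a data file without a log entry is invisible."""
    log_dir = os.path.join(table_dir, "_delta_log")
    entries = sorted(os.listdir(log_dir)) if os.path.isdir(log_dir) else []
    paths = []
    for entry in entries:
        with open(os.path.join(log_dir, entry)) as f:
            paths.append(json.load(f)["add"]["path"])
    return paths


with tempfile.TemporaryDirectory() as table_dir:
    commit(table_dir, 0, [{"id": 1}])
    # Simulate a writer that failed after writing data but before writing the log entry.
    open(os.path.join(table_dir, "part-00001.json"), "w").close()
    result = visible_files(table_dir)
    on_disk = sorted(name for name in os.listdir(table_dir) if name.startswith("part-"))

print(result)     # only the committed file
print(on_disk)    # the orphaned file exists on disk but is not part of the table
```

Orphaned files like the second one are the "uncommitted files" that VACUUM eventually cleans up.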
  • {feature} [Lakehouse] table maintenance 
    • efficiently manages delta tables and keeps them always ready for analytics [8]
    • performs ad-hoc table maintenance using contextual right-click actions in a delta table within the Lakehouse explorer [8]
    • applies bin-compaction, V-Order, and unreferenced old files cleanup [8]
      • via Lakehouse >> Tables >> (select table) >> Maintenance >> (select options) >> Run now
        •  a Spark maintenance job is submitted for execution [8]
          • uses the user identity and table privileges
            • consumes Fabric capacity of the workspace/user that submitted the job
          • {constraint} only one maintenance job on a table can be run at any time
            • if there's a running job on the table, the new one is rejected [8]
            • jobs on different tables can execute in parallel [8]
          • running jobs are available in the Monitoring Hub 
            • see "TableMaintenance" text within the activity name column [8]
    • {best practice} properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization commands [3]

Acronyms:
ACID - atomicity, consistency, isolation, durability
API - Application Programming Interface
CRUD - create, read, update, and delete
DL - Delta Lake
MF - Microsoft Fabric
JSON - JavaScript Object Notation

Resources:
[1] Microsoft Learn (2023) Data objects in the Databricks lakehouse (link)
[2] Microsoft Learn (2023) Implement medallion lakehouse architecture in Microsoft Fabric (link)
[3] Microsoft Learn (2023) Delta Lake table optimization and V-Order (link)
[4] Microsoft Learn (2023) Work with Delta Lake tables in Microsoft Fabric (link)
[5] Delta Lake (2023) Quickstart (link)
[6] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[7] Delta Lake (2023) Optimizations (link)
[8] Microsoft Learn (2023) Use table maintenance feature to manage delta tables in Fabric (link)
[9] Microsoft Fabric Updates Blog (2023) Announcing: Automatic Data Compaction for Fabric Warehouse, by Kevin Conan (link)
[10] Microsoft Learn (2023) Delta Lake table format interoperability (link)
[11] Josep Aguilar-Saborit et al (2020) POLARIS: The Distributed SQL Engine in Azure Synapse, Proceedings of the VLDB Endowment PVLDB 13(12)  (link)
[12] Josep Aguilar-Saborit et al (2024), Extending Polaris to Support Transactions (link)
[13] Michael Armbrust et al (2020) Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, Proceedings of the VLDB Endowment 13(12) (link)
[14] Jesús Camacho-Rodríguez et al (2023) LST-Bench: Benchmarking Log-Structured Tables in the Cloud, Proceedings of the ACM on Management of Data (2024), 2 (1) (link)
[15] Bennie Haelen & Dan Davis (2024) Delta Lake: Up and Running: Modern Data Lakehouse Architectures with Delta Lake
