05 February 2024

💎🏭SQL Reloaded: Microsoft Fabric's Delta Tables in Action - CRUD Operations II (Looking at the Files)

Delta Table's Files

In a previous post, I tested the CRUD operations against a Microsoft Fabric delta table. To view the files behind the table, navigate to the Lakehouse, right-click on the Assets table and pick 'View files' from the floating menu:

Assets' delta table files in Lakehouse

One can navigate within the folder structure and look at the transaction log files under the _delta_log folder, though there seems to be no way to download the respective files. As an alternative, you can use Azure Storage Explorer (downloadable here). After installing it, use the 'ADLSGen2 container or directory' option to connect to the folder where the Assets table was created (see [1]).

Assets' delta table files in Azure Storage Explorer

And, here's the content of the _delta_log from where the files can be downloaded:

Assets' log files in Azure Storage Explorer

The _temporary folder is empty.

It seems that a log file was created for each cell containing a DML/DDL command. Therefore, it should make no difference whether you run the cells individually or all at once, or whether the statements are included in a single cell.

The first file holds the content generated when creating the table:

{"commitInfo":{"timestamp":1707006399711,"operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"c3e6f24e-fa78-4941-a29d-28aac3a25926"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"a9a45d67-bcf9-4bef-8f48-c6b8f7a64f58","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[
{\"name\":\"Id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"CreationDate\",\"type\":\"timestamp\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Vendor\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Asset\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Model\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Owner\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Tag\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},
{\"name\":\"Quantity\",\"type\":\"decimal(13,2)\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1707006399640}}
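Note that schemaString is itself a JSON document stored as a string inside the metaData action, so it has to be decoded twice. Here is a minimal Python sketch of that, using a shortened copy of the metaData line above (only the Id and Quantity fields are kept, for brevity). The variable names are only illustrative.

```python
import json

# Shortened copy of the metaData action from the first log file
# (only two of the eight fields are kept here).
metadata_line = r'{"metaData":{"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"Id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"Quantity\",\"type\":\"decimal(13,2)\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[]}}'

# First decode: the log line itself.
action = json.loads(metadata_line)

# Second decode: the schema embedded as a string.
schema = json.loads(action["metaData"]["schemaString"])

# Collect (name, type, nullable) for each column.
columns = [(f["name"], f["type"], f["nullable"]) for f in schema["fields"]]
for name, data_type, nullable in columns:
    print(name, data_type, "NULL" if nullable else "NOT NULL")
```

The printed columns should match the NOT NULL definitions used in the CREATE TABLE statement.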

The second file holds the content generated when inserting the first row into the table:

{"commitInfo":{"timestamp":1707006405806,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"3713"},"tags":{"VORDER":"true"},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"ac51fa78-8384-4e53-bb02-40d65632a218"}}
{"add":{"path":"part-00000-b50df7ae-66d8-41fd-a54d-6238e053d269-c000.snappy.parquet","partitionValues":{},"size":3713,"modificationTime":1707006405683,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":
{\"Id\":1,\"CreationDate\":\"2024-03-01T00:00:00.000Z\",\"Vendor\":\"IBM\",\"Asset\":\"Laptop 1\",\"Model\":\"Model 1\",\"Owner\":\"Owner 1\",\"Tag\":\"XX0001\",\"Quantity\":1.00},\"maxValues\":{\"Id\":1,\"CreationDate\":\"2024-03-01T00:00:00.000Z\",\"Vendor\":\"IBM\",\"Asset\":\"Laptop 1\",\"Model\":\"Model 1\",\"Owner\":\"Owner 1\",\"Tag\":\"XX0001\",\"Quantity\":1.00},\"nullCount\":{\"Id\":0,\"CreationDate\":0,\"Vendor\":0,\"Asset\":0,\"Model\":0,\"Owner\":0,\"Tag\":0,\"Quantity\":0}}","tags":{"VORDER":"true"}}}

The third file (not shown here) holds the content for inserting the multiple rows, while the fourth file contains the content for the UPDATE statement:

{"commitInfo":{"timestamp":1707006417538,"operation":"UPDATE","operationParameters":{"predicate":"[\"(Id#1952 = 6)\"]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numRemovedBytes":"3870","numCopiedRows":"4","numAddedChangeFiles":"0","executionTimeMs":"2297","scanTimeMs":"1613","numAddedFiles":"1","numUpdatedRows":"1","numAddedBytes":"3870","rewriteTimeMs":"681"},"tags":{"VORDER":"true"},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"70f0f718-331a-48fc-8419-9f80599ca889"}}
{"remove":{"path":"part-00000-0bd7b054-1bde-44cd-a852-80332d98ae8b-c000.snappy.parquet","deletionTimestamp":1707006417533,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":3870,"tags":{"VORDER":"true"}}}
{"add":{"path":"part-00000-b5e4e62f-c938-4db9-a83a-d85850f310c0-c000.snappy.parquet","partitionValues":{},"size":3870,"modificationTime":1707006417436,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"Id\":2,\"CreationDate\":\"2024-01-01T00:00:00.000Z\",\"Vendor\":\"Dell\",\"Asset\":\"Laptop 2\",\"Model\":\"Model 2\",\"Owner\":\"Owner 2\",\"Tag\":\"DD0001\",\"Quantity\":1.00},\"maxValues\":{\"Id\":6,\"CreationDate\":\"2024-02-01T00:00:00.000Z\",\"Vendor\":\"Microsoft\",\"Asset\":\"Laptop 4\",\"Model\":\"Model 4\",\"Owner\":\"Owner 4\",\"Tag\":\"XX0001\",\"Quantity\":1.00},\"nullCount\":{\"Id\":0,\"CreationDate\":0,\"Vendor\":0,\"Asset\":0,\"Model\":0,\"Owner\":0,\"Tag\":0,\"Quantity\":0}}","tags":{"VORDER":"true"}}}

The fifth file holds the content related to the DELETE statement:

{"commitInfo":{"timestamp":1707006422147,"operation":"DELETE","operationParameters":{"predicate":"[\"(Id#2665 = 2)\"]"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numRemovedBytes":"3870","numCopiedRows":"4","numAddedChangeFiles":"0","executionTimeMs":"1425","numDeletedRows":"1","scanTimeMs":"908","numAddedFiles":"1","numAddedBytes":"3837","rewriteTimeMs":"517"},"tags":{"VORDER":"true"},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"1f61449f-6c4a-40ce-80ed-6ff1a9a834e9"}}
{"remove":{"path":"part-00000-b5e4e62f-c938-4db9-a83a-d85850f310c0-c000.snappy.parquet","deletionTimestamp":1707006422134,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":3870,"tags":{"VORDER":"true"}}}
{"add":{"path":"part-00000-bee2e3a5-7def-4466-94ff-ccd86aa37b8c-c000.snappy.parquet","partitionValues":{},"size":3837,"modificationTime":1707006422055,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"Id\":3,\"CreationDate\":\"2024-01-01T00:00:00.000Z\",\"Vendor\":\"Dell\",\"Asset\":\"Laptop 3\",\"Model\":\"Model 3\",\"Owner\":\"Owner 2\",\"Tag\":\"DD0001\",\"Quantity\":1.00},\"maxValues\":{\"Id\":6,\"CreationDate\":\"2024-02-01T00:00:00.000Z\",\"Vendor\":\"Microsoft\",\"Asset\":\"Laptop 4\",\"Model\":\"Model 4\",\"Owner\":\"Owner 4\",\"Tag\":\"WX0001\",\"Quantity\":1.00},\"nullCount\":{\"Id\":0,\"CreationDate\":0,\"Vendor\":0,\"Asset\":0,\"Model\":0,\"Owner\":0,\"Tag\":0,\"Quantity\":0}}","tags":{"VORDER":"true"}}}

Before looking at each file, note the engineInfo value, which gives the Spark and the Delta Lake versions: "Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8". It's important to check these versions against the documentation when troubleshooting!
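When several log files need to be checked, the two versions can be extracted programmatically. The following Python sketch assumes that engineInfo always consists of space-separated "component/version" tokens, as in the value above. The function name parse_engine_info is hypothetical.

```python
def parse_engine_info(engine_info: str) -> dict[str, str]:
    """Split an engineInfo string into {component: version} pairs."""
    versions = {}
    for token in engine_info.split():
        # Assumption: each token looks like "<component>/<version>".
        component, _, version = token.partition("/")
        versions[component] = version
    return versions


engine_info = "Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8"
versions = parse_engine_info(engine_info)
print(versions["Apache-Spark"])
print(versions["Delta-Lake"])
```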

In each file, besides the "timestamp", observe the values of "operation", "operationParameters", "operationMetrics", "dataChange" and "VORDER". 

Operation indicates the DML or DDL statement that was run. As per my understanding, dataChange and VORDER are set to true in all of the files above.
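To see how the add and remove actions determine which Parquet files currently make up the table, the log files can be replayed in version order. The Python sketch below uses abbreviated copies of the commits above, keeping only the operation and the file paths. The third file (version 2) isn't shown in this post, so its operation and added path are assumptions inferred from the remove action of the UPDATE commit. The function name replay is hypothetical.

```python
import json

# Abbreviated commits keyed by version (only the fields needed here).
commits = {
    0: ['{"commitInfo":{"operation":"CREATE TABLE"}}'],
    1: ['{"commitInfo":{"operation":"WRITE"}}',
        '{"add":{"path":"part-00000-b50df7ae-66d8-41fd-a54d-6238e053d269-c000.snappy.parquet"}}'],
    # Assumption: version 2 is not shown in the post; the path is inferred
    # from the file removed by the UPDATE in version 3.
    2: ['{"commitInfo":{"operation":"WRITE"}}',
        '{"add":{"path":"part-00000-0bd7b054-1bde-44cd-a852-80332d98ae8b-c000.snappy.parquet"}}'],
    3: ['{"commitInfo":{"operation":"UPDATE"}}',
        '{"remove":{"path":"part-00000-0bd7b054-1bde-44cd-a852-80332d98ae8b-c000.snappy.parquet"}}',
        '{"add":{"path":"part-00000-b5e4e62f-c938-4db9-a83a-d85850f310c0-c000.snappy.parquet"}}'],
    4: ['{"commitInfo":{"operation":"DELETE"}}',
        '{"remove":{"path":"part-00000-b5e4e62f-c938-4db9-a83a-d85850f310c0-c000.snappy.parquet"}}',
        '{"add":{"path":"part-00000-bee2e3a5-7def-4466-94ff-ccd86aa37b8c-c000.snappy.parquet"}}'],
}


def replay(commits):
    """Apply add/remove actions in version order; return live files and a history."""
    live = set()
    history = []
    for version in sorted(commits):
        operation = None
        for line in commits[version]:
            action = json.loads(line)
            if "commitInfo" in action:
                operation = action["commitInfo"]["operation"]
            elif "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
        history.append((version, operation, len(live)))
    return live, history


live_files, history = replay(commits)
for version, operation, file_count in history:
    print(version, operation, file_count)
print(sorted(live_files))
```

Under these assumptions, two files remain referenced after the DELETE: the one written by the first INSERT and the one rewritten by the DELETE. The removed files still exist in the folder until they are cleaned up.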

The DESCRIBE HISTORY Command

You can retrieve a history of the actions performed on a table for the past 30 days via the DESCRIBE HISTORY command, which can be entered in a notebook cell:

-- describe table's history (30 days of data)
DESCRIBE HISTORY Assets;

The attributes' definitions can be found in [3] (see the 'History schema' section). For big tables, it might be a good idea to limit the command's output to a few records via the LIMIT keyword:

-- describe table's history (30 days of data, last 5 records)
DESCRIBE HISTORY Assets LIMIT 5;

Table Maintenance

Running multiple change commands on a delta table will result in many such small files, which impacts the read access to data and metadata. The files can be compacted via table maintenance (see also the notes on delta tables). The OPTIMIZE command consolidates multiple small Parquet files into larger files:

-- compacting the files
OPTIMIZE Assets;

Alternatively, you can do the same from the Lakehouse: navigate to the Assets table, right-click on it, select Maintenance from the floating menu, and run the job with the provided options (see also [4]):

Run maintenance commands for a table

Running the command generated a new file:

{"commitInfo":{"timestamp":1707150591870,"operation":"OPTIMIZE","operationParameters":{"predicate":"[]","zOrderBy":"[]","auto":false},"readVersion":4,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numRemovedBytes":"7550","p25FileSize":"3879","numDeletionVectorsRemoved":"0","minFileSize":"3879","numAddedFiles":"1","maxFileSize":"3879","p75FileSize":"3879","p50FileSize":"3879","numAddedBytes":"3879"},"tags":{"VORDER":"true"},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"b6466509-9f1a-467d-ad99-a0e95ed694ec"}}
{"add":{"path":"part-00000-453e9218-fe59-4138-a7dc-0e71c94c07d2-c000.snappy.parquet","partitionValues":{},"size":3879,"modificationTime":1707150591746,"dataChange":false,"stats":"{\"numRecords\":5,\"minValues\":{\"Id\":1,\"CreationDate\":\"2024-01-01T00:00:00.000Z\",\"Vendor\":\"Dell\",\"Asset\":\"Laptop 1\",\"Model\":\"Model 1\",\"Owner\":\"Owner 1\",\"Tag\":\"DD0001\",\"Quantity\":1.00},\"maxValues\":{\"Id\":6,\"CreationDate\":\"2024-03-01T00:00:00.000Z\",\"Vendor\":\"Microsoft\",\"Asset\":\"Laptop 4\",\"Model\":\"Model 4\",\"Owner\":\"Owner 4\",\"Tag\":\"XX0001\",\"Quantity\":1.00},\"nullCount\":{\"Id\":0,\"CreationDate\":0,\"Vendor\":0,\"Asset\":0,\"Model\":0,\"Owner\":0,\"Tag\":0,\"Quantity\":0}}","tags":{"VORDER":"true"}}}
{"remove":{"path":"part-00000-cbaaf2dc-b8d6-44bc-a4e1-97db0ba96074-c000.snappy.parquet","deletionTimestamp":1707150580358,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":3713,"tags":{"VORDER":"true"}}}
{"remove":{"path":"part-00000-6714ec80-1bd8-41f3-b8b7-5197025ec08f-c000.snappy.parquet","deletionTimestamp":1707150580358,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":3837,"tags":{"VORDER":"true"}}}
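The operationMetrics in the commitInfo can be cross-checked against the add and remove actions from the same file. Here is a small Python sketch based on an abbreviated copy of the OPTIMIZE commit above (only the paths, sizes, dataChange flags and four of the metrics are kept). Note that the metrics are stored as strings in the log, so they are converted to integers before comparing.

```python
import json

# Abbreviated copy of the OPTIMIZE commit shown above.
optimize_commit = [
    '{"commitInfo":{"operation":"OPTIMIZE","operationMetrics":{"numRemovedFiles":"2","numRemovedBytes":"7550","numAddedFiles":"1","numAddedBytes":"3879"}}}',
    '{"add":{"path":"part-00000-453e9218-fe59-4138-a7dc-0e71c94c07d2-c000.snappy.parquet","size":3879,"dataChange":false}}',
    '{"remove":{"path":"part-00000-cbaaf2dc-b8d6-44bc-a4e1-97db0ba96074-c000.snappy.parquet","size":3713,"dataChange":false}}',
    '{"remove":{"path":"part-00000-6714ec80-1bd8-41f3-b8b7-5197025ec08f-c000.snappy.parquet","size":3837,"dataChange":false}}',
]

actions = [json.loads(line) for line in optimize_commit]
metrics = next(a["commitInfo"]["operationMetrics"] for a in actions if "commitInfo" in a)
added = [a["add"] for a in actions if "add" in a]
removed = [a["remove"] for a in actions if "remove" in a]

# Recompute the metrics from the individual file actions.
computed = {
    "numRemovedFiles": len(removed),
    "numRemovedBytes": sum(f["size"] for f in removed),
    "numAddedFiles": len(added),
    "numAddedBytes": sum(f["size"] for f in added),
}
# The log stores the metrics as strings.
reported = {key: int(metrics[key]) for key in computed}

# dataChange is false on every action: the files are only rearranged.
rewrites_only = not any(f["dataChange"] for f in added + removed)

print(computed == reported)
print(rewrites_only)
```

Unlike the UPDATE and DELETE commits, all actions here have dataChange set to false, which marks the commit as a rearrangement of existing data rather than a logical change to the table's content.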

Happy coding!

Resources:
[1] Microsoft Azure (2023) Azure Storage Explorer (link)
[2] Sam Debruyn (2023) Exploring OneLake with Microsoft Azure Storage Explorer (link)
[3] Delta Lake (2023) Table utility commands (link)
[4] Microsoft Learn (2023) Use table maintenance feature to manage delta tables in Fabric (link)

04 February 2024

💎🏭SQL Reloaded: Microsoft Fabric's Delta Tables in Action - CRUD Operations I (Basic Operations)

When building a data-related solution, the most basic functionality for a SQL developer is the CRUD (create, read, update, delete) operations. That's why I decided to test the feature on Lakehouse delta tables, for a first demonstrative post on Microsoft Fabric. 

Unfortunately, at least for the moment, the SQL Endpoint allows only read access to the delta tables, which frankly is an important limitation for a SQL developer. There's however the possibility of running the statements via Spark SQL, which supports a basic dialect of SQL (similar to HiveQL, though it might lack some features). To run the code, you'll need to create a notebook and set the language to Spark SQL for the whole notebook. Make sure that you have a Lakehouse assigned to the notebook. Of course, you'll need access to Microsoft Fabric.

Create a cell for each of the following blocks of code. To make sure that the code works, it might be a good idea to run the cells individually!

-- drop the test table (if exists already)
DROP TABLE IF EXISTS Assets;

--create the test table
CREATE TABLE Assets(
 Id int NOT NULL,
 CreationDate timestamp NOT NULL,
 Vendor string NOT NULL,
 Asset string NOT NULL,
 Model string NOT NULL,
 Owner string NOT NULL,
 Tag string NOT NULL,
 Quantity decimal(13, 2) NOT NULL
) 
USING DELTA;

-- insert test data
INSERT INTO Assets
VALUES (1, '2024-03-01T00:00:00Z', 'IBM', 'Laptop 1','Model 1','Owner 1','XX0001','1');

-- insert more test records 
INSERT INTO Assets
VALUES ('2', '2024-02-01T00:00:00Z','IBM','Laptop 2','Model 2','Owner 2','XX0001','1')
, ('3', '2024-02-01T00:00:00Z','Microsoft','Laptop 3','Model 3','Owner 2','WX0001','1')
, ('4', '2024-01-01T00:00:00Z','Microsoft','Laptop 3','Model 3','Owner 2','WX0001','1')
, ('5', '2024-01-15T00:00:00Z','Dell','Laptop 4','Model 4','Owner 3','DD0001','1')
, ('6', '2024-01-16T00:00:00Z','Dell','Laptop 4','Model 4','Owner 4','DD0001','1');

-- retrieving all the records
SELECT *
FROM Assets
ORDER BY Vendor;

-- updating a record
UPDATE Assets 
SET CreationDate = '2024-01-19T00:00:00Z'
WHERE Id = 6;

-- deleting a record
DELETE FROM Assets
WHERE Id = 2;

-- reviewing the changes
SELECT *
FROM Assets
ORDER BY Vendor;

Comments:
[1] There seems to be no way to provide a schema (see the SQL syntax for Spark), however the dbo schema seems to be used in the background (see table metadata post). Referencing a table with the schema in Spark SQL results in an error:
"[TABLE_OR_VIEW_NOT_FOUND] The table or view `<schema>`.`<object_name>` cannot be found. Verify the spelling and correctness of the schema and catalog."

[2] The data types are different from the ones in SQL Server (e.g. timestamp for dates, string instead of varchars). Dates and decimals support precision, while strings translate to a varchar with 8000 characters (see table metadata post).

[3] Implicit conversions seem to occur when the format is correct. Otherwise, an explicit conversion is needed.

[4] Consider using semicolons at the end of each statement. That's mandatory when running multiple statements within the same cell. Otherwise, an error results.

[5] It would be interesting to test the performance of CRUD operations on large datasets.

[6] One can use the LIMIT x clause as an alternative to TOP x from T-SQL:

-- reviewing the last 3 records
SELECT *
FROM Assets
ORDER BY CreationDate DESC
LIMIT 3;

[7] SELECT FROM VALUES seems to work as well, though the data types must be the same (no implicit conversions occur):

SELECT *
FROM (
VALUES ('1', '2024-03-01T00:00:00Z', 'IBM', 'Laptop 1','Model 1','Owner 1','XX0001','1')
, ('2', '2024-02-01T00:00:00Z','IBM','Laptop 2','Model 2','Owner 2','XX0001','1')
) DAT(Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity);

Therefore, when bringing the two INSERTs together, you'll need to change the Id value in the first INSERT from numeric to string.

You can use the DESCRIBE QUERY command to troubleshoot the differences:

-- retrieve query's output metadata
DESCRIBE QUERY 
VALUES (1, '2024-03-01T00:00:00Z', 'IBM','Laptop 1','Model 1','Owner 1','XX0001','1') 
AS DAT(Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity);

Output:
col_name data_type comment
Id int NULL
CreationDate string NULL
Vendor string NULL
Asset string NULL
Model string NULL
Owner string NULL
Tag string NULL
Quantity string NULL

Happy coding!

Resources:
[1] Microsoft Learn (2023) How to use Microsoft Fabric notebooks (link)
[2] Apache Spark (2023) SQL syntax (link)

01 February 2024

🏭🗒️Microsoft Fabric: Delta Tables (Notes)

Disclaimer: This is work in progress intended to consolidate information from various sources. 
Last updated: 18-Apr-2024

Delta Table Structure

[Delta Lake] delta table
  • {definition} table that stores data as a directory of files in the delta lake (DL) and registers table metadata to the metastore within a catalog and schema [1]
    • ⇐ represents a schema abstraction over data files that are stored in Delta format [2]
    • for each table, the lakehouse stores 
      • a folder containing its Parquet data files
      • a _delta_log folder in which transaction details are logged in JSON format
    • all Microsoft Fabric (MF) experiences generate and consume delta tables [10]
      • provides interoperability and a unified product experience [10]
      • some experiences can only write to delta tables, while others can read from them [10]
    • ⇒ all data files for a table in a database are grouped under a common data path [12]
  • {feature} support ACID transactions
    • modifications made to a table are logged in its transaction log
      • enforces serializable isolation for concurrent operations [2]
      • the logged transactions can be used to retrieve the history of changes made [2]
      • each transaction creates new Parquet files
      • deleting a row doesn't physically delete it from the Parquet file [9]
        • {feature} [DL] Deletion Vectors 
          • are read as part of the table and indicate which rows to ignore [9]
          • make it faster to perform deletions because there's no need to re-write the existing parquet files [9]
          • many deleted rows take more resources to read that file [9]
  • {feature} support DML
    • only tables created while the feature is available will have all DML published [..]
  • {feature} support data versioning 
    • multiple versions of each table row can be retrieved from the transaction log [2]
  • {feature} support time travel
  • {feature} support for batch and streaming data
    • delta tables can be used as both sources and destinations for streaming data [2]
  • {feature} support standard formats and interoperability
  • table consumption
    • [lakehouse] SQL Endpoint 
      • provides a read-only experience [4]
      • can be used to query only delta tables via T-SQL [4]
        • other file formats can not be queried using the SQL endpoint [4]
          • ⇐  the files need to be converted to the delta format [4]
      • {limitation} doesn't support the full T-SQL surface area of a transactional data warehouse [4]
  • [Spark] managed table
    •  the table definition in the metastore and the underlying data files are both managed by the Spark runtime for the Fabric lakehouse [4] 
  • [Spark] external table
    • the relational table definition in the metastore is mapped to an alternative file storage location [4]
      • the Parquet data files and JSON log files for the table are stored in the Files storage location [4]
  • [Spark] allows greater control over the creation and management of delta tables [4]
  • {operation} create (aka create delta table)
    • defines the table in the metastore for the lakehouse
    • its data is stored in the underlying Parquet files for the table
      • the details of mapping the table definition in the metastore to the underlying files are abstracted [4]
      • ⇐ internally, there is also a log file that keeps track of which parquet files, when combined, make up the data that is in the table [4]
        • the log files are internal and cannot be used directly by other engines [4]
          •  ⇐ DF publishes automatically the right log files so that other engines can directly access the right parquet files [..]
        • after every 10 transactions, a new log file (aka checkpoint) is created automatically and asynchronously [4]
          • ⇐ the file is a summary of all the previous log files [4]
          • when querying the table, the system needs to read the latest checkpoint and any log files that were created after*
            • ⇐ instead of having to read 105,120 log files, 10 or fewer files will be read*
        • the Delta Lake logs are automatically published so that other engines can directly access the right parquet files *
    • [Apache Spark in a lakehouse] allows greater control of the creation and management of delta tables [4]
      • via saving a dataframe 
        • {method} save a dataframe as a delta table [4]
          • ⇐ the easiest way to create a delta table 
          • creates both the table schema definition in the metastore and the data files in delta format [4]
        • {method} create the table definition [4]
          • creates the table schema in the metastore without saving any data files [4]
        • {method} save data in delta format without creating a table definition in the metastore [4]
          • {scenario} persist the results of data transformations performed in Spark in a file format over which a table definition is overlaid later or processed directly by using the delta lake API [4]
          • modifications made to the data through the delta lake API or in an external table that is subsequently created on the folder will be recorded in the transaction logs [4]
          • {mode} overwrite
            • replace the contents of an existing folder with the data in a dataframe [4]
          • {mode} append
            • adds rows from a dataframe to an existing folder [4]
          • Fabric uses an automatic table discovery capability to create the corresponding table metadata in the metastore [4]
      • via DeltaTableBuilder API
        • enables writing Spark code to create a table based on specifications
      • via Spark SQL
        • [managed table] via CREATE TABLE <table_definition> USING DELTA
        • [external table] via CREATE TABLE <table_name> USING DELTA LOCATION
          • the schema of the table is determined by the Parquet files containing the data in the specified location
          • {scenario} create a table definition 
            • that references data that has already been saved in delta format [4]
            • based on a folder where data are ingested in the delta format [4]
  • {operation} update (aka update delta table)
  • {operation} delete (aka delete delta table)
    • [managed table] deleting the table deletes the underlying files from the Tables storage location for the lakehouse [4]
    • [external table] deleting a table from the lakehouse metastore does not delete the associated data files [4]
  • performance and storage cost efficiency tend to degrade over time
    • {reason} new data added to the table might skew the data [3]
    • {reason} batch and streaming data ingestion rates might bring in many small files
    • {reason} update and delete operations eventually create read overhead
      • parquet files are immutable by design, so Delta tables add new parquet files with the changeset, further amplifying the issues imposed by the first two items [3]
    • {reason} no longer needed data files and log files available in the storage
  • {recommendation} don’t allow special characters in column names (incl. spaces)
  • {recommendation} make table and column names business-friendly
  • {feature} table partitions
    • {recommendation} use a partitioned folder structure wherever applicable
      • helps to improve data manageability and query performance
      • results in faster search for specific data entries thanks to partition pruning/elimination
      • {best practice} partition data to align with the query patterns [15]
      • it can dramatically speed up query performance, especially when combined with other performance optimizations [15]
  • {command} MERGE 
    • allows updating a delta table with advanced conditions [3]
      • from a source table, view or DataFrame [3]
    • {limitation} the current algorithm in the open source distribution of Delta Lake isn't fully optimized for handling unmodified rows [3]
    • [Microsoft Spark Delta] implemented a custom Low Shuffle Merge optimization
      • unmodified rows are excluded from an expensive shuffling operation that is needed for updating matched rows [3]
  • {command} OPTIMIZE
    • consolidates multiple small Parquet files into larger files [8]
    • should be run whenever there are enough small files to justify running the compaction operation [6]
      • {best practice} run optimization after loading large tables [8]
    • benefits greatly from the ACID transactions supported [6]
    • [Delta Lake] predicate filtering
      • specify predicates to only compact a subset of your data [6]
      • {scenario} running a compaction job on the same dataset daily [6]
  • {command} VACUUM
    • removes old files no longer referenced by a Delta table log [8]
      • files need to be older than the retention threshold [8]
        • the default file retention threshold is seven days [8]
          • shorter retention period impacts Delta's time travel capabilities [8]
          • {default} historical data can't be deleted within the retention threshold [2]
            • ⇐ that's to maintain the consistency in data [2]
        • {best practice} set a retention interval to at least seven days [8]
          • ⇐ because old snapshots and uncommitted files can still be in use by the concurrent table readers and writers [8]
        • important to optimize storage cost [8]
    • {warning} cleaning up active files might lead to reader failures or even table corruption if the uncommitted files are removed [8]
  • {issue} small files
    • create large metadata transaction logs which cause planning time slowness [6]
    • result from
      • big repartition value [6]
      • if the dataset is partitioned on a high-cardinality column or if there are deeply nested partitions, then more small files will be created [6]
      • tables that are incrementally updated frequently [6]
    • files of sizes above 128 MB, and optimally close to 1 GB, improve compression and data distribution across the cluster nodes [8]
  • {feature} auto compaction 
    • combines small files within Delta table partitions to automatically reduce small file problems [7]
    • occurs after a write to a table has succeeded and runs synchronously on the cluster that has performed the write [7]
    • only compacts files that haven’t been compacted previously [7]
    • only triggered for partitions or tables that have at least a certain number of small files [7]
    • enabled at the table or session level [7]
  • {feature}[Delta Lake 1.2] data skipping 
    • the engine takes advantage of minimum and maximum values metadata to provide faster queries
      • ⇐ requires the respective metadata
    •  its effectiveness depends on data's layout [7]
  • {feature} [Delta Lake 2.0] Z-Ordering (aka multi-dimensional clustering)
    • technique to collocate related information in the same set of files [7]
    • automatically used in data-skipping algorithms [7]
    • dramatically reduces the amount of data to read [7]
    • aims to produce evenly-balanced data files with respect to the number of tuples
      • ⇐ but not necessarily data size on disk [7]
        • ⇐ the two measures are most often correlated [7]
          • ⇐ but there can be situations when that is not the case, leading to skew in optimize task times [7]
    • via ZORDER BY clause 
      • applicable to columns with high cardinality commonly used in query predicates [7]
      • multiple columns can be specified as a comma-separated list
        • {warning} the effectiveness of the locality drops with each extra column [7]
        • {warning} using columns that do not have statistics collected on them is  ineffective and wastes resources [7] 
          • statistics collection can be configured on certain columns by reordering columns in the schema, or by increasing the number of columns to collect statistics on [7]
    • {characteristic} not idempotent
      • every time it is executed, it will try to create a new clustering of data in all files in a partition [7]
        • it includes new and existing files that were part of previous Z-Ordering [7]
  • {feature} checkpointing
    • allows read queries to quickly reconstruct the current state of the table without reading too many files having incremental updates [7]
    • {default} each checkpoint is written as a single Parquet file [7]
    • {alternative} [Delta Lake 2.0] multi-part checkpointing
      • allows splitting the checkpoint into multiple Parquet files [7]
        • ⇒ parallelizes and speeds up writing the checkpoint [7]
  • {feature} [Delta Lake 3.0] log compaction
    • reduces the need for frequent checkpoints and minimizes the latency spikes caused by them [7]
    • allows new log compaction files with the format <x>.<y>.compact.json
      • the files contain the aggregated actions for commit range [x, y] 
    • read support is enabled by default [7]
    • write support not available yet [7]
      •  will be added in a future version of Delta [7]
  • Delta Lake transaction log (aka DeltaLog)
    • a sequential record of every transaction performed on a table since its creation [15]
    • central to DL functionality because it is at the core of its important features [15]
      • incl. ACID transactions, scalable metadata handling, time travel
    • {goal} enable multiple readers and writers to operate on a given version of a dataset file simultaneously [15]
    • {goal} provide additional information to the execution engine for more performant operations [15]
      • e.g. data skipping indexes
    • always shows the user a consistent view of the data
      • ⇒ serves as a single source of truth
    • for each write operation, the data file is always written first, and only when that operation succeeds, a transaction log file is added to the _delta_log folder
      • ⇐ the transaction is only considered complete when the transaction log entry is written successfully [15] 
  • {feature} [Lakehouse] table maintenance 
    • efficiently manages delta tables and keeps them always ready for analytics [8]
    • performs ad-hoc table maintenance using contextual right-click actions in a delta table within the Lakehouse explorer [8]
    • applies bin-compaction, V-Order, and unreferenced old files cleanup [8]
      • via Lakehouse >> Tables >> (select table) >> Maintenance >> (select options) >> Run now
        •  a Spark maintenance job is submitted for execution [8]
          • uses the user identity and table privileges
            • consumes Fabric capacity of the workspace/user that submitted the job
          • {constraint} only one maintenance job on a table can be run at any time
            • if there's a running job on the table, the new one is rejected [8]
            • jobs on different tables can execute in parallel [8]
          • running jobs are available in the Monitoring Hub 
            • see "TableMaintenance" text within the activity name column [8]
    • {best practice} properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization commands [3]
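The checkpoint behavior noted above (a checkpoint after every 10 transactions, with readers needing only the latest checkpoint plus the log files created after it) can be sketched in a few lines of Python. This is a simplified illustration, not how the engine is implemented. It assumes a checkpoint interval of 10 as per the notes, single-file checkpoints, Delta Lake's zero-padded 20-digit file names, and that the checkpoint for every multiple of 10 already exists (in practice it's written asynchronously, so it may lag). The function name files_to_read is hypothetical.

```python
def files_to_read(latest_version: int, checkpoint_interval: int = 10) -> list[str]:
    """List the _delta_log files needed to reconstruct the latest table state."""
    # Latest version that is a multiple of the interval (0 means no checkpoint yet).
    checkpoint_version = (latest_version // checkpoint_interval) * checkpoint_interval
    files = []
    if checkpoint_version > 0:
        files.append(f"{checkpoint_version:020d}.checkpoint.parquet")
        first_commit = checkpoint_version + 1
    else:
        first_commit = 0
    # JSON commits written after the checkpoint (or all of them if none exists).
    files += [f"{version:020d}.json" for version in range(first_commit, latest_version + 1)]
    return files


# A small table with versions 0..5: no checkpoint yet, all six JSON files are read.
print(len(files_to_read(5)))

# A table with 105,120 log files (versions 0..105119): one checkpoint plus nine commits.
print(len(files_to_read(105119)))
```

Under these assumptions, the second call returns 10 files, which is in line with the "10 or fewer files" noted above.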

Acronyms:
ACID - atomicity, consistency, isolation, durability
API - Application Programming Interface
CRUD - create, read, update, and delete
DL - Delta Lake
MF - Microsoft Fabric
JSON - JavaScript Object Notation

Resources:
[1] Microsoft Learn (2023) Data objects in the Databricks lakehouse (link)
[2] Microsoft Learn (2023) Implement medallion lakehouse architecture in Microsoft Fabric (link)
[3] Microsoft Learn (2023) Delta Lake table optimization and V-Order (link)
[4] Microsoft Learn (2023) Work with Delta Lake tables in Microsoft Fabric (link)
[5] Delta Lake (2023) Quickstart (link)
[6] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[7] Delta Lake (2023) Optimizations (link)
[8] Microsoft Learn (2023) Use table maintenance feature to manage delta tables in Fabric (link)
[9] Microsoft Fabric Updates Blog (2023) Announcing: Automatic Data Compaction for Fabric Warehouse, by Kevin Conan (link)
[10] Microsoft Learn (2023) Delta Lake table format interoperability (link)
[11] Josep Aguilar-Saborit et al (2020) POLARIS: The Distributed SQL Engine in Azure Synapse, Proceedings of the VLDB Endowment PVLDB 13(12) (link)
[12] Josep Aguilar-Saborit et al (2024), Extending Polaris to Support Transactions (link)
[13] Michael Armbrust et al (2020) Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, Proceedings of the VLDB Endowment 13(12) (link)
[14] Jesús Camacho-Rodríguez et al (2023) LST-Bench: Benchmarking Log-Structured Tables in the Cloud, Proceedings of the ACM on Management of Data (2024), 2 (1) (link)
[15] Bennie Haelen & Dan Davis (2024) Delta Lake: Up and Running Modern Data Lakehouse Architectures with Delta Lake, 2024

31 January 2024

🏭🗒️Microsoft Fabric: Parquet Format (Notes)

Disclaimer: This is a work in progress intended to consolidate information from various sources.

Last updated: 31-Jan-2024

Parquet format

  • {definition} open source, column-oriented data file format designed for efficient data storage and retrieval [1]
    • provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk [1]
    • designed to be a common interchange format for both batch and interactive workloads [1]
  • {characteristic} open source file format
    • similar to other columnar-storage file formats available in Hadoop [1]
      • e.g. RCFile, ORC
    • became an industry standard 
      •  {benefit} provides interoperability across multiple tools
  • {characteristic} language agnostic [1]
    • different programming languages can be used to manipulate the data
  • {characteristic} column-based format [1]
    • files are organized by column
      • ⇐ rather than by row
      • ⇒ saves storage space and speeds up analytics queries [1]
    •  reads only the needed columns 
      • ⇐ non-relevant data are skipped
      • ⇒ greatly minimizes the IO [1]
        • aggregation queries are less time-consuming compared to row-oriented databases [1]
    • {benefit} increased data throughput and performance [1]
      • ⇒ recommended for analytical workloads
  • {characteristic} highly efficient data compression/decompression [1]
    • supports flexible compression options and efficient encoding schemes [1]
      • data can be compressed by using one of the several codecs available [1]
        • ⇒ different data files can be compressed differently [1]
    •  reduced storage requirements [1]
      • by at least one-third on large datasets
      • ⇒ {benefit} saves on cloud storage space
    •  greatly improves scan and deserialization time [1]
      • ⇒ {benefit} reduces the processing costs
    • {downside} can be slower to write than row-based file formats
      • primarily because the files contain metadata about their contents
      • though they have fast read times
  • {characteristic} supports complex data types and advanced nested data structures [1]
    • implemented using the record-shredding and assembly algorithm
      • accommodates complex data structures that can be used to store the data [1]
      • optimized to work with complex data in bulk and features different ways for efficient data compression and encoding types [1]
        • the approach works best for queries that need to read only certain columns from a large table [1]
  • {characteristic} cloud-ready
    • works best with interactive and serverless technologies [1]
  • {characteristic} immutable
    • a file can't be updated to rename, reorder or drop columns [2]
      • ⇐ requires rewriting the whole file [2]
  • {characteristic} binary-based file
    • ⇒ not easily readable (by humans)
  • {characteristic} self-describing 
    •  contains metadata about schema and structure
    • {concept} row groups (aka segments) 
      • each row group contains data from the same columns
        • {constraint} column names are case sensitive
    • {concept} file footer 
      • stores metadata statistics for each row group [2]
        • min/max statistics 
        • the number of rows
        • can be leveraged by data processing engines to run queries more efficiently [2]
          • ⇐ depending on the query, entire row groups can be skipped [2]
    • {concept} file header
  •  large datasets can be split across multiple parquet files
    • ⇐ the structure can be flat or hierarchical 
    • managing multiple files has several challenges
    • the files can be used to define a table (aka parquet table)
      • ⇐ {constraint} the files must have the same definition
        • ⇐ schema enforcement must be coded manually [2]
      • {limitation} [Data Lake] no support for ACID transactions [2]
        • ⇒ easy to corrupt [2]
          • partially written files will break any subsequent read operations
            • the compute engine will try to read in the corrupt files and error out [2]
            • corrupted files must be identified and deleted manually to fix the issues [2]
      • {limitation} it's not easy to delete rows from it [2]
        • requires reading all the data, filtering out the data not needed, and then rewriting the entire table [2]
      • {limitation} doesn't support DML transactions [2]
      • {limitation} there is no change data feed [2]
      • {limitation} slow file listing [2]
        • small files require excessive I/O overhead
          • ideally the files should be between 64 MB and 1 GB
          • ideally the files should be compacted into larger files (aka small file compaction, bin-packing)
      • {limitation} expensive footer reads to gather statistics for file skipping [2]
        • fetching all the footers and building the file-level metadata for the entire table is slow [2]
          • ⇐ it requires a file-listing operation [2]
        • the effectiveness of data skipping depends on how many files can be skipped when performing a query [2]
      • {limitation} doesn't support schema enforcement [2]
      • {limitation} doesn't support check constraints [2]
      • {limitation} doesn't support data versioning [2]
    • {concept} table partitioning
      • {definition} common optimization approach used to store the data of the same table in different directories, with partitioning column values encoded in the path of each partition directory [6]
      • {recommendation} avoid partitioning by columns with very high cardinality
    • {concept} bin-packing (aka compaction, bin-compaction)
      • aims to produce evenly-balanced data files with respect to their size on disk
        • ⇐ but not necessarily with respect to the number of tuples per file [7]
      • requires an algorithm that efficiently organizes the files into equal size containers [6]
      • {characteristic} idempotent
        •  if it is run twice on the same dataset, the second run has no effect [7]
  • {feature} [Microsoft Fabric] V-order
    • {definition} write time optimization to the parquet file format that enables lightning-fast reads under the MF compute engines [3]
    • applies special sorting, row group distribution, dictionary encoding and compression on parquet files [3]
      • requires fewer compute engine resources to read the files [3]
        • provides further cost efficiency and performance
          • has a 15% impact on average write times but provides up to 50% more compression [3]
    • {characteristic} open-source parquet format compliant
      • all parquet engines can read it as a regular parquet file [3]
      • ⇐ table properties and optimization commands can be used to control V-Order on its partitions [3]
      • compatible with other features [3]
    • applied at parquet file level [3]
    • enabled by default
  • {command} OPTIMIZE
    • merges all changes into bigger, consolidated parquet files (aka bin-compaction) [3]
    • [Spark] dynamically optimizes partitions while generating files with a default 128 MB size [5]
      • the target file size may be changed per workload requirements using configurations [5]
    • properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization command [3]
    • running the compaction operation brings the data lake into an unusable state for readers [7]
    • {warning} manually compacting the files is inefficient and error prone [7]
      • no way to differentiate files that contain new data from files that contain existing data that was just compacted into new files [7]
  • [Delta Lake] when ZORDER and VORDER are used together, Apache Spark performs bin-compaction, ZORDER, VORDER sequentially [3]
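
The bin-packing idea behind OPTIMIZE can be illustrated with a small, self-contained Python sketch. It is not Delta Lake's or Fabric's actual implementation: it assumes a simple first-fit-decreasing strategy, works only on file sizes (in MB), and the names plan_compaction and compact, as well as the sample sizes, are hypothetical. It groups files into bins of at most 128 MB (the default target size mentioned above) and shows that a second run leaves the result unchanged, which is what idempotence means here.

```python
from typing import List

TARGET_MB = 128  # default target size from the notes; configurable in practice


def plan_compaction(file_sizes_mb: List[int], target_mb: int = TARGET_MB) -> List[List[int]]:
    """Group files into bins whose total size stays within target_mb.

    First-fit decreasing (an assumption made for this sketch): take the files
    from largest to smallest and put each one into the first bin that still
    has room, opening a new bin otherwise.
    """
    bins: List[List[int]] = []
    for size in sorted(file_sizes_mb, reverse=True):
        for current in bins:
            if sum(current) + size <= target_mb:
                current.append(size)
                break
        else:
            # no existing bin has room (or the file alone exceeds the target)
            bins.append([size])
    return bins


def compact(file_sizes_mb: List[int], target_mb: int = TARGET_MB) -> List[int]:
    """Return the file sizes after merging every bin into a single file."""
    return sorted(sum(b) for b in plan_compaction(file_sizes_mb, target_mb))


# hypothetical sizes (MB) of the small files behind a table
small_files = [5, 40, 12, 90, 30, 64, 8, 20, 110]

once = compact(small_files)
twice = compact(once)

print(once)           # [124, 127, 128]
print(twice == once)  # True: the second run has nothing left to merge
```

The bins are balanced by size on disk only; the sketch knows nothing about the number of rows per file, which matches the note that the files are not necessarily balanced with respect to tuples.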

Acronyms:
ACID - atomicity, consistency, isolation, durability
IO - Input/Output
MF - Microsoft Fabric
ORC - Optimized Row Columnar
RCFile - Record Columnar File

Resources:
[1] Databricks (2023) What is Parquet? (link)
[2] Delta Lake (2023) Delta Lake vs. Parquet Comparison (link)
[3] Data Mozart (2023) Parquet file format – everything you need to know! (link)
[4] Microsoft Learn (2023) Query Parquet files using serverless SQL pool in Azure Synapse Analytics (link)
[5] Microsoft Learn (2023) Lakehouse tutorial: Prepare and transform data in the lakehouse (link)
[6] Apache Spark (2023) Spark SQL Guide (link)
[7] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[8] Delta Lake (2023) Optimizations (link)

27 January 2024

Data Science: Back to the Future I (About Beginnings)

Data Science
Data Science Series

I've attended again, after several years, a webcast on performance improvement in SQL Server with Claudio Silva, “Writing T-SQL code for the engine, not for you”. The session was great and I really enjoyed it! I recommend it to any data(base) professional, even if some of the scenarios presented should be known already.

It's strange to see the same topics from 20-25 years ago reappearing over and over again despite the advancements made in the area of database engines. Each version of SQL Server brought something new in terms of performance, though without solid experience and an understanding of the basic optimization and troubleshooting techniques there's little overall improvement for the average data professional when it comes to writing and tuning queries!

Especially with the boom of Data Science topics, the volume of material on SQL increased considerably and many discover how easy it is to write queries, even if the start might be challenging for some. Writing a query is easy indeed, though writing a performant query requires, besides the language itself, some knowledge about the database engine and the various techniques used for troubleshooting and optimization. It's not about knowing in advance what the engine will do - the engine will often surprise you - but about knowing which techniques work, in which cases, what their advantages and disadvantages are, and how they might impact the processing.

Drawing a parallel with writing literature, it's not enough to speak a language; one needs more to become a writer, and there are so many levels of mastery! However, in the database world, even if creativity is welcomed, its role is considerably diminished by the constraints existing in the database engine, the problems to be solved, the time and the resources available. More importantly, one needs to understand some of the rules and know how to use the building blocks to solve problems and build reliable solutions.

The learning process for newbies focuses mainly on the language itself, while the exposure to complexity is kept to a minimum. For some learners the problems start when writing queries based on multiple tables - what joins to use, in what order, how to structure the queries, what database objects to use for encapsulating the code, etc. Even if there are some guidelines and best practices, the learner must walk the path and experiment alone or in an organized setting.

In university courses the focus is on operator algebras, algorithms, and general database technologies and architectures, without much hands-on experience. All is too theoretical and abstract, which is acceptable for research purposes, but not for the contact with the real world out there! Probably some labs offer exposure to real-life scenarios, though what to cover first in the few hours scheduled for them?

This was the state of the art when I started to learn SQL a quarter century ago, and besides the current tendency of cutting corners, the increased confidence from doing some tests, and the eagerness of shouting one’s shaky knowledge and more or less orthodox ideas on the various social networks, nothing seems to have changed! Something did change – the increased complexity of the problems to solve, and, considering the recent technological advances, one can now afford an AI learning buddy to write some code for us based on the information provided in the prompt.

This opens opportunities for learning and growth. AI can be used in the learning process by providing additional curricula for learners to dive deeper into some topics. Moreover, it can help us in time to address the challenges of the ever-increasing complexity of the problems.


About Me

Koeln, NRW, Germany
IT Professional with more than 24 years of experience in IT, covering the full life-cycle of Web/Desktop/Database Applications Development, Software Engineering, Consultancy, Data Management, Data Quality, Data Migrations, Reporting, ERP implementations & support, Team/Project/IT Management, etc.