Showing posts with label Delta Lake. Show all posts
Showing posts with label Delta Lake. Show all posts

31 March 2024

Microsoft Fabric: Polaris (Notes)

Disclaimer: This is work in progress intended to consolidate information from various sources and may deviate from them. Please consult the sources for the exact content!
Last updated: 31-Mar-2024

Polaris

  • {definition} cloud-native analytical query engine over the data lake that follows a stateless micro-service architecture and is designed to execute queries in a scalable, dynamic and fault-tolerant way [1], [2]
    • the engine behind the serverless SQL pool [1] and Microsoft Fabric [2]
    • petabyte-scale execution [1]
    • highly-available micro-service architecture
      • data and query processing is packaged into units (aka tasks) [1]
        • can be readily moved across compute nodes and re-started at the task level [1]
    • can run directly over data in HDFS and in managed transactional stores [1]
  • [Azure Synapse] designed initially to execute read-only queries [1]
    • ⇐ the architecture behind serverless SQL pool
    • uses a completely new scale-out framework based on a distributed SQL Server query engine [1]
        • fully compatible with T-SQL
        • leverages SQL Server single-node runtime and QO [1]
  • [Microsoft Fabric] extended with a complete transaction manager that executes general CRUD transactions [2]
    • incl. updates, deletes and bulk loads [2]
    • based on [delta tables] and [delta lake]
      • the delta lake supports currently only transactions within one table [4]
    • ⇐ the architecture behind lakehouses
  • {goal} converge DWH and big data workloads [1]
    • the query engine scales-out for relational data and heterogeneous datasets stored in DFSs[1]
      • needs a clean abstraction over the underlying data type and format, capturing just what’s needed for efficiently parallelizing data processing
  • {goal} separate compute and state for cloud-native execution [1]
    • all services within a pool are stateless
      • data is stored durably in remote storage and is abstracted via data cells [1]
        • ⇐ data is naturally decoupled from compute nodes
    • the metadata and transactional log state is off-loaded to centralized services [[1]
    • multiple compute pools can transactionally access the same logical database [1]
  • {goal} cloud-first [2]
    • {benefit} leverages elasticity
    • transactions need to be resilient to node failures on dynamically changing topologies [2]
      •  ⇒ the storage engine disaggregates the source of truth for execution state (including data, metadata and transactional state) from compute nodes [2]
    • must ensure disaggregation of metadata and transactional state from compute nodes [2]
      • ⇐ to ensure that the life span of a transaction is resilient to changes in the backend compute topology [2]
        • ⇐ can change dynamically to take advantage of the elastic nature of the cloud or to handle node failures [2]
  • {goal} use optimized native columnar, immutable and open storage format [2]
    • uses delta format 
      • ⇐ optimized to handle read-heavy workloads with low contention [2] 
  • {goal} leverage the full potential of vectorized query processing for SQL [2]
  • {goal} support zero-copy data sharing with other services in the lake [2]
  • {goal} support read-heavy workloads with low contention [2]
  • {goal} support lineage-based features [2]
    • by taking advantage of delta table capabilities 
  • {goal} provide full SQL SI transactional support [2]
    • {benefit} all traditional DWH requirements are met [2]
      • incl. multi-table and multi-statement transactions [2]
        • ⇐ Polaris is the only system that supports this [2]
        • the design is optimized for analytics, specifically read- and insert-intensive workloads [2]
        • mixes of transactions are supported as well
  • {objective} no cross-component state sharing [2] 
    • {principle} encapsulation of state within each component to avoid sharing state across nodes [2]
    • SI and the isolation of state across components allows to execute transactions as if they were queries [2]
      • ⇒ makes read and write transactions indistinguishable [2]
        • ⇒ allows to fully leverage its optimized distributed execution framework [2]
  • {objective} support snapshot Isolation (SI) semantics [2]
    • implemented over versioned data
    • allows reads (R) and writes (W) to proceed concurrently over their own data snapshot 
      • R/W never conflict, and W/W of active transactions only conflict if they modify the same data [2] 
      • ⇐ all W transactions are serializable, leading to a serial schedule in increasing order of log record IDs [4]
        • follows from the commit protocol for write transactions, where only one transaction can write the record with each record ID [4]
      • ⇐  R transactions at the snapshot isolation level create no contention
        •  ⇒  any number of R transactions can run concurrently [4]
    • the immutable data representation in LSTs allows dealing with failures by simply discarding data and metadata files that represent uncommitted changes [2]
      • similar to how temporary tables are discarded during query processing failures [2]
  • {feature} resize live workloads [1]
    • scales resources with the workloads automatically
  • {feature} deliver predictable performance at scale [1]
    • scales computational resources based on workloads' needs
  • {feature} efficiently handle both relational and unstructured data [1]
  • {feature} flexible, fine-grained task monitoring
    • a task is the finest grain of execution 
  • {feature} global resource-aware scheduling
    • enables much better resource utilization and concurrency than traditional DWHs
      • capable of handling partial query restarts
      • maintains a global view of multiple queries
    • it is planned to build on this a global view with autonomous workload management features
  • {feature} multi-layered data caching model
    • leverages 
      • SQL Server buffer pools for cashing columnar data
      • SSD caching
    • the delta table and its log are are immutable, they can be safely cached on cluster nodes [4]
  • {feature} tracks data lineage natively
    • the transaction log can also be used to audit logging based on the commit Info records [4]
  • {feature} versioning
    • maintain all versions as data is updated [1]
  • {feature} time-travel
    • {benefit} allows users query point-in-time snapshots
    • {benefit)} allows to roll back erroneous updates to the data.
  • {feature} table cloning
    • {benefit} allows to create a point-in-time snapshot of the data based on its metadata
  • {concept} state 
    • allows to drive the end-to-end life cycle of a SQL statement with transactional guarantees and top tier performance [1]
    • comprised of 
      • cache
      • metadata
      • transaction logs
      • data
    • [on-premises architecture] all state is in the compute layer
      • relies on small, highly stable and homogenous clusters with dedicated hardware for Tier-1 performance
      • {downside} expensive
      • {downside} hard to maintain
      • {downside} limited scalability
        • cluster capacity is bounded by machine sizes because of the fixed topology
  • {concept}[stateful architecture
    • the state of inflight transactions is stored in the compute node and is not hardened into persistent storage until the transaction commits [1]
      • ⇒ when a compute node fails, the state of non-committed transactions is lost [1] 
        •  ⇒ the in-flight transactions fail as well [1]
    • often also couples metadata describing data distributions and mappings to compute nodes [1] 
      • ⇒ a compute node effectively owns responsibility for processing a subset of the data [1] 
        • its ownership cannot be transferred without a cluster restart [1]
    • {downside} resilience to compute node failure and elastic assignment of data to compute are not possible [1]
  • {concept} [stateless compute architecture
    • requires that compute nodes hold no state information [1]
      • ⇒ all data, transactional logs and metadata need to be externalized [1]
    • {benefit} allows applications to 
      • partially restart the execution of queries in the event of compute node failures [1] 
      • adapt to online changes of the cluster topology without failing in-flight transactions [1] 
    • caches need to be as close to the compute as possible [1] 
      • since they can be lazily reconstructed from persisted data they don’t necessarily need to be decoupled from compute [1] 
        • the coupling of caches and compute does not make the architecture stateful [1] 
  • {concept} [cloud] decoupling of compute and storage
    • provides more flexible resource scaling
      • the 2 layers can scale up and down independently adapting to user needs [1] 
      • customers pay for the compute needed to query a working subset of the data [1] 
    • is not the same as decoupling compute and state [1] 
      • if any of the remaining state held in compute cannot be reconstructed from external services, then compute remains stateful [1] 
Acronyms:
ADLS - Azure Data Lake Storage
CRUD - Create, Read, Update, Delete
DCP - distributed computation platform 
DFS - Distributed File System
DWH - data warehouse
HDFS - Hadoop DFS
SI - Semantic Isolation 
SSD - Solid-State Drive

References:
[1] Josep Aguilar-Saborit et al (2020) POLARIS: The Distributed SQL Engine in Azure Synapse, Proceedings of the VLDB Endowment PVLDB 13(12)  (link)
[2] Josep Aguilar-Saborit et al (2024), Extending Polaris to Support Transactions (link)
[3] Advancing Analytics (2021) Azure Synapse Analytics - Polaris Whitepaper Deep-Dive (link)
[4] Michael Armbrust et al (2020) Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, Proceedings of the VLDB Endowment 13(12) (link)

17 February 2024

Business Intelligence: Microsoft Fabric's Notebooks

Business Intelligence Series
Business Intelligence Series 

When several technologies make their entrance in a data-related field like Data Warehousing, Data Analitics or Data Science, one is forced to understand how the respective technologies can be used or misused, respectively what's their place in the bigger picture. Microsoft Fabric introduces several important technologies that will change the way data are stored, processed and consumed. 

The first important technology is the notebook - a web document-like cell-based container for writing and executing code in a collaborative manner. The concept is not new, Jupyter notebooks have been around for almost a decade. In Microsof Fabric, notebooks support multiple languages, from which a default one applies to the whole notebook, while on cell level any of the supported languages can be used. 

One can execute a single cell, multiple cells or the entire notebook in a sequential manner, mix languages for the various operations - load, transform, save, and visualize data when needed. Notebooks can be parametrized and run via the homonymous activity in Data Factory pipelines, automating thus data processing. Probably more functionality is to come. 

Data engineers seems to have great flexibility, though usually flexibility implies constraints and/or mischiefs in other areas. I see for example in presentations the overuse of temporary data objects (mainly views) in Spark SQL as part of complex logic. That's acceptable during prototyping, though such code becomes a danger as soon the logic is deployed into production. Data objects should be created outside of the logic that uses them and should be treated as artifacts, with version control and proper documentation. It's maybe true that temporary objects reduce the volume of objects in the metastore, though is this the way to go?

Temporary objects tend to lead to wheel's reinvention or they get duplicated across multiple notebooks, which can easily create a maintenance nightmare. One needs to consider that the business logic changes a lot, the requirements and the data sources change, and on the long term, the cost of maintaining the code can easily overweight the benefits. 

Notebooks remind me of the beginnings of web programming when HTML was mixed up with client scripting languages like VB Script or Javascript, CSS, respectively server-side scripting languages. It was kind of a spaghetti code, modified repeatedly by multiple programmers, unendingly duplicated, and through a miracle it worked, until it stopped working unexpectedly in strangest situations. The strangest part was when after removing  commented code from a section made the code run again. 

The debugging of another person's code was a nightmare. Code developed by two people for similar purposes was looking unrecognizable different in terms of structure, programming techniques and layout. The technical debt was high, increasing in exponential manner. One was aware that the code needed refactoring, though there were more important things to do or no time allocated for it.

In the meantime the maturity of programming languages, frameworks, methodologies, best practices, and hopefully of programmers improved the overall quality of software (at least on average). Thinking of software from an Engineer's perspective improved the efficiency and effectiveness of a programmer's endeavor. The average programmer is able to write quality code, though there's a considerable minimum of "engineering" knowledge involved beside the mere knowledge of languages and tools. 

Notebooks are good up to a point, beyond which one needs to take a step back, restructure, move the code where it belongs, take a few more steps back and review the good practices and their application, disseminate the knowledge inside the team and use it in the next iterations, respectively refractor the code when needed! Hopefully, people learned from the mistakes of the past. 

Resources:
[1] Microsoft Learn (2023) How to use Microsoft Fabric notebooks (link

13 February 2024

Business Intelligence: A One Man Show III (The Microsoft Fabric)

Business Intelligence Series
Business Intelligence Series

Announced at the end of the last year, Microsoft Fabric (MF) become a reality for the data professional, even if there are still many gaps in the overall architecture and some things don't work as they should. The Delta Lake and the various data consumption experiences seem to bring more flexibility but also raise questions on how one can use them adequately in building solutions for Data Analytics and/or Data Science. 

Currently, as it happens with new technologies, data professionals seem to try to explore the functionality, see what's possible, what's missing, and that's a considerable effort as everybody is more or less on his own. The material released by Microsoft and other professionals should facilitate in theory this effort, though the considerable number of features and the effort needed to review them do the opposite. Some professionals do this as part of their jobs, and exploring the feature seems to be a full job in each area, while others, like myself, do it in their own time. 

There are organizations that demand from their employees to regularly actualize their knowledge in their field of activity, respectively explore how new technologies can be integrated in organization's architecture. Having a few hours or even a day a weak for this can go a long way! Occasionally, I could take 1-2 hours a week during the program and take maybe a few many more hours from my own time. Unfortunately, most of the significant progress I made in a certain area (SQL Server, Dynamics 365, Software Engineering, Power BI, and now MF) it was done in my own time, which became in time more and more challenging to do given the pace with which new features and technologies develop.

By comparison, it was relatively easy to locally install SQL Server in its various CTP or community versions, deploy one of the readily-available databases, and start learning. I'm still doing it, playing with a SQL Server 2022 instance whenever I find the time. Similarly, I can use Power BI and a few other tools, depending again on the time available to make progress. However, with MF things start slowly to get blurry. The 60 days of trial won't cut it anymore as there are so many things to learn - Spark SQL, PySpark, Delta Lake, KQL, Dataflows, etc. Probably, there will be ways for learning any of this standalone, though not together in an integrated manner. 

The complexity of the tools demands more time, a proper infrastructure and a good project to accommodate them. This doesn't mean that the complexity of the solutions need to increase as well! Azure Synapse allowed me to reuse many of the techniques I used in the past to build a modern Data Analytics solution, while in other areas I had to accommodate the new. The solution wasn't perfect (only time will tell), though it provided the minimum of what was needed. I expect the same to happen in Microsoft Fabric, even if the number of choices is bigger. 

There's a considerable difference between building a minimal viable solution and exploring, respectively harnessing MF's capabilities. The challenge for many organizations is to determine what that minimum is about, how to build that knowledge into the team, especially when starting from zero. 

Conversely, this doesn't mean that the skillset and effort can't be covered by one person. It might be more challenging though achievable if the foundation is there, respectively if certain conditions are met. This depends also on organization's expectations, infrastructure and other characteristics. A whole team is more likely to succeed than one person, but not certainty! 

Previous Post <<||>> Next Post

07 February 2024

SQL Reloaded: Microsoft Fabric's Delta Tables in Action - Views and other Data Objects

One reads in the training material that the SQL Endpoint provides a read-only experience [1], meaning that no data can be written back to the delta lake tables. Playing with the metadata available in Spark SQL via Notebooks and the SQL Endpoint (see post), I realized that there is more to the statement! Even if one can query via the SQL Endpoint only delta tables, this doesn't mean that one can't build a semantic model on top of it, much like one was able to do via the Serverless SQL pool in Azure Synapse.

In Spark one can create via SQL, PySpark and the other supported languages views and functions, though they will not be available to the SQL Endpoint! To use the data generated in the process, the respective data needs to be saved to delta tables. Conversely, one can still create views, functions and stored procedures via the SQL Endpoint though the objects won't be available in Spark SQL! 

This has important implications, though in this post let's focus on the syntax and create several objects for testing purposes in the two environments. I'll use the Assets delta table created in a previous post. The Spark SQL code should be run in a notebook (e.g. one cell per group of statements), while the code for the SQL Endpoint should be run in SQL Server Management Studio.

Views

/* test view's creation in the SQL Endpoint */

-- drop the test view 
DROP VIEW IF EXISTS dbo.vAssets_Microsoft2;
GO

-- create the test view
CREATE VIEW dbo.vAssets_Microsoft2
AS
--Microsoft assets
SELECT Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity
FROM dbo.Assets
WHERE Vendor = 'Microsoft';
GO

-- test the viwe
SELECT *
FROM dbo.vAssets_Microsoft2;

/* test view's creation in Spark SQL */

-- drop test view 
DROP VIEW IF EXISTS vAssets_Microsoft;

-- create test view
CREATE VIEW vAssets_Microsoft COMMENT 'Microsoft assets in scope (view)'
AS
SELECT Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity
FROM assets
WHERE Vendor = 'Microsoft';

-- review data
SELECT *
FROM vAssets_Microsoft;

Table-Valued Functions

/* test function's creation in the SQL Endpoint */

-- drop the test function 
DROP FUNCTION IF EXISTS dbo.fAssets_Microsoft2;
GO

-- create the test function
CREATE FUNCTION dbo.fAssets_Microsoft2(
    @Vendor nvarchar(max))
RETURNS TABLE 
AS 
RETURN (
    SELECT Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity
    FROM dbo.Assets
    WHERE Vendor = @Vendor
);
GO

-- test the function
SELECT *
FROM dbo.fAssets_Microsoft2('Microsoft');

SELECT *
FROM dbo.fAssets_Microsoft2('Dell');

Unfortunately, the Spark SQL code doesn't seem to work, its execution returning a PARSE_SYNTAX_ERROR error no matter how simple the code was (see also [2]).
 
/* test function's creation in Spark SQL */

-- drop test function 
DROP FUNCTION IF EXISTS fAssets_Microsoft;

-- create test function
CREATE FUNCTION fAssets_Microsoft(
    pVendor string)
RETURNS TABLE
AS 
RETURN 
    SELECT Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity
    FROM assets
    WHERE Vendor = pVendor;

-- review data
SELECT *
FROM fAssets_Microsoft('Microsoft');

Stored Procedure

Stored procedures aren't available in Spark SQL, though this doesn't mean that we can't test the code in the SQL Endpoint:

/* test procedure's creation in the SQL Endpoint */

-- drop the test procedure 
DROP PROCEDURE IF EXISTS dbo.spAssets_Microsoft2;
GO

-- create the test procedure
CREATE PROCEDURE dbo.spAssets_Microsoft2(
@Vendor nvarchar(max) = NULL)
AS
--Microsoft assets
SELECT Id, CreationDate, Vendor, Asset, Model, Owner, Tag, Quantity
FROM dbo.Assets
WHERE Vendor = IsNull(@Vendor, Vendor);
GO

-- test the procedure
EXEC dbo.spAssets_Microsoft2 'Microsoft';
EXEC dbo.spAssets_Microsoft2 'Dell';
EXEC dbo.spAssets_Microsoft2;

Notes:
1) I observed in documentation and some presentations that the common practice of prefixing data objects based on their type is seldom considered. I still find it useful when building solutions, even if object's type can be derived from the context and/or metadata. 
2) The examples were chosen to test the minimal functionality so that the differences between the two platforms are minimal - using the dbo schema and the GO command in the SQL Endpoint, COMMENT in Spark SQL. However, as soon specific functionality is used, extra code is needed to mitigate the differences.
3) The names between environments were kept different, just in case one needs to test objects' availability between platforms.

Happy coding!

Resources:
[1] Microsoft Learn (2023) Work with Delta Lake tables in Microsoft Fabric (link)
[2] Databricks (2023) CREATE FUNCTION (SQL and Python) (link)

01 February 2024

Microsoft Fabric: Delta Tables (Notes)

Disclaimer: This is work in progress intended to consolidate information from various sources. 
Last updated: 18-Apr-2024

Delta Table Structure

[Delta Lake] delta table
  • {definition} table that stores data as a directory of files in the delta lake (DL) and registers table metadata to the metastore within a catalog and schema [1]
    • ⇐ represents a schema abstraction over data files that are stored in Delta format [2]
    • for each table, the lakehouse stores 
      • a folder containing its Parquet data files
      • a _delta_Log folder in which transaction details are logged in JSON format
    • all Microsoft Fabric (MF) experiences generate and consume delta tables [10]
      • provides interoperability and a unified product experience [10]
      •  some experiences can only write to delta tables, while others can read from it [10]
    • ⇒ all data files for a table in a database are grouped under a common data path [12]
  • {feature} support ACID transactions
    • modifications made to a table are logged in its transaction log
      • enforces serializable isolation for concurrent operations [2]
      • the logged transactions can be used to retrieve the history of changes made [2]
      • each transaction creates new Parquet files
      • deleting a row, doesn't physically delete in the parquet file [9]
        • {feature} [DL] Delete Vectors 
          • are read as part of the table and indicate which rows to ignore [9]
          • make it faster to perform deletions because there's no need to re-write the existing parquet files [9]
          • many deleted rows take more resources to read that file [9]
  • {feature} support DML
    • only tables created while the future is available will have all DML published [..]
  • {feature} support data versioning 
    • multiple versions of each table row can be retrieved from the transaction log [2]
  • {feature} support time travel
  • {feature} support for batch and streaming data
    • delta tables can be used as both sources and destinations for streaming data [2]
  • {feature} support standard formats and interoperability
  • table consumption
    • [lakehouse] SQL Endpoint 
      • provides a read-only experience [4]
      • can be used to query only delta tables via T-SQL [4]
        • other file formats can not be queried using the SQL endpoint [4]
          • ⇐  the files need to be converted to the delta format [4]
      • {limitation} doesn't support the full T-SQL surface area of a transactional data warehouse [4]
  • [Spark] managed table
    •  the table definition in the metastore and the underlying data files are both managed by the Spark runtime for the Fabric lakehouse [4] 
  • [Spark] external table
    • the relational table definition in the metastore is mapped to an alternative file storage location [4]
      • the Parquet data files and JSON log files for the table are stored in the Files storage location [4]
  • [Spark] allows greater control over the creation and management of delta tables [4]
  • {operation} create (aka create delta table)
    • defines the table in the metastore for the lakehouse
    • its data is stored in the underlying Parquet files for the table
      • the details of mapping the table definition in the metastore to the underlying files are abstracted [4]
      • ⇐ internally, there is also a log file that keeps track of which parquet files, when combined, make up the data that is in the table [4]
        • the log files are internal and cannot be used directly by other engines [4]
          •  ⇐ DF publishes automatically the right log files so that other engines can directly access the right parquet files [..]
        • after every 10 transactions, a new log file (aka checkpoint) is created automatically and asynchronously [4]
          • ⇐ the file is a summary of all the previous log files [4]
          • when querying the table, the system needs to read the latest checkpoint and any log files that were created after*
            • ⇐ instead of having to read 105,120 log files, 10 or less files will be read*
        • the Delta Lake Logs are automatically so that other engines can directly access the right parquet files *
    • [Apache Spark in a lakehouse] allows greater control of the creation and management of delta tables [4]
      • via saving a dataframe 
        • {method}save a dataframe as a delta table [4]
          • ⇐ the easiest way to create a delta table 
          • creates both the table schema definition in the metastore and the data files in delta format [4]
        • {method} create the table definition [4]
          • creates the table schema in the metastore without saving any data files [4]
        • {method} save data in delta format without creating a table definition in the metastore [4]
          • {scenario} persist the results of data transformations performed in Spark in a file format over which  a table definition is overlayed later or processed directly by using the delta lake API [4]
          • modifications made to the data through the delta lake API or in an external table that is subsequently created on the folder will be recorded in the transaction logs [4]
          • {mode} overwrite
            • replace the contents of an existing folder with the data in a dataframe [4]
          • {mode} append
            • adds rows from a dataframe to an existing folder [4]
          • Fabric uses an automatic table discovery capability to create the corresponding table metadata in the metastore [4]
      • via DeltaTableBuilder API
        • enables to write Spark code to create a table based on specifications
      • via Spark SQL
        • [managed table] via CREATE TABLE <table_definition> USING DELTA
        • [external table] via CREATE TABLE <table_name> USING DELTA LOCATION
          • the schema of the table is determined by the Parquet files containing the data in the specified location
          • {scenario} create a table definition 
            • that references data that has already been saved in delta format [4]
            • based on a folder where data are ingested in the delta format [4]
  • {operation} update (aka update delta table)
  • {operation} delete (aka delete delta table)
    • [managed table] deleting the table deletes the underlying files from the Tables storage location for the lakehouse [4]
    • [external table] deleting a table from the lakehouse metastore does not delete the associated data files [4]
  • performance and storage cost efficiency tend to degrade over time
    • {reason} new data added to the table might skew the data [3]
    • {reason} batch and streaming data ingestion rates might bring in many small files
    • {reason} update and delete operations eventually create read overhead
      • parquet files are immutable by design, so Delta tables adds new parquet files which the changeset, further amplifying the issues imposed by the first two items [3]
    • {reason} no longer needed data files and log files available in the storage
  • {recommendation} don’t allow special characters in column names (incl. spaces)
  • {recommendation} make table and column names business-friendly
  • {feature} table partitions
    • {recommendation} use a partitioned folder structure wherever applicable
      • helps to improve data manageability and query performance
      • results in faster search for specific data entries thanks to partition pruning/elimination
      • {best practice} partition data to align with the query patterns [15]
      • it can dramatically speed up query performance, especially when combined with other performance optimizations [15]
  • {command} MERGE 
    • allows updating a delta table with advanced conditions [3]
      • from a source table, view or DataFrame [3]
    • {limitation} the current algorithm in the open source distribution of Delta Lake isn't fully optimized for handling unmodified rows [3]
    • [Microsoft Spark Delta] implemented a custom Low Shuffle Merge optimization
      • unmodified rows are excluded from an expensive shuffling operation that is needed for updating matched rows [3]
  • {command} OPTIMIZE
    • consolidates multiple small Parquet files into large file [8]
    • should be run whenever there are enough small files to justify running the compaction operation [6]
      • {best practice} run optimization after loading large tables [8]
    • benefits greatly from the ACID transactions supported [6]
    • [Delta Lake] predicate filtering
      • specify predicates to only compact a subset of your data [6]
      • {scenario} running a compaction job on the same dataset daily [6]
  • {command} VACUUM
    • removes old files no longer referenced by a Delta table log [8]
      • files need to be older than the retention threshold [8]
        • the default file retention threshold is seven days [8]
          • shorter retention period impacts Delta's time travel capabilities [8]
          • {default} historical data can't be delete within the retention threshold [2]
            • ⇐ that's to maintain the consistency in data [2]
        • {best practice} set a retention interval to at least seven days [8]
          • ⇐ because old snapshots and uncommitted files can still be in use by the concurrent table readers and writers [8]
        • important to optimize storage cost [8]
    • {warning} leaning up active files might lead to reader failures or even table corruption if the uncommitted files are removed [8]
  • {issue} small files
    • create large metadata transaction logs which cause planning time slowness [6]
    • result from
      • big repartition value [6]
      • if the dataset is partitioned on a high-cardinality column or if there are deeply nested partitions, then more small files will be created [6]
      • tables that are incrementally updated frequently [6]
    • files of sizes above 128 MB, and optimally close to 1 GB, improve compression and data distribution across the cluster nodes [8]
  • {feature} auto compaction 
    • combines small files within Delta table partitions to automatically reduce small file problems [7]
    • occurs after a write to a table has succeeded and runs synchronously on the cluster that has performed the write [7]
    • only compacts files that haven’t been compacted previously [7]
    • only triggered for partitions or tables that have at least a certain number of small files [7]
    • enabled at the table or session level [7]
  • {feature}[Delta Lake 1.2] data skipping 
    • the engine takes advantage of minimum and maximum values metadata to provide faster queries
      • ⇐ requires the respective metadata
    •  its effectiveness depends on data's layout [7]
  • {feature} [Delta Lake 3.0] Z-Ordering (aka multi-dimensional clustering)
    • technique to collocate related information in the same set of files [7]
    • automatically used in data-skipping algorithms [7]
    • dramatically reduces the amount of data to read [7]
    • aims to produce evenly-balanced data files with respect to the number of tuples
      • ⇐ but not necessarily data size on disk [7]
        • ⇐ the two measures are most often correlated [7]
          • ⇐ but there can be situations when that is not the case, leading to skew in optimize task times [7]
    • via ZORDER BY clause 
      • applicable to columns with high cardinality commonly used in query predicates [7]
      • multiple columns can be specified as a comma-separated list
        • {warning} the effectiveness of the locality drops with each extra column [7]
        • {warning} using columns that do not have statistics collected on them is  ineffective and wastes resources [7] 
          • statistics collection can be configured on certain columns by reordering columns in the schema, or by increasing the number of columns to collect statistics on [7]
    • {characteristic} not idempotent
      • every time is executed, it will try to create a new clustering of data in all files in a partition [7]
        • it includes new and existing files that were part of previous Z-Ordering [7]
  • {feature} checkpointing
    • allows read queries to quickly reconstruct the current state of the table without reading too many files having incremental updates [7]
    • {default} each checkpoint is written as a single Parquet file [7]
    • {alternative} [Delta Lake 2.0] multi-part checkpointing
      • allows splitting the checkpoint into multiple Parquet files [7]
        • ⇒ parallelizes and speeds up writing the checkpoint [7]
  • {feature} [Delta Lake 3.0] log compaction
    • reduces the need for frequent checkpoints and minimize the latency spikes caused by them [7]
    • allows new log compaction files with the format <x>.<y>.compact.json
      • the files contain the aggregated actions for commit range [x, y] 
    • read support is enabled by default [7]
    • write support not available yet [7]
      •  will be added in a future version of Delta [7]
  • Delta Lake transaction log (aka DeltaLog)
    • a sequential record of every transaction performed on a table since its creation [15]
    • central to DL functionality because it is at the core of its important features [15]
      • incl. ACID transactions, scalable metadata handling, time travel
    • {goal} enable multiple readers and writers to operate on a given version of a dataset file simultaneously [15]
    • {goal} provide additional information,  to the execution engine for more performant operations [15]
      • e.g. data skipping indexes
    • always shows the user a consistent view of the data
      • ⇒ serves as a single source of truth
    • for each write operation, the data file is always written first, and only when that operation succeeds, a transaction log file is added to the _delta_log folder
      • ⇐ the transaction is only considered complete when the transaction log entry is written successfully [15] 
  • {feature} [Lakehouse] table maintenance 
    • manages efficiently delta tables and keeps them always ready for analytics [8]
    • performs ad-hoc table maintenance using contextual right-click actions in a delta table within the Lakehouse explorer [8]
    • applies bin-compaction, V-Order, and unreferenced old files cleanup [8]
      • via Lakehouse >> Tables >> (select table) >>Maintenance >> (select options) >> Run now
        •  a Spark maintenance job is submitted for execution [8]
          • uses the user identity and table privileges
            • consumes Fabric capacity of the workspace/user that submitted the job
          • {constraint} only one maintenance job on a table can be run at any time
            • if there's a running job on the table, the new one is rejected [8]
            • jobs on different tables can execute in parallel [8]
          • running jobs are available in the Monitoring Hub 
            • see "TableMaintenance" text within the activity name column [8]
    • {best practice} properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization commands [3]

Acronyms:
ACID - atomicity, consistency, isolation, durability
API - Application Programming Interface
CRUD - create, read, update, and delete
DL - Delta lake
MF - Microsoft Fabric
JSON - JavaScript Object Notation

Resources:
[1] Microsoft Learn (2023) Data objects in the Databricks lakehouse (link)
[2] Microsoft Learn (2023) Implement medallion lakehouse architecture in Microsoft Fabric (link)
[3] Microsoft Learn (2023) Delta Lake table optimization and V-Order (link)
[4] Microsoft Learn (2023) Work with Delta Lake tables in Microsoft Fabric (link)
[5] Delta Lake (2023) Quickstart (link)
[6] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[7] Delta Lake (2023) Optimizations (link)
[8] Microsoft Learn (2023) 
Use table maintenance feature to manage delta tables in Fabric (link)
[9] Microsoft Fabric Updates Blog (2023) 
Announcing: Automatic Data Compaction for Fabric Warehouse, by Kevin Conan (link)
[10] 
Microsoft Learn (2023) Delta Lake table format interoperability (link)
[11] Josep Aguilar-Saborit et al (2020) POLARIS: The Distributed SQL Engine in Azure Synapse, Proceedings of the VLDB Endowment PVLDB 13(12)  (link)
[12] Josep Aguilar-Saborit et al (2024), Extending Polaris to Support Transactions (link)
[13] Michael Armbrust et al (2020) Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, Proceedings of the VLDB Endowment13(12) (link)
[14]
 Jesús Camacho-Rodríguez et al (2023) LST-Bench: Benchmarking Log-Structured Tables in the Cloud, Proceedings of the ACM on Management of Data (2024), 2 (1) (link)
[15] Bennie Haelen & Dan Davis (2024) Delta Lake: Up and Running Modern Data Lakehouse Architectures with Delta Lake, 2024

31 January 2024

Microsoft Fabric: Parquet Format (Notes)

Disclaimer: This is work in progress intended to consolidate information from various sources. 

Last updated: 31-Jan-2024

Parquet format

  • {definition} open source, column-oriented data file format designed for efficient data storage and retrieval [1]
    • provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk [1]
    • designed to be a common interchange format for both batch and interactive workloads [1]
  • {characteristic} open source file format
    • similar to other columnar-storage file formats available in Hadoop [1]
      • e.g. RCFile, ORC
    • became an industry standard 
      •  {benefit} provides interoperability across multiple tools
  • {characteristic} language agnostic [1]
    • different programming languages can be used to manipulate the data
  • {characteristic} column-based format [1]
    • files are organized by column
      • ⇐ rather than by row
      • ⇒ saves storage space and speeds up analytics queries [1]
    •  reads only the needed columns 
      • ⇐ non-relevant data are skipped
      • ⇒ greatly minimizes the IO [1]
        • aggregation queries are less time-consuming compared to row-oriented databases [1]
    • {benefit} increased data throughput and performance [1]
      • ⇒ recommended for analytical workloads
  • {characteristic} highly efficient data compression/decompression [1]
    • supports flexible compression options and efficient encoding schemes [1]
      • data can be compressed by using one of the several codecs available [1]
        • ⇒ different data files can be compressed differently [1]
    •  reduced storage requirements [1]
      • by at least one-third on large datasets
      • ⇒ {benefit} saves on cloud storage space
    •  greatly improves scan and deserialization time [1]
      • ⇒ {benefit} reduces the processing costs
    • {downside} can be slower to write than row-based file formats
      • primarily because they contain metadata about the file contents 
      • though have fast read times
  • {characteristic} supports complex data types and advanced nested data structures [1]
    • implemented using the record-shredding and assembly algorithm
      • accommodates complex data structures that can be used to store the data [1]
      • optimized to work with complex data in bulk and features different ways for efficient data compression and encoding types [1]
        • the approach is best especially for those queries that need to read certain columns from a large table [1]
  • {characteristic} cloud-ready
    • works best with interactive and serverless technologies [1]
  • {characteristic} immutable
    • a file can't be update to modify the column name, reorder or drop columns [2]
      • ⇐ requires rewriting the whole file [2]
  • {characteristic} binary-based file
    • ⇒ not easily readable (by humans)
  • {characteristic} self-describing 
    •  contains metadata about schema and structure
    • {concept} row groups (aka segments) 
      • contains data from the same columns
        • {constraint} column names are case sensitive
    • {concept} file footer 
      • stores metadata statistics for each row group [2]
        • min/max statistics 
        • the number of rows
        • can be leveraged by data processing engines to run queries more efficiently [2]
          • ⇐ depending on the query, entire row group can be skipped [2]
    • {concept} file header
  •  large datasets can be split across multiple parquet files
    • ⇐ the structure can be flat or hierarchical 
    • managing multiple files has several challenges
    • the files can be used to define a table (aka parquet table)
      • ⇐ {constraint} the files must have the same definition
        • ⇐ schema enforcement must be coded manually [2]
      • {limitation} [Data Lake] no support for ACID transactions [2]
        • ⇒ easy to corrupt [2]
          • partially written files will break any subsequent read operations
            • the compute engine will try to read in the corrupt files and error out [2]
            • corrupted files must be manually identified and deleted manually to fix the issues [2]
      • {limitation} it's not easy to delete rows from it [2]
        • requires reading all the data, filtering out the data not needed, and then rewriting the entire table [2]
      • {limitation} doesn't support DML transactions [2]
      • {limitation} there is no change data feed [2]
      • {limitation} slow file listing [2]
        • small files require excessive I/O overhead
          • ideally the files should be between 64 MB and 1 GB
          • ideally the files should be compacted into larger files (aka small file compaction, bin-packing)
      • {limitation} expensive footer reads to gather statistics for file skipping [2]
        • fetching all the footers and building the file-level metadata for the entire table is slow [2]
          • ⇐ it requires a file-listing operation [2]
        • the effectiveness of data skipping depends on how many files can be can skipped when performing a query [2]
      • {limitation} doesn't support schema enforcement [2]
      • {limitation} doesn't support check constraints [2]
      • {limitation} doesn't support data versioning [2]
    • {concept} table partitioning
      • {definition} common optimization approach used to store the data of the same table in different directories, with partitioning column values encoded in the path of each partition directory [6]
      • {recommendation} avoid partitioning by columns with very high cardinality
    • {concept} bin-packing (aka compaction, bin-compaction)
      • aims to produce evenly-balanced data files with respect to their size on disk, 
        • ⇐ but not necessarily in respect to the number of tuples per file [7]
      • requires an algorithm that efficiently organizes the files into equal size containers [6]
      • {characteristic} idempotent
        •  if it is run twice on the same dataset, the second run has no effect [7]
  • {feature} [Microsoft Fabric] V-order
    • {definition} write time optimization to the parquet file format that enables lightning-fast reads under the MF compute engines [3]
    • applies special sorting, row group distribution, dictionary encoding and compression on parquet files [3]
      • requires less compute engines resources in to read it [3]
        • provides further cost efficiency and performance
          • has a 15% impact on average write times but provides up to 50% more compression [3]
    • {characteristic} open-source parquet format compliant
      • all parquet engines can read it as a regular parquet file [3]
      • ⇐ table properties and optimization commands can be used on control V-Order on its partitions [3]
      • compatible with other features [3]
    • applied at parquet file level [3]
    • enabled by default
  • {command} OPTIMIZE
    • merges all changes into bigger, consolidated parquet files (aka bin-compaction) [3]
    • [Spark] dynamically optimizes partitions while generating files with a default 128 MB size [5]
      • the target file size may be changed per workload requirements using configurations [5]
    • properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization command [3]
    • running the compaction operation brings the data lake in an unusable state for readers [7]
    • {warning} manually compacting the files is inefficient and error prone [7]
      • no way to differentiate files that contain new data from files that contain existing data that was just compacted into new files [7]
  • [Delta Lake] when ZORDER and VORDER are used together, Apache Spark performs bin-compaction, ZORDER, VORDER sequentially [3]
Previous Post <<||>> Next Post

Acronyms:
ACID - atomicity, consistency, isolation, durability
IO - Input/Output
MF - Microsoft Fabric
ORC - Optimized Row Columnar
RCFile - Record Columnar File

Resources:
[1] Databricks (2023) What is Parquet? (link)
[2] Data Lake (2023) Delta Lake vs. Parquet Comparison (link)
[3] Data Mozart (2023) Parquet file format – everything you need to know! (link)
[4] Microsoft Learn (2023) Query Parquet files using serverless SQL pool in Azure Synapse Analytics (link)
[5] Microsoft Learn (2023) Lakehouse tutorial: Prepare and transform data in the lakehouse (link)
[6] Apache Spark (2023) Spark SQL Guide (link)
[7] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[8] Delta Lake (2023) Optimizations (link)

Related Posts Plugin for WordPress, Blogger...

About Me

My photo
IT Professional with more than 24 years experience in IT in the area of full life-cycle of Web/Desktop/Database Applications Development, Software Engineering, Consultancy, Data Management, Data Quality, Data Migrations, Reporting, ERP implementations & support, Team/Project/IT Management, etc.