
01 February 2024

🏭🗒️Microsoft Fabric: Delta Tables [Notes]

Disclaimer: This is work in progress intended to consolidate information from various sources for learning purposes. For the latest information please consult the documentation (see the links below)! 

Last updated: 18-Apr-2024

Delta Table Structure

[Delta Lake] delta table
  • {definition} table that stores data as a directory of files in the delta lake (DL) and registers table metadata to the metastore within a catalog and schema [1]
    • ⇐ represents a schema abstraction over data files that are stored in Delta format [2]
    • for each table, the lakehouse stores 
      • a folder containing its Parquet data files
      • a _delta_log folder in which transaction details are logged in JSON format
    • all Microsoft Fabric (MF) experiences generate and consume delta tables [10]
      • provides interoperability and a unified product experience [10]
      •  some experiences can only write to delta tables, while others can read from it [10]
    • ⇒ all data files for a table in a database are grouped under a common data path [12]
  • {feature} support ACID transactions
    • modifications made to a table are logged in its transaction log
      • enforces serializable isolation for concurrent operations [2]
      • the logged transactions can be used to retrieve the history of changes made [2]
      • each transaction creates new Parquet files
      • deleting a row doesn't physically delete it from the parquet file [9]
        • {feature} [DL] Delete Vectors 
          • are read as part of the table and indicate which rows to ignore [9]
          • make it faster to perform deletions because there's no need to re-write the existing parquet files [9]
          • a file with many deleted rows takes more resources to read [9]
  • {feature} support DML
    • only tables created while the feature is available will have all DML published [..]
  • {feature} support data versioning 
    • multiple versions of each table row can be retrieved from the transaction log [2]
  • {feature} support time travel
  • {feature} support for batch and streaming data
    • delta tables can be used as both sources and destinations for streaming data [2]
  • {feature} support standard formats and interoperability
  • table consumption
    • [lakehouse] SQL Endpoint 
      • provides a read-only experience [4]
      • can be used to query only delta tables via T-SQL [4]
        • other file formats can not be queried using the SQL endpoint [4]
          • ⇐  the files need to be converted to the delta format [4]
      • {limitation} doesn't support the full T-SQL surface area of a transactional data warehouse [4]
  • [Spark] managed table
    •  the table definition in the metastore and the underlying data files are both managed by the Spark runtime for the Fabric lakehouse [4] 
  • [Spark] external table
    • the relational table definition in the metastore is mapped to an alternative file storage location [4]
      • the Parquet data files and JSON log files for the table are stored in the Files storage location [4]
  • [Spark] allows greater control over the creation and management of delta tables [4]
  • {operation} create (aka create delta table)
    • defines the table in the metastore for the lakehouse
    • its data is stored in the underlying Parquet files for the table
      • the details of mapping the table definition in the metastore to the underlying files are abstracted [4]
      • ⇐ internally, there is also a log file that keeps track of which parquet files, when combined, make up the data that is in the table [4]
        • the log files are internal and cannot be used directly by other engines [4]
          •  ⇐ DF automatically publishes the right log files so that other engines can directly access the right parquet files [..]
        • after every 10 transactions, a new log file (aka checkpoint) is created automatically and asynchronously [4]
          • ⇐ the file is a summary of all the previous log files [4]
          • when querying the table, the system needs to read the latest checkpoint and any log files that were created after*
            • ⇐ instead of having to read 105,120 log files, 10 or less files will be read*
        • the Delta Lake logs are published automatically so that other engines can directly access the right parquet files *
    • [Apache Spark in a lakehouse] allows greater control of the creation and management of delta tables [4]
      • via saving a dataframe 
        • {method}save a dataframe as a delta table [4]
          • ⇐ the easiest way to create a delta table 
          • creates both the table schema definition in the metastore and the data files in delta format [4]
        • {method} create the table definition [4]
          • creates the table schema in the metastore without saving any data files [4]
        • {method} save data in delta format without creating a table definition in the metastore [4]
          • {scenario} persist the results of data transformations performed in Spark in a file format over which a table definition is overlaid later or which is processed directly by using the delta lake API [4]
          • modifications made to the data through the delta lake API or in an external table that is subsequently created on the folder will be recorded in the transaction logs [4]
          • {mode} overwrite
            • replace the contents of an existing folder with the data in a dataframe [4]
          • {mode} append
            • adds rows from a dataframe to an existing folder [4]
          • Fabric uses an automatic table discovery capability to create the corresponding table metadata in the metastore [4]
      • via DeltaTableBuilder API
        • enables writing Spark code to create a table based on specifications
      • via Spark SQL (see the sketch after this list)
        • [managed table] via CREATE TABLE <table_definition> USING DELTA
        • [external table] via CREATE TABLE <table_name> USING DELTA LOCATION
          • the schema of the table is determined by the Parquet files containing the data in the specified location
          • {scenario} create a table definition 
            • that references data that has already been saved in delta format [4]
            • based on a folder where data are ingested in the delta format [4]
  • {operation} update (aka update delta table)
  • {operation} delete (aka delete delta table)
    • [managed table] deleting the table deletes the underlying files from the Tables storage location for the lakehouse [4]
    • [external table] deleting a table from the lakehouse metastore does not delete the associated data files [4]
  • performance and storage cost efficiency tend to degrade over time
    • {reason} new data added to the table might skew the data [3]
    • {reason} batch and streaming data ingestion rates might bring in many small files
    • {reason} update and delete operations eventually create read overhead
      • parquet files are immutable by design, so Delta tables add new parquet files with each changeset, further amplifying the issues imposed by the first two items [3]
    • {reason} data files and log files that are no longer needed remain available in the storage
  • {recommendation} don’t allow special characters in column names (incl. spaces)
  • {recommendation} make table and column names business-friendly
  • {feature} table partitions
    • {recommendation} use a partitioned folder structure wherever applicable
      • helps to improve data manageability and query performance
      • results in faster search for specific data entries thanks to partition pruning/elimination
      • {best practice} partition data to align with the query patterns [15]
      • it can dramatically speed up query performance, especially when combined with other performance optimizations [15]
  • {command} MERGE 
    • allows updating a delta table with advanced conditions [3]
      • from a source table, view or DataFrame [3]
    • {limitation} the current algorithm in the open source distribution of Delta Lake isn't fully optimized for handling unmodified rows [3]
    • [Microsoft Spark Delta] implemented a custom Low Shuffle Merge optimization
      • unmodified rows are excluded from an expensive shuffling operation that is needed for updating matched rows [3]
  • {command} OPTIMIZE
    • consolidates multiple small Parquet files into larger files [8]
    • should be run whenever there are enough small files to justify running the compaction operation [6]
      • {best practice} run optimization after loading large tables [8]
    • benefits greatly from the ACID transactions supported [6]
    • [Delta Lake] predicate filtering
      • specify predicates to only compact a subset of your data [6]
      • {scenario} running a compaction job on the same dataset daily [6]
  • {command} VACUUM
    • removes old files no longer referenced by a Delta table log [8]
      • files need to be older than the retention threshold [8]
        • the default file retention threshold is seven days [8]
          • shorter retention period impacts Delta's time travel capabilities [8]
          • {default} historical data can't be deleted within the retention threshold [2]
            • ⇐ that's to maintain the consistency in data [2]
        • {best practice} set a retention interval to at least seven days [8]
          • ⇐ because old snapshots and uncommitted files can still be in use by the concurrent table readers and writers [8]
        • important to optimize storage cost [8]
    • {warning} cleaning up active files might lead to reader failures or even table corruption if uncommitted files are removed [8]
  • {issue} small files
    • create large metadata transaction logs which cause planning time slowness [6]
    • result from
      • big repartition value [6]
      • if the dataset is partitioned on a high-cardinality column or if there are deeply nested partitions, then more small files will be created [6]
      • tables that are incrementally updated frequently [6]
    • files of sizes above 128 MB, and optimally close to 1 GB, improve compression and data distribution across the cluster nodes [8]
  • {feature} auto compaction 
    • combines small files within Delta table partitions to automatically reduce small file problems [7]
    • occurs after a write to a table has succeeded and runs synchronously on the cluster that has performed the write [7]
    • only compacts files that haven’t been compacted previously [7]
    • only triggered for partitions or tables that have at least a certain number of small files [7]
    • enabled at the table or session level [7]
  • {feature}[Delta Lake 1.2] data skipping 
    • the engine takes advantage of minimum and maximum values metadata to provide faster queries
      • ⇐ requires the respective metadata
    •  its effectiveness depends on data's layout [7]
  • {feature} [Delta Lake 3.0] z-ordering (aka multi-dimensional clustering)
    • technique to collocate related information in the same set of files [7]
    • automatically used in data-skipping algorithms [7]
    • dramatically reduces the amount of data to read [7]
    • aims to produce evenly-balanced data files with respect to the number of tuples
      • ⇐ but not necessarily data size on disk [7]
        • ⇐ the two measures are most often correlated [7]
          • ⇐ but there can be situations when that is not the case, leading to skew in optimize task times [7]
    • via ZORDER BY clause 
      • applicable to columns with high cardinality commonly used in query predicates [7]
      • multiple columns can be specified as a comma-separated list
        • {warning} the effectiveness of the locality drops with each extra column [7]
        • {warning} using columns that do not have statistics collected on them is  ineffective and wastes resources [7] 
          • statistics collection can be configured on certain columns by reordering columns in the schema, or by increasing the number of columns to collect statistics on [7]
    • {characteristic} not idempotent
      • every time it is executed, it will try to create a new clustering of data in all files in a partition [7]
        • it includes new and existing files that were part of previous Z-Ordering [7]
  • {feature} checkpointing
    • allows read queries to quickly reconstruct the current state of the table without reading too many files having incremental updates [7]
    • {default} each checkpoint is written as a single Parquet file [7]
    • {alternative} [Delta Lake 2.0] multi-part checkpointing
      • allows splitting the checkpoint into multiple Parquet files [7]
        • ⇒ parallelizes and speeds up writing the checkpoint [7]
  • {feature} [Delta Lake 3.0] log compaction
    • reduces the need for frequent checkpoints and minimizes the latency spikes caused by them [7]
    • allows new log compaction files with the format <x>.<y>.compact.json
      • the files contain the aggregated actions for commit range [x, y] 
    • read support is enabled by default [7]
    • write support not available yet [7]
      •  will be added in a future version of Delta [7]
  • {feature} Delta Lake transaction log (aka DeltaLog)
    • a sequential record of every transaction performed on a table since its creation [15]
    • central to DL functionality because it is at the core of its important features [15]
      • incl. ACID transactions, scalable metadata handling, time travel
    • {goal} enable multiple readers and writers to operate on a given version of a dataset file simultaneously [15]
    • {goal} provide additional information to the execution engine for more performant operations [15]
      • e.g. data skipping indexes
    • always shows the user a consistent view of the data
      • ⇒ serves as a single source of truth
    • for each write operation, the data file is always written first, and only when that operation succeeds, a transaction log file is added to the _delta_log folder
      • ⇐ the transaction is only considered complete when the transaction log entry is written successfully [15] 
  • {feature} [Lakehouse] table maintenance 
    • efficiently manages delta tables and keeps them always ready for analytics [8]
    • performs ad-hoc table maintenance using contextual right-click actions in a delta table within the Lakehouse explorer [8]
    • applies bin-compaction, V-Order, and unreferenced old files cleanup [8]
      • via Lakehouse >> Tables >> (select table) >> Maintenance >> (select options) >> Run now
        •  a Spark maintenance job is submitted for execution [8]
          • uses the user identity and table privileges
            • consumes Fabric capacity of the workspace/user that submitted the job
          • {constraint} only one maintenance job on a table can be run at any time
            • if there's a running job on the table, the new one is rejected [8]
            • jobs on different tables can execute in parallel [8]
          • running jobs are available in the Monitoring Hub 
            • see "TableMaintenance" text within the activity name column [8]
    • {best practice} properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization commands [3]
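
The following Spark SQL sketch illustrates the operations covered in the list above (table creation, time travel, OPTIMIZE, VACUUM). It is only a minimal example: the table, column, and path names (sales_delta, sales_delta_ext, sale_id, Files/external/sales) are hypothetical, and the statements assume a Fabric notebook attached to a lakehouse.

-- managed delta table: both the metastore definition and the data files are managed by the runtime
CREATE TABLE IF NOT EXISTS sales_delta (
  sale_id BIGINT
, sale_date DATE
, amount DECIMAL(18,2)
) USING DELTA;

-- external delta table: the definition is mapped to a folder under the Files storage location
CREATE TABLE IF NOT EXISTS sales_delta_ext
USING DELTA
LOCATION 'Files/external/sales';

-- time travel: inspect the transaction log and read a previous version of the table
DESCRIBE HISTORY sales_delta;
SELECT * FROM sales_delta VERSION AS OF 0;

-- bin-compaction of small files, optionally Z-Ordering on a high-cardinality predicate column
OPTIMIZE sales_delta;
OPTIMIZE sales_delta ZORDER BY (sale_id);

-- remove unreferenced files older than the retention threshold (default seven days = 168 hours)
VACUUM sales_delta RETAIN 168 HOURS;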

Acronyms:
ACID - atomicity, consistency, isolation, durability
API - Application Programming Interface
CRUD - create, read, update, and delete
DL - Delta lake
MF - Microsoft Fabric
JSON - JavaScript Object Notation

Resources:
[1] Microsoft Learn (2023) Data objects in the Databricks lakehouse (link)
[2] Microsoft Learn (2023) Implement medallion lakehouse architecture in Microsoft Fabric (link)
[3] Microsoft Learn (2023) Delta Lake table optimization and V-Order (link)
[4] Microsoft Learn (2023) Work with Delta Lake tables in Microsoft Fabric (link)
[5] Delta Lake (2023) Quickstart (link)
[6] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[7] Delta Lake (2023) Optimizations (link)
[8] Microsoft Learn (2023) Use table maintenance feature to manage delta tables in Fabric (link)
[9] Microsoft Fabric Updates Blog (2023) Announcing: Automatic Data Compaction for Fabric Warehouse, by Kevin Conan (link)
[10] Microsoft Learn (2023) Delta Lake table format interoperability (link)
[11] Josep Aguilar-Saborit et al (2020) POLARIS: The Distributed SQL Engine in Azure Synapse, Proceedings of the VLDB Endowment PVLDB 13(12) (link)
[12] Josep Aguilar-Saborit et al (2024) Extending Polaris to Support Transactions (link)
[13] Michael Armbrust et al (2020) Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, Proceedings of the VLDB Endowment 13(12) (link)
[14] Jesús Camacho-Rodríguez et al (2023) LST-Bench: Benchmarking Log-Structured Tables in the Cloud, Proceedings of the ACM on Management of Data (2024), 2(1) (link)
[15] Bennie Haelen & Dan Davis (2024) Delta Lake: Up and Running: Modern Data Lakehouse Architectures with Delta Lake

31 January 2024

🏭🗒️Microsoft Fabric: Parquet Format [Notes]

Disclaimer: This is work in progress intended to consolidate information from various sources for learning purposes. For the latest information please consult the documentation (see the links below)! 

Last updated: 31-Jan-2024

[Microsoft Fabric] Parquet format

  • {definition} open source, column-oriented data file format designed for efficient data storage and retrieval [1]
    • provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk [1]
    • designed to be a common interchange format for both batch and interactive workloads [1]
  • {characteristic} open source file format
    • similar to other columnar-storage file formats available in Hadoop [1]
      • e.g. RCFile, ORC
    • became an industry standard 
      •  {benefit} provides interoperability across multiple tools
  • {characteristic} language agnostic [1]
    • different programming languages can be used to manipulate the data
  • {characteristic} column-based format [1]
    • files are organized by column
      • ⇐ rather than by row
      • ⇒ saves storage space and speeds up analytics queries [1]
    •  reads only the needed columns 
      • ⇐ non-relevant data are skipped
      • ⇒ greatly minimizes the IO [1]
        • aggregation queries are less time-consuming compared to row-oriented databases [1]
    • {benefit} increased data throughput and performance [1]
      • ⇒ recommended for analytical workloads
  • {characteristic} highly efficient data compression/decompression [1]
    • supports flexible compression options and efficient encoding schemes [1]
      • data can be compressed by using one of the several codecs available [1]
        • ⇒ different data files can be compressed differently [1]
    •  reduced storage requirements [1]
      • by at least one-third on large datasets
      • ⇒ {benefit} saves on cloud storage space
    •  greatly improves scan and deserialization time [1]
      • ⇒ {benefit} reduces the processing costs
    • {downside} can be slower to write than row-based file formats
      • primarily because they contain metadata about the file contents 
      • though they have fast read times
  • {characteristic} supports complex data types and advanced nested data structures [1]
    • implemented using the record-shredding and assembly algorithm
      • accommodates complex data structures that can be used to store the data [1]
      • optimized to work with complex data in bulk and features different ways for efficient data compression and encoding types [1]
        • the approach is best especially for those queries that need to read certain columns from a large table [1]
  • {characteristic} cloud-ready
    • works best with interactive and serverless technologies [1]
  • {characteristic} immutable
    • a file can't be updated to modify column names, or to reorder or drop columns [2]
      • ⇐ requires rewriting the whole file [2]
  • {characteristic} binary-based file
    • ⇒ not easily readable (by humans)
  • {characteristic} self-describing 
    •  contains metadata about schema and structure
    • {concept} row groups (aka segments) 
      • contains data from the same columns
        • {constraint} column names are case sensitive
    • {concept} file footer 
      • stores metadata statistics for each row group [2]
        • min/max statistics 
        • the number of rows
        • can be leveraged by data processing engines to run queries more efficiently [2]
          • ⇐ depending on the query, entire row groups can be skipped [2]
    • {concept} file header
  •  large datasets can be split across multiple parquet files
    • ⇐ the structure can be flat or hierarchical 
    • managing multiple files has several challenges
    • the files can be used to define a table (aka parquet table)
      • ⇐ {constraint} the files must have the same definition
        • ⇐ schema enforcement must be coded manually [2]
      • {limitation} [Data Lake] no support for ACID transactions [2]
        • ⇒ easy to corrupt [2]
          • partially written files will break any subsequent read operations
            • the compute engine will try to read in the corrupt files and error out [2]
            • corrupted files must be identified and deleted manually to fix the issues [2]
      • {limitation} it's not easy to delete rows from it [2]
        • requires reading all the data, filtering out the data not needed, and then rewriting the entire table [2]
      • {limitation} doesn't support DML transactions [2]
      • {limitation} there is no change data feed [2]
      • {limitation} slow file listing [2]
        • small files require excessive I/O overhead
          • ideally the files should be between 64 MB and 1 GB
          • ideally the files should be compacted into larger files (aka small file compaction, bin-packing)
      • {limitation} expensive footer reads to gather statistics for file skipping [2]
        • fetching all the footers and building the file-level metadata for the entire table is slow [2]
          • ⇐ it requires a file-listing operation [2]
        • the effectiveness of data skipping depends on how many files can be skipped when performing a query [2]
      • {limitation} doesn't support schema enforcement [2]
      • {limitation} doesn't support check constraints [2]
      • {limitation} doesn't support data versioning [2]
    • {concept} table partitioning
      • {definition} common optimization approach used to store the data of the same table in different directories, with partitioning column values encoded in the path of each partition directory [6]
      • {recommendation} avoid partitioning by columns with very high cardinality
    • {concept} bin-packing (aka compaction, bin-compaction)
      • aims to produce evenly-balanced data files with respect to their size on disk, 
        • ⇐ but not necessarily in respect to the number of tuples per file [7]
      • requires an algorithm that efficiently organizes the files into equal size containers [6]
      • {characteristic} idempotent
        •  if it is run twice on the same dataset, the second run has no effect [7]
  • {feature} [Microsoft Fabric] V-order
    • {definition} write time optimization to the parquet file format that enables lightning-fast reads under the MF compute engines [3]
    • applies special sorting, row group distribution, dictionary encoding and compression on parquet files [3]
      • requires fewer compute engine resources to read it [3]
        • provides further cost efficiency and performance
          • has a 15% impact on average write times but provides up to 50% more compression [3]
    • {characteristic} open-source parquet format compliant
      • all parquet engines can read it as a regular parquet file [3]
      • ⇐ table properties and optimization commands can be used to control V-Order on its partitions [3]
      • compatible with other features [3]
    • applied at parquet file level [3]
    • enabled by default
  • {command} OPTIMIZE
    • merges all changes into bigger, consolidated parquet files (aka bin-compaction) [3]
    • [Spark] dynamically optimizes partitions while generating files with a default 128 MB size [5]
      • the target file size may be changed per workload requirements using configurations [5]
    • properly designing the table physical structure based on the ingestion frequency and expected read patterns is likely more important than running the optimization command [3]
    • running the compaction operation brings the data lake into an unusable state for readers [7]
    • {warning} manually compacting the files is inefficient and error prone [7]
      • no way to differentiate files that contain new data from files that contain existing data that was just compacted into new files [7]
  • [Delta Lake] when ZORDER and VORDER are used together, Apache Spark performs bin-compaction, ZORDER and VORDER sequentially [3] (see the sketch below)
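
As an illustration of the partitioning and compaction points above, the Spark SQL sketch below writes a partitioned parquet table and then compacts a delta table with Z-Order and V-Order. The table and column names (sales_parquet, staging_sales, sales_delta, sale_year, sale_id, amount) are hypothetical, and the VORDER keyword on OPTIMIZE is specific to the MF Spark runtime.

-- partitioned parquet table: partition pruning reads only the matching directories
CREATE TABLE IF NOT EXISTS sales_parquet
USING PARQUET
PARTITIONED BY (sale_year)
AS SELECT sale_id, amount, sale_year FROM staging_sales;

-- only the files under sale_year=2023 are scanned, and only the referenced columns are read
SELECT sale_year, SUM(amount) total_amount
FROM sales_parquet
WHERE sale_year = 2023
GROUP BY sale_year;

-- [delta table] bin-compaction with Z-Order and V-Order applied together
OPTIMIZE sales_delta ZORDER BY (sale_id) VORDER;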

Acronyms:
ACID - atomicity, consistency, isolation, durability
IO - Input/Output
MF - Microsoft Fabric
ORC - Optimized Row Columnar
RCFile - Record Columnar File

Resources:
[1] Databricks (2023) What is Parquet? (link)
[2] Delta Lake (2023) Delta Lake vs. Parquet Comparison (link)
[3] Data Mozart (2023) Parquet file format – everything you need to know! (link)
[4] Microsoft Learn (2023) Query Parquet files using serverless SQL pool in Azure Synapse Analytics (link)
[5] Microsoft Learn (2023) Lakehouse tutorial: Prepare and transform data in the lakehouse (link)
[6] Apache Spark (2023) Spark SQL Guide (link)
[7] Delta Lake (2023) Delta Lake Small File Compaction with OPTIMIZE (link)
[8] Delta Lake (2023) Optimizations (link)

10 October 2023

💫Data Warehousing and Dynamics 365 for Finance and Operation - A Few Issues to Consider I

Data Warehousing
Data Warehousing Series

Introduction

Besides the fact that data professionals don't have direct access to D365 F&O production environments (direct access is available only to sandboxes), which was from the beginning an important constraint imposed by the architecture, there are a few more challenges that need to be addressed when working with the data.

Case Sensitiveness

SQL Server is not case sensitive, therefore, depending on the channel through which the data came, values appear either in upper or lower case, respectively a mixture of both. Even if this isn't an issue in D365, it can become an issue when the data leave the environment. E.g., PowerQuery is case sensitive (while DAX is case insensitive), thus, if a field containing a mix of values participates in a join or aggregation, this will result in unexpected behavior (e.g., duplicates, records ignored). It's primarily the case of the Company (aka DataAreaId) field available in most of the important tables.

The ideal solution would be to make sure that the values are correct by design, however this can't always be enforced. Otherwise, when using the data outside of D365 F&O, the solution would be to transform all the values to upper case (or lower case). However, this step might occur too late, e.g., when the data are exported to the Azure Data Lake in parquet file format.
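
A minimal T-SQL sketch of the workaround, normalizing the casing of the DataAreaId values on both sides of a join; the tables and aliases are only illustrative:

-- normalize the company code's casing before joining or exposing the data further
SELECT sal.SalesId
, sal.ItemId
, Upper(sal.DataAreaId) DataAreaId
FROM dbo.SalesLine sal
     JOIN dbo.InventTable itm
       ON sal.ItemId = itm.ItemId
      AND Upper(sal.DataAreaId) = Upper(itm.DataAreaId)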

Unique Keys

A unique record in D365 F&O was in earlier versions usually identified by the RecId and DataAreaId, while later the Partition field was added. This means that most of the joins will need to consider all 3 columns, which adds some overhead. In some environments there's only one Partition defined (and thus the field can be ignored), however this is not a guarantee.

As long as developers use SQL, there's no issue with using multiple fields in JOINs, though in PowerQuery a unique key must be created based on the respective fields so that the JOINs are possible. Actually, SQL-based JOINs would also benefit if each record were identified by one field.
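
A small T-SQL sketch of both approaches, the multi-column JOIN and a concatenated surrogate key that could then be reused in PowerQuery; the tables used are only illustrative:

-- joining on all three identifying columns
SELECT sal.SalesId
, sal.LineNum
FROM dbo.SalesLine sal
     JOIN dbo.SalesTable sah
       ON sal.SalesId = sah.SalesId
      AND sal.DataAreaId = sah.DataAreaId
      AND sal.Partition = sah.Partition

-- building a single surrogate key out of the three columns
SELECT Concat(sal.RecId, '|', Upper(sal.DataAreaId), '|', sal.Partition) UniqueKey
, sal.*
FROM dbo.SalesLine sal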

Audit Metadata

Not all tables have fields that designate the date when a record was created or last modified, respectively the user who performed the respective action. The fields can be added manually when setting up the system, however that's seldom done. This makes it difficult to audit the records and sometimes it's a challenge also for reporting, respectively for troubleshooting the differences between DWH and source system. Fortunately, the Export to Data Lake adds a timestamp reflecting the time when the record was synchronized, though it can be used then only for the records synchronized after the first load. 

Tables vs. Entities

Data are modified in D365 F&O via a collection of entities, which are nothing but views that encapsulate the business logic, being based on the base tables or other views, respectively a combination of both. The Export to Data Lake (*) is based on the tables, while Link to Data Lake is based on data entities. 

Using the base tables means that the developer must reengineer the logic from the views. For some cases it might work to create the entities as views in the DWH environment, though some features might not be supported. It's the case of serverless and dedicated SQL pools, which support only a subset of the features available under standard Azure SQL Server.

The developer can try to replicate the logic from entities, considering only the logic needed by the business, especially when only a subset of the functionality available in the entity was used. The newly created views can thus become more readable and maintainable. On the other side, if the logic in the entity changes, the changes need to be reflected also in the DWH views.

Using the entity-based data makes sure that the data are consistent between environments. Unfortunately, Microsoft found out that it isn't so easy to synchronize the data at entity level. Moreover, there are multiple entities based on the same table that reflect only a subset of the columns or rows. Thus, to cover all the fields from a base table, one might be forced to synchronize multiple views, leading thus to data duplication.

In theory, both mechanisms can be used within the same environment, even if this approach goes against the single source of truth principle, as data are duplicated.

Data Validation in the Data Lake

One scenario in which both sources are useful is when validating whether the synchronization mechanism worked as expected. Thus, one can compare the number of records and check whether there are differences that can't be mitigated. However, does it make sense to "duplicate" database objects only for this purpose?

Ideally, validating whether a record was synchronized should be done in the source environment (e.g. via a timestamp). That's difficult to achieve, especially when there's no direct access to the source database (as is the case for Production databases). Fortunately, Dataverse provides this functionality, even if it might not be bulletproof.

In extremis, the most reliable approach is to copy the production environment to a sandbox and do a count of records for each table, using as baseline for comparison the time when the refresh occurred.
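
For the count of records per table, a sketch based on the system metadata can serve as baseline; it assumes direct access to the sandbox database:

-- record counts per table (heap or clustered index) as baseline for comparison
SELECT sch.name schema_name
, tbl.name table_name
, SUM(pts.row_count) row_count
FROM sys.dm_db_partition_stats pts
     JOIN sys.tables tbl
       ON pts.object_id = tbl.object_id
     JOIN sys.schemas sch
       ON tbl.schema_id = sch.schema_id
WHERE pts.index_id IN (0, 1)
GROUP BY sch.name
, tbl.name
ORDER BY schema_name
, table_name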

Base Enum Values

The lists of values that don't have their own tables are managed within the application as Base Enums and, naturally, only the numeric values are saved to the database. Even if this is practical for the application, it's a nightmare for the people using the data exported from the database, as the codes need to be converted to meaningful values. Some of the mappings between the codes and values are documented in two system tables, and even in old language-based documentation, though both sources are far from complete. As an alternative, one can try to discover the values in the system. 

Unfortunately, the mappings need to be repeated when the Enum-based attribute is used in multiple places. One can reduce the mappings' duplication by encapsulating the logic into a view (aka "base view") and reusing it accordingly (see the logic for TDM.vEcoResProduct).

Even if the values for many of the Enums are stored in the EnumValueTable table, with the Enum's name available in the EnumIdTable table, it's not a good idea to retrieve the values via a JOIN in the business logic. This would complicate the business logic unnecessarily. A CASE expression is more efficient, even if occasionally more difficult to maintain (see the sketch below). Unfortunately, there's no timestamp to identify which values were added lately.
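
A minimal sketch of such a base view; the enum codes below are only illustrative and need to be validated against the values used in the respective system:

-- base view encapsulating the mapping of a Base Enum via a CASE
CREATE OR ALTER VIEW TDM.vSalesLine
AS
SELECT sal.SalesId
, sal.ItemId
, sal.SalesStatus
, CASE sal.SalesStatus
    WHEN 1 THEN 'Backorder'
    WHEN 2 THEN 'Delivered'
    WHEN 3 THEN 'Invoiced'
    WHEN 4 THEN 'Canceled'
    ELSE 'None'
  END SalesStatusName
FROM dbo.SalesLine sal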

Note:
(*) Existing customers have until 1-Nov-2024 to transition from Export to Data Lake to Synapse Link. Microsoft advises new customers to use Synapse Link.

22 February 2023

💎🏭SQL Reloaded: Automatic Statistics Creation & Dropping for CETAS based on CSV File Format in Serverless SQL Pool

Introduction

The serverless SQL pool query optimizer uses statistics to calculate and compare the cost of various query plans, and then chooses the plan with the lowest cost. Automatic creation of statistics is turned on for the parquet file format, though for the CSV file format statistics will be automatically created only when OPENROWSET is used. This means that when creating CETAS based on CSV, the statistics need to be created manually. 

This would be one more reason for holding the files in the Data Lake as parquet files. On the other side, there are also many files already available in CSV format, respectively technologies that allow exporting data only/still as CSV. Moreover, transforming the files to parquet is not always technically feasible.

Using OPENROWSET could also help, though does it make sense to use a different mechanism for the CSV file format? In some scenarios it will. I prefer to have a unitary design, when possible. Moreover, even if some columns are not needed, they can still be useful for certain scenarios (e.g. troubleshooting, reevaluating their use, etc.). 

There are files, especially the ones coming from ERPs (Enterprise Resource Planning) or similar systems, which have even a few hundred columns (on average between 50 and 100 columns). Manually creating the statistics for the respective tables will cost a lot of time and effort. To automate the process there are mainly three choices:
(1) Creating statistics for all the columns for a given set of tables (e.g. for a given schema).
(2) Finding a way to automatically identify the columns which are actually used.
(3) Storing the list of tables and columns on which statistics should be built (however the list needs to be maintained manually). 

Fortunately, (1) can be solved relatively easily, based on the available table metadata, however it's not the best solution, as a lot of statistics will be created unnecessarily. (2) is possible under certain architectures or with additional effort. (3) takes time, though it's also an approachable solution.

What do we need?

For building the solution, we need table and statistics metadata, and the good news is that the old SQL Server queries still work. To minimize the code's repetition, it makes sense to encapsulate the logic in views. For table metadata one can use the sys.objects catalog view as it is more general (one can replace sys.objects with sys.tables to focus only on tables):

-- drop the view (for cleaning)
-- DROP VIEW IF EXISTS dbo.vObjectColumns

-- create view
CREATE OR ALTER VIEW dbo.vObjectColumns
AS 
-- object-based column metadata
SELECT sch.name + '.' + obj.name two_part_name
, sch.Name schema_name
, obj.name object_name
, col.name column_name
, obj.type
, CASE 
	WHEN col.is_ansi_padded = 1 and LEFT(udt.name , 1) = 'n' THEN col.max_length/2
	ELSE col.max_length
  END max_length
, col.precision 
, col.scale
, col.is_nullable 
, col.is_identity
, col.object_id
, col.column_id
, udt.name as data_type
, col.collation_name
, ROW_NUMBER() OVER(PARTITION BY col.object_id ORDER BY col.column_id) ranking
FROM sys.columns col
     JOIN sys.types udt
       ON col.user_type_id = udt.user_type_id
     JOIN sys.objects obj
       ON col.object_id = obj.object_id
     JOIN sys.schemas as sch
       ON sch.schema_id = obj.schema_id

-- testing the view
SELECT obc.*
FROM dbo.vObjectColumns obc
WHERE obc.object_name LIKE '<table name>%'
  AND obc.schema_name = 'CRM'
  AND obc.type = 'U'
ORDER BY obc.two_part_name
, Ranking

The view can be used also as basis for getting the defined stats:

-- drop the view (for cleaning)
-- DROP VIEW IF EXISTS dbo.vObjectStats

-- create view 
CREATE OR ALTER VIEW dbo.vObjectStats
AS
-- object-based column statistics
SELECT obc.two_part_name + '.' + QuoteName(stt.name) three_part_name
, obc.two_part_name
, obc.schema_name
, obc.object_name
, obc.column_name
, stt.name stats_name
, STATS_DATE(stt.[object_id], stt.stats_id) AS last_updated
, stt.auto_created
, stt.user_created
, stt.no_recompute
, stt.has_filter 
, stt.filter_definition
, stt.is_temporary 
, stt.is_incremental 
, stt.auto_drop 
, stt.stats_generation_method_desc
, stt.[object_id]
, obc.type 
, stt.stats_id
, stc.stats_column_id
, stc.column_id
FROM dbo.vObjectColumns obc
     LEFT JOIN sys.stats_columns stc
       ON stc.object_id = obc.object_id
      AND stc.column_id = obc.column_id
     LEFT JOIN sys.stats stt
       ON stc.[object_id] = stt.[object_id]
      AND stc.stats_id = stt.stats_id

-- testing the view 
SELECT *
FROM dbo.vObjectStats obs
WHERE (obs.auto_created = 1 OR obs.user_created = 1)
  AND obs.type = 'U'
  AND obs.object_name = '<table name>'
ORDER BY obs.two_part_name
, obs.column_id

Now we have a basis for the next step. However, before using the stored procedure defined below, one should use the last query and check whether statistics were already defined on a table. Use for testing also a table for which you know that statistics are available.

Create Statistics

The code below is based on a similar stored procedure available in the Microsoft documentation (see [1]). It uses a table's column metadata, stores them in a temporary table, and then loops through each record, creates the DDL script and runs it:

-- drop procedure (for cleaning)
--DROP PROCEDURE dbo.pCreateStatistics

-- create stored procedure
CREATE OR ALTER PROCEDURE dbo.pCreateStatistics
(   @schema_name nvarchar(50)
,   @table_name nvarchar(100)
)
AS
-- creates statistics for serverless SQL pool
BEGIN
	DECLARE @query as nvarchar(1000) = ''
	DECLARE @index int = 1, @nr_records int = 0

	-- drop temporary table if it exists 
	DROP TABLE IF EXISTS #stats_ddl;

	-- create temporary table 
	CREATE TABLE #stats_ddl( 
	  schema_name nvarchar(50)
	, table_name nvarchar(128)
	, column_name nvarchar(128)
	, ranking int
	);

	-- fill table
	INSERT INTO #stats_ddl
	SELECT obc.schema_name
	, obc.object_name
	, obc.column_name 
	, ROW_NUMBER() OVER(ORDER BY obc.schema_name, obc.object_name) ranking
	FROM dbo.vObjectColumns obc
	WHERE obc.type = 'U' -- tables
	  AND IsNull(@schema_name, obc.schema_name) = obc.schema_name 
	  AND IsNull(@table_name, obc.object_name) = obc.object_name

	SET @nr_records = (SELECT COUNT(*) FROM #stats_ddl)

	WHILE @index <= @nr_records
	BEGIN
		SET @query = (SELECT 'CREATE STATISTICS '+ QUOTENAME('stat_' + schema_name + '_' + table_name + '_' + column_name) + ' ON '+ QUOTENAME(schema_name) + '.' + QUOTENAME(table_name) + '(' + QUOTENAME(column_name) + ')' 
			   FROM #stats_ddl ddl
			   WHERE ranking = @index);

		BEGIN TRY
		        -- execute ddl
			EXEC sp_executesql @query;
		END TRY
		BEGIN CATCH
			SELECT 'create failed for ' + @query;
		END CATCH

		SET @index+=1;
	END

	DROP TABLE #stats_ddl;
END


-- test stored procedure (various scenarios)
EXEC dbo.pCreateStatistics '<schema name>', '<table name>' -- based on schema & table
EXEC dbo.pCreateStatistics '<schema name>', NULL -- based on a schema
EXEC dbo.pCreateStatistics NULL, '<table name>' -- based on a table

Notes:
IMPORTANT!!! I recommend testing the stored procedure in a test environment first for a few tables and not for a whole schema. If there are too many tables, this will take time.

Please note that rerunning the stored procedure without previously deleting the statistics on the tables in scope will make the procedure raise failures for each column (behavior by design), though the error messages can be suppressed by commenting out the code, if needed. One can introduce further validation, e.g. considering only the columns which don't have a statistic defined on them.

Further Steps?

What can we do to improve the code? It would be great if we could find a way to identify the columns which are used in the queries. It is possible to retrieve the queries run in serverless SQL pool, however identifying the tables and columns from there or a similar source is not a straightforward solution. 

The design of views based on the external tables can help in the process! I prefer to build on top of the external tables a first level of views (aka "base views") that include only the fields in use (needed by the business), ordered and "grouped" together based on their importance or certain characteristics. The views are based solely on the external table and thus contain no joins. They can include conversions of data types, translations of codes into meaningful values, and quite seldom filters on the data. However, for traceability the names of the columns don't change! This means that if a view's name is easily identifiable based on the external table's name, we could check the view's columns against the ones of the external table and create statistics only for the respective columns. Using a unique prefix (e.g. "v") to derive the views' names from the tables' names would do the trick.

To do that, we need to create a view that reflects the dependencies between objects (we'll be interested only in external tables vs views dependencies):

-- drop view (for cleaning)
-- DROP VIEW IF EXISTS dbo.vObjectsReferenced

-- create view
CREATE OR ALTER VIEW dbo.vObjectsReferenced
AS 
-- retrieving the objects referenced 
SELECT QuoteName(sch.name) + '.' + QuoteName(obj.name) AS two_part_name 
, obj.object_id 
, obj.schema_id 
, sch.name schema_name 
, obj.name object_name 
, obj.type
, QuoteName(scr.name) + '.'+ QuoteName(sed.referenced_entity_name) AS ref_two_part_name 
, obr.object_id ref_object_id
, obr.schema_id ref_schema_id 
, scr.name ref_schema_name 
, obr.name ref_object_name 
, obr.type ref_type
FROM sys.sql_expression_dependencies sed 
     JOIN sys.objects obj
       ON obj.object_id = sed.referencing_id 
	      JOIN sys.schemas as sch
	        ON obj.schema_id = sch.schema_id 
	 JOIN sys.objects obr
	   ON sed.referenced_id = obr.object_id
	      JOIN sys.schemas as scr
	        ON obr.schema_id = scr.schema_id

-- testing the view
SELECT top 10 *
FROM dbo.vObjectsReferenced
WHERE ref_type = 'U'

With this, the query used above to fill the table becomes:

-- fill table query with column selection 
SELECT obc.schema_name
, obc.object_name
, obc.column_name 
, ROW_NUMBER() OVER(ORDER BY obc.schema_name, obc.object_name) ranking
FROM dbo.vObjectColumns obc
WHERE obc.type = 'U' -- tables
	AND IsNull(@schema_name, obc.schema_name) = obc.schema_name 
	AND IsNull(@table_name, obc.object_name) = obc.object_name
	AND EXISTS ( -- select only columns referenced in views
	    SELECT * 
		FROM dbo.vObjectsReferenced obr 
		    JOIN dbo.vObjectColumns obt
			ON obr.object_id = obt.object_id 
		WHERE obt.type = 'V' -- view
		AND obr.object_name  =  'v' + obr.ref_object_name
		AND obc.object_id = obr.ref_object_id
		AND obc.column_name = obt.column_name);

This change will reduce the number of statistics created on average by 50-80%. Of course, there will also be cases in which further statistics need to be added manually. One can use this as input for an analysis of the columns used, store the metadata in a file, make changes to it, and base the statistics' creation on it. 

Drop Statistics

Dropping the statistics comes down to using the dbo.vObjectStats view created above for the schema and/or table provided as parameter. The logic is similar to the statistics' creation:

-- drop stored procedure (for cleaning)
-- DROP PROCEDURE IF EXISTS dbo.pDropStatistics

-- create procedure
CREATE OR ALTER PROCEDURE dbo.pDropStatistics
(   @schema_name nvarchar(50)
,   @table_name nvarchar(128)
)
AS
-- drop statistics for a schema and/or external table in serverless SQL pool
BEGIN

	DECLARE @query as nvarchar(1000) = ''
	DECLARE @index int = 1, @nr_records int = 0

	-- drop temporary table if it exists 
	DROP TABLE IF EXISTS #stats_ddl;

	-- create temporary table 
	CREATE TABLE #stats_ddl( 
	 three_part_name nvarchar(128)
	, ranking int
	);

	-- fill table
	INSERT INTO #stats_ddl
	SELECT obs.three_part_name
	, ROW_NUMBER() OVER(ORDER BY obs.three_part_name) ranking
	FROM dbo.vObjectStats obs
	WHERE obs.type = 'U' -- tables
	  AND IsNull(@schema_name, obs.schema_name) = obs.schema_name 
	  AND IsNull(@table_name, obs.object_name) = obs.object_name

	SET @nr_records = (SELECT COUNT(*) FROM #stats_ddl)

	WHILE @index <= @nr_records
	BEGIN
   
		SET @query = (SELECT 'DROP STATISTICS ' + ddl.three_part_name
			   FROM #stats_ddl ddl
			   WHERE ranking = @index);

		BEGIN TRY
		        -- execute ddl
			EXEC sp_executesql @query;
		END TRY
		BEGIN CATCH
			SELECT 'drop failed for ' + @query;
		END CATCH

		SET @index+=1;
	END

	DROP TABLE #stats_ddl;
END

Note:
IMPORTANT!!!
I recommend testing the stored procedure in a test environment first for a few tables and not for a whole schema. If there are too many tables, this will take time.

Closing Thoughts

The solution for statistics' creation is not perfect, though it's a start! It would have been great if such a feature were provided by Microsoft, and probably they will provide it, given the importance of statistics in identifying an optimal plan. It would be interesting to understand how much statistics help in a distributed environment and what's the volume of data processed for this purpose. 

Please let me know if you found other workarounds for statistics' automation.

Notes:
The above objects and queries seem to work also in SQL databases in Microsoft Fabric.

Happy coding!


References:
[1] Microsoft Learn (2022) Statistics in Synapse SQL (link)
