
10 April 2024

SQL Reloaded: Microsoft Fabric's Delta Tables in Action - Import data from CSV Files into Delta Table

Microsoft provides a set of labs and exercises that can be used to learn how to work with data in Fabric, however the real learning comes when one considers an example that introduces something new. As I had downloaded some time ago an archive with several Sales forecast datasets from the Kaggle website, I tried to import the Features dataset in different ways to see how it goes.

Store,Date,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,2010-02-05,42.31,2.572,NA,NA,NA,NA,NA,211.0963582,8.106,FALSE
1,2010-02-12,38.51,2.548,NA,NA,NA,NA,NA,211.2421698,8.106,TRUE
1,2010-02-19,39.93,2.514,NA,NA,NA,NA,NA,211.2891429,8.106,FALSE
1,2010-02-26,46.63,2.561,NA,NA,NA,NA,NA,211.3196429,8.106,FALSE
1,2010-03-05,46.5,2.625,NA,NA,NA,NA,NA,211.3501429,8.106,FALSE

Within an existing lakehouse, one can import the CSV as it is via 'Files/Upload' and, once the data is uploaded, navigate to the file and use 'Load to Tables/New Table' to import the data into a managed table. Unfortunately, because some of the numeric fields also contain the literal value "NA" instead of NULLs, their data type is inferred as varchar(8000), which is not ideal for calculations:

-- table created via Load to Tables
CREATE TABLE [dbo].[walmart_features](
	[Store] [int] NULL,
	[Date] [date] NULL,
	[Temperature] [float] NULL,
	[Fuel_Price] [float] NULL,
	[MarkDown1] [varchar](8000) NULL,
	[MarkDown2] [varchar](8000) NULL,
	[MarkDown3] [varchar](8000) NULL,
	[MarkDown4] [varchar](8000) NULL,
	[MarkDown5] [varchar](8000) NULL,
	[CPI] [varchar](8000) NULL,
	[Unemployment] [varchar](8000) NULL,
	[IsHoliday] [bit] NULL
) ON [PRIMARY]

This can be fixed by replacing the NA values with empty values in the file, which I did, and I used this version for the next steps. 

I then tried using Spark to import the data, though without providing an explicit schema all the fields are again defined as varchar(8000).
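
For reference, such an import without a schema can look as follows (a minimal sketch; the file path and table name mirror the ones used further below and are otherwise assumptions):

# load the file without an explicit schema (all columns default to string)
df = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , header=True)

# save the dataframe as a delta table
df.write.format("delta").saveAsTable("walmart_features2")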

-- table created via Spark
CREATE TABLE [dbo].[walmart_features2](
	[Store] [varchar](8000) NULL,
	[Date] [varchar](8000) NULL,
	[Temperature] [varchar](8000) NULL,
	[Fuel_Price] [varchar](8000) NULL,
	[MarkDown1] [varchar](8000) NULL,
	[MarkDown2] [varchar](8000) NULL,
	[MarkDown3] [varchar](8000) NULL,
	[MarkDown4] [varchar](8000) NULL,
	[MarkDown5] [varchar](8000) NULL,
	[CPI] [varchar](8000) NULL,
	[Unemployment] [varchar](8000) NULL,
	[IsHoliday] [varchar](8000) NULL
) ON [PRIMARY]
GO

So, the schema needs to be defined explicitly; however, I had to import IsHoliday as a string and cast the value explicitly to a Boolean using a second data frame (see the comments below for alternatives):

from pyspark.sql.types import *
from pyspark.sql.functions import *

#define schema
featuresSchema = StructType([
      StructField("Store", IntegerType())
    , StructField("Date", DateType())
    , StructField("Temperature",  DecimalType(13,2))
    , StructField("Fuel_Price", DecimalType(13,2))
    , StructField("MarkDown1", DecimalType(13,2))
    , StructField("MarkDown2", DecimalType(13,2))
    , StructField("MarkDown3", DecimalType(13,2))
    , StructField("MarkDown4", DecimalType(13,2))
    , StructField("MarkDown5", DecimalType(13,2))
    , StructField("CPI", DecimalType(18,6))
    , StructField("Unemployment", DecimalType(13,2))
    , StructField("IsHoliday", StringType())
])

# Load a file into a dataframe
df = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , schema = featuresSchema
    , header=True)

# do the conversion for IsHoliday
df2 = df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))

# save the dataframe as a delta table
df2.write.format("delta").saveAsTable("walmart_features3")

Now, the table's definition looks much better:

-- table created via Spark with explicit schema
CREATE TABLE [dbo].[walmart_features3](
	[Store] [int] NULL,
	[Date] [date] NULL,
	[Temperature] [decimal](13, 2) NULL,
	[Fuel_Price] [decimal](13, 2) NULL,
	[MarkDown1] [decimal](13, 2) NULL,
	[MarkDown2] [decimal](13, 2) NULL,
	[MarkDown3] [decimal](13, 2) NULL,
	[MarkDown4] [decimal](13, 2) NULL,
	[MarkDown5] [decimal](13, 2) NULL,
	[CPI] [decimal](18, 6) NULL,
	[Unemployment] [decimal](13, 2) NULL,
	[IsHoliday] [bit] NULL
) ON [PRIMARY]
GO

Comments:
(1) I tried to apply the schema change directly on the initial data frame, though the schema didn't change, because withColumn returns a new data frame instead of modifying the existing one:

df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))
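
Assigning the result back keeps the change (a one-line alternative to creating the second data frame):

# withColumn returns a new data frame; the result must be assigned to keep the change
df = df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))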

(2) For the third method one could have left the NA values in, because the conversion turns values that can't be parsed into NULLs. Conversely, it might be needed to check whether there are other values that fail the conversion. 
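
Alternatively, Spark's CSV reader provides a nullValue option that treats a given literal as NULL already at read time; a minimal sketch, reusing the schema defined above against the original file with the NA values (the file name is an assumption):

# treat the literal "NA" as NULL while reading the file
df = spark.read.load('Files/OpenSource/features.csv'
    , format='csv'
    , schema = featuresSchema
    , header=True
    , nullValue='NA')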

(3) The following warning shown in the notebook when running the above code is a hint that something went wrong during the conversion (e.g. decimals were truncated): 

"Your file(s) might include corrupted records"

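One way to take a closer look at such records is to capture them via the PERMISSIVE mode together with a corrupt-record column; a sketch in which the column name is an assumption and has to be added to the schema as a string field:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

# extend the schema with a column that collects the rows that couldn't be parsed
debugSchema = StructType(featuresSchema.fields + [StructField("_corrupt_record", StringType())])

# cache before filtering on the corrupt-record column (required by newer Spark versions)
dfDebug = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , schema = debugSchema
    , header=True
    , mode='PERMISSIVE'
    , columnNameOfCorruptRecord='_corrupt_record').cache()

# show the records that failed the parsing
dfDebug.filter(col('_corrupt_record').isNotNull()).show(truncate=False)
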
(4) Especially for the transformed values it makes sense to look at the data (at least when the dataset isn't too big):

-- validating the values for the Boolean data field
SELECT IsHoliday
, count(*) NoRecords
FROM dbo.walmart_features3
GROUP BY IsHoliday
ORDER BY 1

(5) The tables can be deleted directly in the lakehouse or via PySpark (observe the catalog.table_name):

#dropping the table
spark.sql('DROP TABLE IF EXISTS UAT.walmart_features3') 

In the beginning it probably makes sense to remove the "IF EXISTS" part, so that an error is raised if the table isn't available.

(6) For those who run into a similar issue: the SQL Endpoint for the lakehouse is read-only, therefore attempting to delete a table via SSMS will result in an error like the following:

Drop failed for Table 'dbo.walmart_features2'.  (Microsoft.SqlServer.Smo)
The external policy action 'Microsoft.Sql/Sqlservers/Databases/Schemas/Tables/Drop' was denied on the requested resource.
Cannot drop the table 'walmart_features2', because it does not exist or you do not have permission.

Happy coding!

06 February 2024

SQL Reloaded: Microsoft Fabric's Delta Tables in Action - Table Metadata II (Updating a table's COMMENT attributes)

While using the DESCRIBE TABLE metadata I observed that its output also shows a Comment attribute, which was NULL for all the columns. Browsing through the Databricks documentation (see [1]), I found that the Comment can be provided in a delta table's definition as follows (code to be run in a notebook):

-- drop the table (if already exists)
DROP TABLE IF EXISTS Assets2;

--create the table
CREATE TABLE Assets2 (
 Id int NOT NULL COMMENT 'Asset UID',
 CreationDate timestamp NOT NULL COMMENT 'Creation Date',
 Vendor string NOT NULL COMMENT 'Vendors name',
 Asset string NOT NULL COMMENT 'Asset type',
 Model string NOT NULL COMMENT 'Asset model',
 Owner string NOT NULL COMMENT 'Current owner',
 Tag string NOT NULL COMMENT 'Assets tag',
 Quantity decimal(13, 2) NOT NULL COMMENT 'Quantity'
) 
USING DELTA
COMMENT 'Vendor assets';

-- show table's definition
DESCRIBE TABLE Assets2;

-- show table's details
DESCRIBE DETAIL Assets2;

So, I thought there must be a way to update the Assets table's definition as well. And here's the solution, split into two steps, as different syntax is required for modifying the columns, respectively the table:

-- modify columns' COMMENT for an existing table
ALTER TABLE Assets ALTER COLUMN ID COMMENT 'Asset UID';
ALTER TABLE Assets ALTER COLUMN CreationDate COMMENT 'Creation Date';
ALTER TABLE Assets ALTER COLUMN Vendor COMMENT 'Vendors name';
ALTER TABLE Assets ALTER COLUMN Asset COMMENT 'Asset type';
ALTER TABLE Assets ALTER COLUMN Model COMMENT 'Asset model';
ALTER TABLE Assets ALTER COLUMN Owner COMMENT 'Current owner';
ALTER TABLE Assets ALTER COLUMN Tag COMMENT 'Assets tag';
ALTER TABLE Assets ALTER COLUMN Quantity COMMENT 'Quantity';

-- show table's definition
DESCRIBE TABLE Assets;

The operation generated a log file with the following content:

{"commitInfo":{"timestamp":1707180621522,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"Quantity\",\"type\":\"decimal(13,2)\",\"nullable\":false,\"metadata\":{\"comment\":\"Quantity\"}}"},"readVersion":13,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"42590d18-e04a-4fb2-bbfa-94414b23fb07"}}
{"metaData":{"id":"83e87b3c-28f4-417f-b4f5-842f6ba6f26d","description":"Vendor assets","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[
{\"name\":\"Id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset UID\"}},
{\"name\":\"CreationDate\",\"type\":\"timestamp\",\"nullable\":false,\"metadata\":{\"comment\":\"Creation Date\"}},
{\"name\":\"Vendor\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Vendors name\"}},
{\"name\":\"Asset\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset type\"}},
{\"name\":\"Model\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset model\"}},
{\"name\":\"Owner\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Current owner\"}},
{\"name\":\"Tag\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Assets tag\"}},
{\"name\":\"Quantity\",\"type\":\"decimal(13,2)\",\"nullable\":false,\"metadata\":{\"comment\":\"Quantity\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1707149934697}}

And the second step (see [2]):

-- modify a table's COMMENT
COMMENT ON TABLE Assets IS 'Vendor assets';

-- describe table's details
DESCRIBE DETAIL Assets;

The operation generated a log file with the following content:

{"commitInfo":{"timestamp":1707180808138,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"comment\":\"Vendor assets\"}"},"readVersion":14,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.4.1.5.3-110807746 Delta-Lake/2.4.0.8","txnId":"21c315b5-a81e-4107-8a19-6256baee7bd5"}}
{"metaData":{"id":"83e87b3c-28f4-417f-b4f5-842f6ba6f26d","description":"Vendor assets","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[
{\"name\":\"Id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset UID\"}}
,{\"name\":\"CreationDate\",\"type\":\"timestamp\",\"nullable\":false,\"metadata\":{\"comment\":\"Creation Date\"}}
,{\"name\":\"Vendor\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Vendors name\"}}
,{\"name\":\"Asset\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset type\"}}
,{\"name\":\"Model\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Asset model\"}}
,{\"name\":\"Owner\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Current owner\"}}
,{\"name\":\"Tag\",\"type\":\"string\",\"nullable\":false,\"metadata\":{\"comment\":\"Assets tag\"}}
,{\"name\":\"Quantity\",\"type\":\"decimal(13,2)\",\"nullable\":false,\"metadata\":{\"comment\":\"Quantity\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1707149934697}}

Notes:
1) Don't forget to remove the Assets2 table!
2) Updating the COMMENT for a single delta table is simple, though updating the same for a list of tables might be a task for a PySpark job (see the sketch below). It makes sense to provide the respective metadata from the start, when that's possible. Anyway, the information should also be available in the lakehouse's data dictionary.
3) One can reset the COMMENT to NULL or to an empty string.
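
A minimal sketch for such a job; the mapping between table names and comments is only an example and would normally come from a data dictionary:

# hypothetical mapping between tables and their comments
comments = {
    'Assets': 'Vendor assets'
  , 'Assets2': 'Vendor assets (test)'
}

# update the COMMENT for each table in the list
for table_name, comment in comments.items():
    spark.sql(f"COMMENT ON TABLE {table_name} IS '{comment}'")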

Happy coding!

Resources:
[1] Databricks (2023) ALTER TABLE (link)
[2] Databricks (2023) COMMENT ON (link)

SQL Reloaded: Microsoft Fabric's Delta Tables in Action - Table Metadata I (General Information)

In a previous post I've created a delta table called Assets. For troubleshooting and maintenance tasks it would be useful to retrieve a table's metadata - properties, definition, etc. There are different ways to extract the metadata, depending on the layer and method used.

INFORMATION_SCHEMA in SQL Endpoint

Listing all the delta tables available in a Lakehouse can be done using the INFORMATION_SCHEMA via the SQL Endpoint:

-- retrieve the list of tables
SELECT * 
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_SCHEMA  

Output:
TABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE
Testing dbo city BASE TABLE
Testing dbo assets BASE TABLE
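
On the Spark side, a similar list can be retrieved via the catalog (a small sketch, assuming the lakehouse is attached as the notebook's default lakehouse):

# list the tables available in the default lakehouse via the Spark catalog
for table in spark.catalog.listTables():
    print(table.name, table.tableType, table.isTemporary)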

The same schema can be used to list columns' definition:
 
-- retrieve column metadata
SELECT TABLE_CATALOG
, TABLE_SCHEMA
, TABLE_NAME
, COLUMN_NAME
, ORDINAL_POSITION
, DATA_TYPE
, CHARACTER_MAXIMUM_LENGTH
, NUMERIC_PRECISION
, NUMERIC_SCALE
, DATETIME_PRECISION
, CHARACTER_SET_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'assets'
ORDER BY ORDINAL_POSITION

Note:
Given that the INFORMATION_SCHEMA is an ANSI-standard schema for providing metadata about a database's objects (including views and schemata), this is probably the best way to retrieve general metadata like the above (see [3]). More information can be obtained over the SQL Endpoint by querying the SQL Server metadata directly.

Table's Properties

The Delta Lake documentation reveals that a table's properties can be retrieved via the DESCRIBE command, which can be used in a notebook's cell (see [1] for attributes' definition):

-- describe table's details
DESCRIBE DETAIL Assets;

Output (transposed):
Attribute Value
format delta
id 83e87b3c-28f4-417f-b4f5-842f6ba6f26d
name spark_catalog.testing.assets
description NULL
location abfss://[…]@onelake.dfs.fabric.microsoft.com/[…]/Tables/assets
createdAt 2024-02-05T16:18:54Z
lastModified 2024-02-05T16:29:52Z
partitionColumns
numFiles 1
sizeInBytes 3879
properties [object Object]
minReaderVersion 1
minWriterVersion 2
tableFeatures appendOnly,invariants

One can also export the table metadata from sys.tables via the SQL Endpoint, though SQL Server-based metadata will be included as well:

-- table metadata
SELECT t.object_id
, schema_name(t.schema_id) schema_name
, t.name table_name
, t.type_desc
, t.create_date 
, t.modify_date
, t.durability_desc
, t.temporal_type_desc
, t.data_retention_period_unit_desc
FROM sys.tables t
WHERE name = 'assets'

Output (transposed):
Attribute Value
object_id 1264723558
schema_name dbo
table_name assets
type_desc USER_TABLE
create_date 2024-02-05 16:19:04.810
modify_date 2024-02-05 16:19:04.810
durability_desc SCHEMA_AND_DATA
temporal_type_desc NON_TEMPORAL_TABLE
data_retention_period_unit_desc INFINITE

Note:
1) There thus seem to be two different repositories for storing the metadata, which is also reflected in the different timestamps.

Table's Definition 

A table's definition can be easily exported via the SQL Endpoint in SQL Server Management Studio. Multiple tables' definitions can be exported as well via the Object Explorer Details pane within the same IDE. 

In Spark SQL you can use the DESCRIBE TABLE command:

-- show table's definition
DESCRIBE TABLE Assets;

Output:
col_name data_type comment
Id int NULL
CreationDate timestamp NULL
Vendor string NULL
Asset string NULL
Model string NULL
Owner string NULL
Tag string NULL
Quantity decimal(13,2) NULL

Alternatively, you can use the EXTENDED keyword with the previous command to show further table information:
 
-- show table's definition (extended)
DESCRIBE TABLE EXTENDED Assets;

Output (only the records that appear under '# Detailed Table Information'):
Label Value
Name spark_catalog.testing.assets
Type MANAGED
Location abfss://[...]@onelake.dfs.fabric.microsoft.com/[...]/Tables/assets
Provider delta
Owner trusted-service-user
Table Properties [delta.minReaderVersion=1,delta.minWriterVersion=2]

Note that the table properties can be listed individually via the SHOW TBLPROPERTIES command:
 
--show table properties
SHOW TBLPROPERTIES Assets;

A column's definition can be retrieved via a DESCRIBE command following the syntax:

-- retrieve column metadata
DESCRIBE Assets Assets.CreationDate;

Output:
info_name info_value
col_name CreationDate
data_type timestamp
comment NULL

In PySpark it's trickier to retrieve a table's definition (code adapted after Dennes Torres' presentation at Toboggan Winter edition 2024):

%%pyspark
import pyarrow.dataset as pq
import os
import re

def show_metadata(file_path, table_name):
    # prints a delta table's schema metadata

    print(f"\\{table_name}:")
    schema_properties = pq.dataset(file_path).schema.metadata
    if schema_properties:
        for key, value in schema_properties.items():
            print(f"{key.decode('utf-8')}:{value.decode('utf-8')}")
    else:
        print("No properties available!")

#main code
path = "/lakehouse/default/Tables"
tables = [f for f in os.listdir(path) if re.match('asset',f)]

for table in tables:
    show_metadata(f"{path}/{table}", table)

Output:
\assets:
org.apache.spark.version:3.4.1
org.apache.spark.sql.parquet.row.metadata:{"type":"struct","fields":[
{"name":"Id","type":"integer","nullable":true,"metadata":{}},
{"name":"CreationDate","type":"timestamp","nullable":true,"metadata":{}},
{"name":"Vendor","type":"string","nullable":true,"metadata":{}},
{"name":"Asset","type":"string","nullable":true,"metadata":{}},
{"name":"Model","type":"string","nullable":true,"metadata":{}},
{"name":"Owner","type":"string","nullable":true,"metadata":{}},
{"name":"Tag","type":"string","nullable":true,"metadata":{}},
{"name":"Quantity","type":"decimal(13,2)","nullable":true,"metadata":{}}]}
com.microsoft.parquet.vorder.enabled:true
com.microsoft.parquet.vorder.level:9

Note:
One can list the metadata for all the tables by removing the filter on the table names:

tables = os.listdir(path)

Happy coding!

References:
[1] Delta Lake (2023) Table utility commands (link)
[2] Databricks (2023) DESCRIBE TABLE (link)
[3] Microsoft Learn (2023) System Information Schema Views (link)

27 February 2021

Python: PySpark and GraphFrames (Test Drive)

Besides the challenges met while configuring the PySpark & GraphFrames environment, running my first example in the Spyder IDE also proved to be a bit more challenging than expected. Starting from an example provided in the Databricks documentation on GraphFrames, I had to add 3 more lines to establish the connection to the Spark cluster, respectively to deactivate the context at the end (only one SparkContext can be active per Java VM).

The following code displays the vertices and edges, respectively the in and out degrees for a basic graph. 

from graphframes import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

#establishing a connection to the Spark cluster (code added)
sc = SparkContext('local').getOrCreate()
spark = SparkSession(sc)

# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()

g.inDegrees.show()
g.outDegrees.show()

#stopping the active context (code added)
sc.stop()

Output:
id  name     age
a   Alice    34
b   Bob      36
c   Charlie  30
d   David    29
e   Esther   32
f   Fanny    36
g   Gabby    60

src  dst  relationship
a    b    friend
b    c    follow
c    b    follow
f    c    follow
e    f    follow
e    d    friend
d    a    friend
a    e    friend

id  inDegree
f   1
e   1
d   1
c   2
b   2
a   1

id  outDegree
f   1
e   2
d   1
c   1
b   1
a   2

Notes:
Without the last line, running the code a second time will halt with the following error: 
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local) created by __init__ at D:\Work\Python\untitled0.py:4

Loading the same data from a csv file involves a small overhead as the schema needs to be defined explicitly. The same output from above should be provided by the following code:

from graphframes import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import * 

#establishing a connection to the Spark cluster (code added)
sc = SparkContext('local').getOrCreate()
spark = SparkSession(sc)

nodes = [
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
]
edges = [
    StructField("src", StringType(), True),
    StructField("dst", StringType(), True),
    StructField("relationship", StringType(), True)
    ]

v = spark.read.csv(r"D:\data\nodes.csv", header=True, schema=StructType(nodes))

e = spark.read.csv(r"D:\data\edges.csv", header=True, schema=StructType(edges))

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()

g.inDegrees.show()
g.outDegrees.show()

#stopping the active context (code added)
sc.stop()

The 'nodes.csv' file has the following content:
id,name,age
"a","Alice",34
"b","Bob",36
"c","Charlie",30
"d","David",29
"e","Esther",32
"f","Fanny",36
"g","Gabby",60

The 'edges.csv' file has the following content:
src,dst,relationship
"a","b","friend"
"b","c","follow"
"c","b","follow"
"f","c","follow"
"e","f","follow"
"e","d","friend"
"d","a","friend"
"a","e","friend"

Note:
There should be no spaces after the commas between values (e.g. "a", "b"), otherwise the results might deviate from expectations. 

Now, one can go and test further operations on the graph thus created:

#filtering edges 
gl = g.edges.filter("relationship = 'follow'").sort("src")
gl.show()
print("number edges: ", gl.count())

#filtering vertices
#gl = g.vertices.filter("age >= 30 and age<40").sort("id")
#gl.show()
#print("number vertices: ", gl.count())

# relationships involving edges and vertices
#motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
#motifs.show()
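
A few more operations worth trying, based on the GraphFrames documentation (the filters below are only examples):

# combined in- and out-degrees
g.degrees.show()

# breadth-first search from Esther to vertices younger than 32
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()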

Happy coding!
