Microsoft provides a set of labs and exercises that can be used to learn how to work with data in Fabric, however the real learning comes when one considers an example that introduces something new. Some time ago I downloaded from the Kaggle website an archive with several datasets on sales forecasting, so I tried to import the Features dataset in different ways and see how it goes. The first few rows of the file look like this:
Store,Date,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,2010-02-05,42.31,2.572,NA,NA,NA,NA,NA,211.0963582,8.106,FALSE
1,2010-02-12,38.51,2.548,NA,NA,NA,NA,NA,211.2421698,8.106,TRUE
1,2010-02-19,39.93,2.514,NA,NA,NA,NA,NA,211.2891429,8.106,FALSE
1,2010-02-26,46.63,2.561,NA,NA,NA,NA,NA,211.3196429,8.106,FALSE
1,2010-03-05,46.5,2.625,NA,NA,NA,NA,NA,211.3501429,8.106,FALSE
Within an existing lakehouse, one can import the CSV as it is via 'Files/Upload' and, once the data is uploaded, navigate to the file and use 'Load to Tables/New Table' to import the data into a managed table. Unfortunately, because the numeric fields use the literal value "NA" where NULLs are intended, their data type is inferred as varchar(8000), which is not ideal for calculations:
-- table created via Load to Tables
CREATE TABLE [dbo].[walmart_features](
    [Store] [int] NULL,
    [Date] [date] NULL,
    [Temperature] [float] NULL,
    [Fuel_Price] [float] NULL,
    [MarkDown1] [varchar](8000) NULL,
    [MarkDown2] [varchar](8000) NULL,
    [MarkDown3] [varchar](8000) NULL,
    [MarkDown4] [varchar](8000) NULL,
    [MarkDown5] [varchar](8000) NULL,
    [CPI] [varchar](8000) NULL,
    [Unemployment] [varchar](8000) NULL,
    [IsHoliday] [bit] NULL
) ON [PRIMARY]
This could be fixed by replacing the NA values with an empty value, which I did and used this version for the next steps.
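The replacement can be done in any text editor; as a minimal sketch in Python (assuming "NA" only ever appears as a complete field value and the file contains no quoted fields with embedded commas; the file names are just examples):

# blank out the literal "NA" field values before uploading the file
with open('features.csv', 'r') as src, open('features2.csv', 'w') as dst:
    for line in src:
        values = line.rstrip('\n').split(',')
        dst.write(','.join('' if v == 'NA' else v for v in values) + '\n')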
I then tried using Spark to import the data, though this time all the fields end up defined as varchar(8000).
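The load was presumably along these lines (a sketch; without an explicit schema and without inferSchema, Spark reads every column as a string):

# naive load: no schema and no inferSchema, so every column is read as a string
df = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , header=True)

# save the dataframe as a delta table
df.write.format("delta").saveAsTable("walmart_features2")

The resulting managed table therefore looks like this: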
-- table created via Spark
CREATE TABLE [dbo].[walmart_features2](
    [Store] [varchar](8000) NULL,
    [Date] [varchar](8000) NULL,
    [Temperature] [varchar](8000) NULL,
    [Fuel_Price] [varchar](8000) NULL,
    [MarkDown1] [varchar](8000) NULL,
    [MarkDown2] [varchar](8000) NULL,
    [MarkDown3] [varchar](8000) NULL,
    [MarkDown4] [varchar](8000) NULL,
    [MarkDown5] [varchar](8000) NULL,
    [CPI] [varchar](8000) NULL,
    [Unemployment] [varchar](8000) NULL,
    [IsHoliday] [varchar](8000) NULL
) ON [PRIMARY]
GO
So, the schema needs to be defined explicitly; however, I had to import IsHoliday as a string and cast the value explicitly to a Boolean using a second data frame (see the comments below for alternatives):
from pyspark.sql.types import *
from pyspark.sql.functions import *

# define the schema
featuresSchema = StructType([
      StructField("Store", IntegerType())
    , StructField("Date", DateType())
    , StructField("Temperature", DecimalType(13,2))
    , StructField("Fuel_Price", DecimalType(13,2))
    , StructField("MarkDown1", DecimalType(13,2))
    , StructField("MarkDown2", DecimalType(13,2))
    , StructField("MarkDown3", DecimalType(13,2))
    , StructField("MarkDown4", DecimalType(13,2))
    , StructField("MarkDown5", DecimalType(13,2))
    , StructField("CPI", DecimalType(18,6))
    , StructField("Unemployment", DecimalType(13,2))
    , StructField("IsHoliday", StringType())
    ])

# load the file into a dataframe
df = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , schema = featuresSchema
    , header=True)
# do the conversion for IsHoliday
df2 = df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))

# save the dataframe as a delta table
df2.write.format("delta").saveAsTable("walmart_features3")
Now the table's definition looks much better:
-- table created via Spark with explicit schema
CREATE TABLE [dbo].[walmart_features3](
    [Store] [int] NULL,
    [Date] [date] NULL,
    [Temperature] [decimal](13, 2) NULL,
    [Fuel_Price] [decimal](13, 2) NULL,
    [MarkDown1] [decimal](13, 2) NULL,
    [MarkDown2] [decimal](13, 2) NULL,
    [MarkDown3] [decimal](13, 2) NULL,
    [MarkDown4] [decimal](13, 2) NULL,
    [MarkDown5] [decimal](13, 2) NULL,
    [CPI] [decimal](18, 6) NULL,
    [Unemployment] [decimal](13, 2) NULL,
    [IsHoliday] [bit] NULL
) ON [PRIMARY]
GO
Comments:
(1) I tried to apply the cast directly on the initial data frame, though the schema didn't change; withColumn returns a new data frame rather than modifying df in place, so calling it without using the result has no effect:
df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))
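As a sketch, assigning the result back (or chaining the cast right after the load) gives the intended schema without a second data frame:

# withColumn returns a new data frame, so the result must be assigned (back)
df = df.withColumn("IsHoliday", df.IsHoliday.cast(BooleanType()))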
(2) For the third method one could have left the "NA" values in, because the cast turns them into NULLs. Conversely, it might be necessary to check whether other values fail the conversion (see the sketch below).
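A minimal sketch of such a check: read the file once more with all columns as strings and compare the raw values against the result of the cast (shown here for MarkDown1 only):

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

# read the raw file with all columns as strings
dfr = spark.read.load('Files/OpenSource/features2.csv'
    , format='csv'
    , header=True)

# rows where a value is present but the cast to decimal returns NULL
dfr.filter(col("MarkDown1").isNotNull()
    & (col("MarkDown1") != "NA")
    & col("MarkDown1").cast(DecimalType(13,2)).isNull()
    ).show()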
(3) The following warning in the Notebook when running the above code is a hint that something went wrong during the conversion (e.g. decimals were truncated):
"Your file(s) might include corrupted records"
(4) Especially for the transformed columns it makes sense to look at the values (at least when the dataset isn't too big):
-- validating the values for the Boolean data field
SELECT IsHoliday
, count(*) NoRecords
FROM dbo.walmart_features3
GROUP BY IsHoliday
ORDER BY 1
(5) The tables can be deleted directly in the lakehouse or via PySpark (note the catalog.table_name form):
# dropping the table
spark.sql('DROP TABLE IF EXISTS UAT.walmart_features3')
In the beginning it probably makes sense to remove "IF EXISTS", so the statement fails if the table isn't there and thus confirms whether the table is actually available.
(6) For those who run into a similar issue: the SQL endpoint for the lakehouse is read-only, therefore attempting to delete a table via SSMS will result in an error like the following:
Drop failed for Table 'dbo.walmart_features2'. (Microsoft.SqlServer.Smo)
The external policy action 'Microsoft.Sql/Sqlservers/Databases/Schemas/Tables/Drop' was denied on the requested resource.
Cannot drop the table 'walmart_features2', because it does not exist or you do not have permission.
Happy coding!