lakehouse.spark.bronze
- class lakehouse.spark.bronze.Bronze(spark: SparkSession, catalog: str, target_schema: str, config: Dict[str, Any] | None = None, **options: Dict[str, Any])
Bases: ETL
A generic class providing a framework for processing data in the Bronze layer of a Medallion architecture.
Use the functions load(), transform() and write() to specify configs, and execute() to run the defined steps. Use optimize() to specify optimize configs and tblproperties() to set table properties. Default properties are set on new tables for you; see the tblproperties() function for the defaults.
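A minimal usage sketch, assuming an existing SparkSession and hypothetical catalog, schema and table names:
>>> from lakehouse.spark.bronze import Bronze
>>> bronze = Bronze(spark, catalog="dev_catalog", target_schema="bronze")
>>> bronze.load().transform().write(mode="append").execute("orders", "customers")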
- Overwrite functions as required (see the sketch after this list):
custom_load(self, table: str) -> DataFrame: Function to customize how the source data is loaded. Required if load(mode="custom"), otherwise ignored.
custom_filter(self, df: DataFrame, table: str) -> DataFrame: Function to filter the loaded DataFrame, making use of predicate pushdown. Required if load(filter="custom"), otherwise ignored.
custom_transform(self, df: DataFrame, table: str) -> DataFrame: Function to be optionally overwritten to add custom transformations. Only executed if transform() is defined.
rename_columns(self, df: DataFrame, table: str) -> DataFrame: Function to rename columns based on the defined config. Can be overwritten.
select_columns(self, df: DataFrame, table: str) -> DataFrame: Function to select columns based on the defined config. Can be overwritten.
cast_column_types(self, df: DataFrame, table: str) -> DataFrame: Function to cast column types based on the defined config. Can be overwritten.
default_transform(self, df: DataFrame, table: str) -> DataFrame: Can be overwritten to add default transformations executed after the custom transformations. The default creates a timestamp column with the current transformation timestamp. Only executed if transform(ignore_defaults=False).
get_replace_condition(self, df: DataFrame, table: str) -> str: Allows you to define the filter used for the replace-where overwrite operation. Required if write(mode="replace").
get_delta_merge_builder(self, df: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Allows you to define the merge builder for the merge write into Delta. Required if write(mode="merge").
custom_write(self, df: DataFrame, table: str) -> None: Allows you to define a custom write operation. Required if write(mode="custom").
target_path(self, table: str) -> str: Allows you to specify a dynamic path when using external tables.
checkpoint_path(self, table: str) -> str: Function to be overwritten to create the checkpoint path when performing a streaming write.
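A minimal sketch of such overrides; the filter predicate and the added column are hypothetical:
>>> from pyspark.sql import DataFrame, functions as F
>>> class OrdersBronze(Bronze):
...     def custom_filter(self, df: DataFrame, table: str) -> DataFrame:
...         # hypothetical: prune unneeded rows early so the predicate can be pushed down
...         return df.filter(F.col("order_date") >= "2024-01-01")
...     def custom_transform(self, df: DataFrame, table: str) -> DataFrame:
...         # hypothetical: tag rows with their source system
...         return df.withColumn("source_system", F.lit("erp"))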
- spark
Spark Session as provided to process the data
- Type:
SparkSession
- options
Keyword arguments; any options passed to the class
- Type:
Dict[str, Any]
- catalog
Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog
- Type:
str
- target_schema
Name of the target_schema
- Type:
str
- data
Intermediate DataFrame per table based on the specified options before execute()
- Type:
Dict[str, DataFrame]
- __init__(spark: SparkSession, catalog: str, target_schema: str, config: Dict[str, Any] | None = None, **options: Dict[str, Any]) None
Initializes the Bronze class with user-provided options.
- Parameters:
spark (SparkSession) – existing Spark Session
catalog (str) – Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog, required
target_schema (str) – Name of the target schema, required
config (Dict[str, Any] | None) – optional dict to pass the configs here instead of setting them before execute(). Keys are load, transform, write, optimize and tblproperties
**options (Dict[str, Any]) – keyword arguments; any options passed to the class
- cast_column_types(df: DataFrame, table: str) DataFrame
Function to cast column types based on defined config. Can be overwritten.
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- checkpoint_path(table: str) str
Function to be overwritten to create the checkpoint path if performing a streaming write.
Create a string depending on, e.g., catalog, env, schema and table.
Example
>>> return f"D:/Data/{self.target_schema}/{table}/checkpoint"
- Parameters:
table (str) – name of the table
- Returns:
Path depending on the table and other variables
- custom_filter(df: DataFrame, table: str) DataFrame
Abstract function which can be overwritten to filter the loaded DataFrame, making use of predicate pushdown.
Filter out rows and columns that are not needed, before any other transformations are applied in transform().
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
filtered DataFrame
- custom_load(table: str) DataFrame
Abstract function to be overwritten to load data based on a custom implementation and return a DataFrame
- Parameters:
table (str) – name of the table
- Returns:
loaded data as DataFrame
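A sketch of a possible override, assuming the raw files sit at a hypothetical landing path with one folder per table:
>>> from pyspark.sql import DataFrame
>>> class LandingBronze(Bronze):
...     def custom_load(self, table: str) -> DataFrame:
...         # hypothetical landing zone layout
...         return self.spark.read.format("json").load(f"/landing/{table}/")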
- custom_transform(df: DataFrame, table: str) DataFrame
Function to be optionally overwritten to add custom transformations, only executed if transform() is defined
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- custom_write(df: DataFrame, table: str) None
Abstract function to write a DataFrame.
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
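A sketch of a possible override; the target location and format are hypothetical:
>>> from pyspark.sql import DataFrame
>>> class ParquetBronze(Bronze):
...     def custom_write(self, df: DataFrame, table: str) -> None:
...         # hypothetical: write plain parquet files instead of a managed Delta table
...         df.write.mode("append").parquet(f"/bronze/{table}/")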
- default_transform(df: DataFrame, table: str) DataFrame
Function adding the current timestamp as LH_BronzeTS to the transformed data.
Can be overwritten if more internal transformations should be added
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame with internal transformations
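A sketch of extending the defaults while keeping the LH_BronzeTS column; the extra column is an assumption for illustration:
>>> from pyspark.sql import DataFrame, functions as F
>>> class MyBronze(Bronze):
...     def default_transform(self, df: DataFrame, table: str) -> DataFrame:
...         df = super().default_transform(df, table)  # keeps LH_BronzeTS
...         return df.withColumn("LH_SourceFile", F.input_file_name())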
- execute(*tables) None
Executes the ETL process via _execute_one() for one or multiple tables, as specified.
- Parameters:
*tables – List of tables as args
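For example (table names are hypothetical):
>>> bronze.execute("orders", "customers")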
- get_delta_merge_builder(df: DataFrame, delta_table: DeltaTable) DeltaMergeBuilder
Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.
Schema merging is applied if merge_schema = True.
Example
>>> merge_condition = "target.primary_key = source.primary_key"
>>> builder = delta_table.alias("target").merge(df.alias("source"), merge_condition)
>>> builder = builder.whenMatchedUpdateAll()
>>> builder = builder.whenNotMatchedInsertAll()
>>> return builder
More examples can be found here: https://docs.delta.io/latest/delta-update.html and https://docs.delta.io/latest/api/python/spark/index.html
- Parameters:
df (DataFrame) – DataFrame to be written
delta_table (DeltaTable) – target delta table to write to
- get_replace_condition(df: DataFrame, table: str) str
Condition as SQL expression to overwrite specific data.
The input data has to fulfill this condition.
Example
>>> return "sample_col >= sample_value"
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- load()
Function to set the loader configs.
- Returns:
self
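The individual parameters are not listed here; based on the modes referenced elsewhere on this page, a call might look like this (illustrative only):
>>> bronze.load(mode="custom")  # requires custom_load() to be overwritten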
- optimize(optimize: bool = False, optimize_full: bool = False, vacuum: bool = False, vacuum_lite: bool = False, analyze: bool = False, retention: int | None = None, excl_cols: list[str] | None = None)
Function to set the optimize configs.
- Parameters:
optimize (bool) – execute the optimize command on the given tables
optimize_full (bool) – execute the optimize command in FULL mode (requires liquid clustering)
vacuum (bool) – execute the vacuum command on the given tables according to the defined retention
vacuum_lite (bool) – execute the vacuum command in LITE mode
analyze (bool) – execute the analyze command on the given table to compute statistics
retention (int) – retention time for files to be considered for vacuum. The retention time must be higher than or equal to the default retention threshold, which is 7 days (168 hours). If no value is provided, the retention threshold is used. The threshold can be changed with the delta property delta.deletedFileRetentionDuration
excl_cols (list[str]) – list of columns to exclude when computing statistics
- Returns:
self
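For example, compacting and vacuuming with the default retention (values are illustrative):
>>> bronze.optimize(optimize=True, vacuum=True, retention=168)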
- rename_columns(df: DataFrame, table: str) DataFrame
Function to rename columns based on defined config. Can be overwritten.
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- select_columns(df: DataFrame, table: str) DataFrame
Function to select columns based on defined config. Can be overwritten.
- Parameters:
df (DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- target_path(table: str) str
Function to be overwritten to create the table path.
Create a string depending on, e.g., catalog, env, schema and table.
Example
>>> return f"D:/Data/{self.target_schema}/{table}"
- Parameters:
table (str) – name of the table
- Returns:
Path depending on the table and other variables
- tblproperties(clusterby: Literal['AUTO'] | list[str] | None = 'AUTO', deletion_vectors: bool = True, auto_compact: bool = True, optimize_write: bool = True, change_data_feed: bool = True, row_tracking: bool = True, type_widening: bool = False, tblproperties: Dict[str, str] | None = None)
Function to set the table property configs.
- Parameters:
clusterby (Literal["AUTO"] | list[str] | None) – list of columns to be liquid clustered, default AUTO (only works on Databricks); falls back to None if not supported
deletion_vectors (bool) – enable deletion vectors (delta.enableDeletionVectors), default True, see also here: https://docs.delta.io/latest/delta-deletion-vectors.html
auto_compact (bool) – enable auto compaction (delta.autoOptimize.autoCompact), default True: https://docs.delta.io/latest/optimizations-oss.html#auto-compaction
optimize_write (bool) – enable optimize write (delta.autoOptimize.optimizeWrite), default True, see also here: https://docs.delta.io/latest/optimizations-oss.html#optimized-write
change_data_feed (bool) – enable change data feed, default True (delta.enableChangeDataFeed): https://docs.delta.io/latest/delta-change-data-feed.html
row_tracking (bool) – enable row tracking (delta.enableRowTracking), default True: https://docs.delta.io/latest/delta-row-tracking.html
type_widening (bool) – enable type widening (delta.enableTypeWidening), default False: https://docs.delta.io/latest/delta-type-widening.html
tblproperties (Dict[str, str] | None) – dict of any additional Delta table properties, as documented at https://docs.delta.io/
- Returns:
self
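For example (the clustering column and the extra property are hypothetical):
>>> bronze.tblproperties(
...     clusterby=["load_date"],
...     change_data_feed=False,
...     tblproperties={"delta.logRetentionDuration": "interval 30 days"},
... )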
- transform(ignore_defaults: bool = False, transformation_order: list[str] = ['rename_columns', 'tbl_transformations', 'select_columns', 'cast_column_types'], tbl_transformations: Dict[str, str] = {}, rename_columns: Dict[str, Dict[str, str]] = {}, select_columns: Dict[str, list[str]] = {}, cast_column_types: Dict[str, Dict[str, str]] = {})
Function to set the transformer configs.
- Parameters:
ignore_defaults (bool) – default: False. If True, skips the default transformations defined in the default_transform function. Usually used during debugging.
transformation_order (List[str]) – default: ["rename_columns", "tbl_transformations", "select_columns", "cast_column_types"]. Allows you to define the order in which the transformations are executed.
tbl_transformations (Dict[str, str]) – default: {}. Allows you to define custom transformations per table by specifying the table name as key and the function name as value. For tables where it is not defined, custom_transform is used.
rename_columns (Dict[str, Dict[str, str]]) – default: {}. Allows you to define column renaming per table by specifying the table name as key and the column renaming as value. The value is a dictionary with the old column name as key and the new column name as value.
select_columns (Dict[str, list[str]]) – default: {}. Allows you to define column selection per table by specifying the table name as key and the column names as value.
cast_column_types (Dict[str, Dict[str, str]]) – default: {}. Allows you to define column casting per table by specifying the table name as key and the column casting as value. The value is a dictionary with the column name as key and the cast type as value.
- Returns:
self
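For example (table and column names are hypothetical):
>>> bronze.transform(
...     rename_columns={"orders": {"ord_id": "order_id"}},
...     select_columns={"orders": ["order_id", "order_date", "amount"]},
...     cast_column_types={"orders": {"amount": "decimal(18,2)"}},
... )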
- write(mode: Literal['overwrite', 'append', 'replace', 'merge', 'stream', 'custom'] = 'append', merge_schema: bool = False, external: bool = False)
Function to set the writer configs.
- Parameters:
mode (str) – default: append. Defines how the data is written: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), stream (requires the checkpoint_path() function) or custom (define the custom_write() function)
merge_schema (bool) – default: False. If True, the schema is automatically evolved/merged
external (bool) – default: False. If True, tables are saved as external tables at the path defined in the target_path() function
- Returns:
self
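For example, a merge write with schema evolution (requires get_delta_merge_builder() to be overwritten):
>>> bronze.write(mode="merge", merge_schema=True)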