lakehouse.spark.etl

class lakehouse.spark.etl.ETL(spark: SparkSession, catalog: str, source_schema: str, target_schema: str, config: Dict[str, Any] | None = None, **options: Dict[str, Any])

Bases: ETLLoader, ETLTransformer, ETLWriter, ETLOptimizer, ETLTblProperties, Interface

A generic class providing a framework for processing data from one layer to the next in a Medallion architecture.

Use the functions load(), transform() and write() to specify the configs, and use execute() to run the defined steps. Use optimize() to specify the optimize configs and tblproperties() to set table properties. Default properties are set on new tables for you; see the default properties in the tblproperties() function.
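Example (a minimal usage sketch; catalog, schema and table names below are illustrative, not part of the API)

>>> etl = ETL(spark, catalog="dev", source_schema="bronze", target_schema="silver")
>>> etl.load(mode="default", filter="all") \
...     .transform() \
...     .write(mode="append") \
...     .execute("customers")  # "customers" is an illustrative table name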

Overwrite functions as required:
  • custom_load(self, table: str) -> DataFrame: Function to customize the way the source data is loaded. Required if load(mode=”custom”), otherwise ignored.

  • custom_filter(self, df: DataFrame, table: str) -> DataFrame: Function to filter the loaded DataFrame and make use of predicate pushdown. Required if load(filter=”custom”), otherwise ignored.

  • custom_transform(self, df: DataFrame, table: str) -> DataFrame: Function to be optionally overwritten to add custom transformations, only executed if transform() is defined

  • rename_columns(self, df: DataFrame, table: str) -> DataFrame: Function to rename columns based on the defined config. Can be overwritten.

  • select_columns(self, df: DataFrame, table: str) -> DataFrame: Function to select columns based on the defined config. Can be overwritten.

  • cast_column_types(self, df: DataFrame, table: str) -> DataFrame: Function to cast column types based on defined config. Can be overwritten.

  • default_transform(self, df: DataFrame, table: str) -> DataFrame: Can be overwritten to add default transformations executed after the custom transformations. The default creates a timestamp column with the current timestamp of the transformation. Only executed if transform(ignore_defaults=False)

  • get_replace_condition(self, df: DataFrame, table: str) -> str: Allows you to define the filter used for the replaceWhere overwrite operation. Required if write(mode=”replace”).

  • get_delta_merge_builder(self, df: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Allows you to define the merge builder for the merge write into Delta. Required if write(mode=”merge”).

  • custom_write(self, df: DataFrame, table: str) -> None: Allows you to define a custom write operation. Required if write(mode=”custom”).

  • target_path(self, table: str) -> str: Allows you to specify a dynamic path if using external tables

  • checkpoint_path(self, table: str) -> str: Function to be overwritten to create the checkpoint path if performing a streaming write

spark

Spark Session as provided to process the data

Type:

SparkSession

options

Kwargs; any options provided to the class

Type:

Dict[str, Any]

catalog

Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog

Type:

str

source_schema

Name of the source_schema

Type:

str

target_schema

Name of the target_schema

Type:

str

data

Intermediate DataFrame per table based on the specified options before execute()

Type:

Dict[str, DataFrame]

__init__(spark: SparkSession, catalog: str, source_schema: str, target_schema: str, config: Dict[str, Any] | None = None, **options: Dict[str, Any]) None

Initializes the ETL class with user-provided options.

Parameters:
  • spark (SparkSession) – existing Spark Session

  • catalog (str) – Name of the catalog recognized by Spark, e.g. from the Hive Metastore or Unity Catalog, required

  • source_schema (str) – Name of the source_schema, required

  • target_schema (str) – Name of the target_schema, required

  • config (Dict[str, Any] | None) – Optionally pass the configs here instead of setting them via the builder functions before execute(). Keys are load, transform, write, optimize and tblproperties

  • **options (Dict[str, Any]) – Kwargs; any other options provided to the class
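A minimal sketch of passing the configs at construction time instead of via the builder functions, assuming each key maps to the keyword arguments of the corresponding function (schema names and values are illustrative):

>>> config = {
...     "load": {"mode": "default", "filter": "all"},
...     "write": {"mode": "append"},
... }
>>> etl = ETL(spark, catalog="dev", source_schema="bronze", target_schema="silver", config=config)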

cast_column_types(df: DataFrame, table: str) DataFrame

Function to cast column types based on defined config. Can be overwritten.

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

checkpoint_path(table: str) str

Function to be overwritten to create the checkpoint path if performing a streaming write.

Create a string depending e.g. on catalog, env, schema and table

Example

>>> return f"D:/Data/{self.target_schema}/{table}/checkpoint"
Parameters:

table (str) – name of the table

Returns:

Path depending on the table and other variables

custom_filter(df: DataFrame, table: str) DataFrame

Abstract function which can be overwritten to filter the loaded DataFrame and make use of predicate pushdown.

Filter rows and columns not needed here before applying any other transformations in transform()
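Example (a minimal sketch of an override; the column name event_date and the cut-off value are illustrative assumptions)

>>> from pyspark.sql import functions as F
>>> return df.filter(F.col("event_date") >= "2024-01-01")  # event_date is illustrative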

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

filtered DataFrame

custom_load(table: str) DataFrame

Abstract function to be overwritten to load data based on a custom implementation and return a DataFrame
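Example (a minimal sketch of an override; the file format and landing path are illustrative assumptions, not part of the library)

>>> return self.spark.read.format("parquet").load(f"D:/Landing/{table}")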

Parameters:

table (str) – name of the table

Returns:

loaded data as DataFrame

custom_transform(df: DataFrame, table: str) DataFrame

Function to be optionally overwritten to add custom transformations, only executed if transform() is defined
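Example (a minimal sketch of an override; column names are illustrative assumptions)

>>> from pyspark.sql import functions as F
>>> return df.withColumn("full_name", F.concat_ws(" ", "first_name", "last_name"))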

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

custom_write(df: DataFrame, table: str) None

Abstract function to write a DataFrame.
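Example (a minimal sketch of an override writing to a managed Delta table; this is not the library's built-in write logic)

>>> df.write.format("delta").mode("append").saveAsTable(f"{self.catalog}.{self.target_schema}.{table}")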

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

default_transform(df: DataFrame, table: str) DataFrame

Can be overwritten to add default transformations executed after the custom transformations. The default creates a timestamp column with the current timestamp of the transformation. Only executed if transform(ignore_defaults=False)
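Example (a minimal sketch of an override; the column name processed_at is an illustrative assumption)

>>> from pyspark.sql import functions as F
>>> return df.withColumn("processed_at", F.current_timestamp())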

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame with internal transformations

execute(*tables) None

Executes the ETL process via _execute_one() for one or multiple tables as specified.
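Example (table names are illustrative)

>>> etl.execute("customers", "orders")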

Parameters:

*tables – List of tables as args

get_delta_merge_builder(df: DataFrame, delta_table: DeltaTable) DeltaMergeBuilder

Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.

Schema merging is applied during the merge if merge_schema=True is set in write().

Example

>>> merge_condition = "target.primary_key = source.primary_key"
>>> builder = delta_table.alias("target").merge(df.alias("source"), merge_condition)
>>> builder = builder.whenMatchedUpdateAll()
>>> builder = builder.whenNotMatchedInsertAll()
>>> return builder

More examples can be found here: https://docs.delta.io/latest/delta-update.html and https://docs.delta.io/latest/api/python/spark/index.html

Parameters:
  • df (DataFrame) – DataFrame to be written

  • delta_table (DeltaTable) – target delta table to write to

get_replace_condition(df: DataFrame, table: str) str

Condition as SQL expression to overwrite specific data.

The input data has to fulfill this condition.

Example

>>> return "sample_col >= sample_value"
Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

load(mode: Literal['default', 'custom'] = 'default', filter: Literal['all', 'custom', 'new'] = 'all', date_col: str = None, source_tbl: str = None)

Function to set the loader configs.
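Example (a minimal sketch; the date column name is an illustrative assumption)

>>> etl.load(mode="default", filter="new", date_col="updated_at")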

Parameters:
  • mode (str) – default or custom, default: default. Determines how data is loaded: the table source_schema.table is read directly (default), or the data is loaded as defined in the custom_load function (custom). In Bronze a custom_load function is always needed, meaning the default there is custom

  • filter (str) – all, custom or new, default: all. Allows applying filters directly on the loaded data for predicate pushdown with “custom”, otherwise “all” data is loaded. “new” determines which data is new based on a date column. In Bronze all data is always loaded via the custom_load function

  • date_col (str) – Name of the date column used to determine new data, default: None, must exist in the source and target schema and must be defined if filter is new

  • source_tbl (str) – Name of the source table, default: None. If provided, this table is loaded instead of the table named after the target table

Returns:

self

optimize(optimize: bool = False, optimize_full: bool = False, vacuum: bool = False, vacuum_lite: bool = False, analyze: bool = False, retention: int | None = None, excl_cols: list[str] | None = None)

Function to set the optimize configs.
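Example (a minimal sketch; the values are illustrative)

>>> etl.optimize(optimize=True, vacuum=True, retention=168)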

Parameters:
  • optimize (bool) – execute the optimize command on the given tables

  • optimize_full (bool) – execute the optimize as Full mode (needs Liquid clustering)

  • vacuum (bool) – execute the vacuum command on the given tables as to the defined retention

  • vacuum_lite (bool) – execute the vacuum with lite command

  • analyze (bool) – execute the analyze command on the given table to compute statistics

  • retention (int) – retention time for files to be considered for vacuum. Must be higher than or equal to the default retention threshold of 7 days (168 hours). If no value is provided, the default retention threshold is used. The threshold can be changed with the delta property delta.deletedFileRetentionDuration

  • excl_cols (list[str]) – list of cols to be excluded to compute statistics

Returns:

self

rename_columns(df: DataFrame, table: str) DataFrame

Function to rename columns based on defined config. Can be overwritten.

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

select_columns(df: DataFrame, table: str) DataFrame

Function to select columns based on defined config. Can be overwritten.

Parameters:
  • df (DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

target_path(table: str) str

Function to be overwritten to create the table path.

Create a string depending e.g. on catalog, env, schema and table

Example

>>> return f"D:/Data/{self.target_schema}/{table}"
Parameters:

table (str) – name of the table

Returns:

Path depending on the table and other variables

tblproperties(clusterby: Literal['AUTO'] | list[str] | None = 'AUTO', deletion_vectors: bool = True, auto_compact: bool = True, optimize_write: bool = True, change_data_feed: bool = True, row_tracking: bool = True, type_widening: bool = False, tblproperties: Dict[str, str] | None = None)

Function to set the table properties configs.
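Example (a minimal sketch; the clustering column and the extra property are illustrative assumptions)

>>> etl.tblproperties(clusterby=["event_date"], tblproperties={"delta.logRetentionDuration": "interval 30 days"})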

Parameters:
  • clusterby (Literal['AUTO'] | list[str] | None) – columns to cluster the table by, “AUTO” for automatic clustering or None to disable, default: “AUTO”

  • deletion_vectors (bool) – enable deletion vectors on the table, default: True

  • auto_compact (bool) – enable auto compaction, default: True

  • optimize_write (bool) – enable optimized writes, default: True

  • change_data_feed (bool) – enable the change data feed, default: True

  • row_tracking (bool) – enable row tracking, default: True

  • type_widening (bool) – enable type widening, default: False

  • tblproperties (Dict[str, str] | None) – additional table properties as key/value pairs, default: None

Returns:

self

transform(ignore_defaults: bool = False, transformation_order: list[str] = ['rename_columns', 'tbl_transformations', 'select_columns', 'cast_column_types'], tbl_transformations: Dict[str, str] = {}, rename_columns: Dict[str, Dict[str, str]] = {}, select_columns: Dict[str, list[str]] = {}, cast_column_types: Dict[str, Dict[str, str]] = {})

Function to set the transformer configs.
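Example (a minimal sketch; table and column names are illustrative assumptions)

>>> etl.transform(
...     rename_columns={"customers": {"cust_id": "customer_id"}},
...     select_columns={"customers": ["customer_id", "name"]},
...     cast_column_types={"customers": {"customer_id": "bigint"}},
... )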

Parameters:
  • ignore_defaults (bool) – default: False, Ignores executing default transformations as defined in default_transform function if True. Usually used during debugging

  • transformation_order (List[str]) – default: [“rename_columns”, “tbl_transformations”, “select_columns”, “cast_column_types”], Allows defining the order in which the transformations are executed

  • tbl_transformations (Dict[str, str]) – default: {}, Allows defining custom transformations per table by specifying the table name as key and the function name as value. For tables where it is not defined, custom_transform is used.

  • rename_columns (Dict[str, Dict[str, str]]) – default: {}, Allows to define column renaming per table by specifying the table name as key and the column renaming as value. The value is a dictionary with the old column name as key and the new column name as value.

  • select_columns (Dict[str, list[str]]) – default: {}, Allows to define column selection per table by specifying the table name as key and the column names as value.

  • cast_column_types (Dict[str, Dict[str, str]]) – default: {}, Allows to define column casting per table by specifying the table name as key and the column casting as value. The value is a dictionary with the column name as key and the cast type as value.

Returns:

self

write(mode: Literal['overwrite', 'append', 'replace', 'merge', 'stream', 'custom'] = 'append', merge_schema: bool = False, external: bool = False)

Function to set the writer configs.
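Example (a minimal sketch; merge mode requires get_delta_merge_builder() to be implemented)

>>> etl.write(mode="merge", merge_schema=True)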

Parameters:
  • mode (str) – overwrite, append, replace, merge, stream or custom, default: append. Defines the write mode: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), stream (requires the checkpoint_path() function) or custom (define the custom_write() function)

  • merge_schema (bool) – default: False, If the schema should be automatically evolved/merged

  • external (bool) – default: False, if True tables are saved as external tables at the path defined in the target_path function

Returns:

self