lakehouse.daft.silver

class lakehouse.daft.silver.Silver(catalog: str, source_schema: str, target_schema: str, io_config: IOConfig | None = None, config: Dict[str, Any] | None = None, **options: Dict[str, Any])

Bases: ETL

A generic class providing a framework for processing data in the Silver layer of a Medallion architecture.

Use the functions load(), transform() and write() to specify configs. Use execute() to execute the defined steps.
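
The configure-then-execute pattern can be sketched with a minimal stand-in (illustrative only; FluentETL and its method bodies are not the real Silver implementation):

```python
# Minimal stand-in for the configure-then-execute pattern: load(),
# transform() and write() only record configuration and return self,
# while execute() runs the recorded steps per table.
class FluentETL:
    def __init__(self):
        self._config = {}

    def load(self, mode="default"):
        self._config["load"] = mode
        return self  # returning self enables chaining

    def transform(self, ignore_defaults=False):
        self._config["transform"] = ignore_defaults
        return self

    def write(self, mode="append"):
        self._config["write"] = mode
        return self

    def execute(self, *tables):
        # run the configured steps (load -> transform -> write) per table
        return {t: list(self._config) for t in tables}

plan = FluentETL().load("default").transform().write("append")
result = plan.execute("customers", "orders")
```

Because every configuration call returns `self`, the three steps can be chained in one expression before `execute()` materializes anything.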

Overwrite functions as required:
  • custom_load(self, table: str) -> daft.DataFrame: Function to customize the way the source data is loaded. Required if load(mode="custom"), otherwise ignored.

  • custom_filter(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to filter the loaded DataFrame, making use of predicate pushdown. Required if load(filter="custom"), otherwise ignored.

  • custom_transform(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to be optionally overwritten to add custom transformations. Only executed if transform() is defined.

  • rename_columns(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to rename columns based on the defined config. Can be overwritten.

  • select_columns(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to select columns based on the defined config. Can be overwritten.

  • cast_column_types(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to cast column types based on the defined config. Can be overwritten.

  • default_transform(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Can be overwritten to add default transformations executed after the custom transformations. By default, creates a timestamp column with the current transformation timestamp. Only executed if transform(ignore_defaults=False).

  • get_replace_condition(self, df: daft.DataFrame, table: str) -> str: Allows you to define the filter used for the replace-where overwrite operation. Required if write(mode="replace").

  • get_delta_merge_builder(self, df: daft.DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Allows you to define the merge builder for the merge write into Delta. Required if write(mode="merge").

  • custom_write(self, df: daft.DataFrame, table: str) -> None: Allows you to define a custom write operation. Required if write(mode="custom").

  • source_path(self, table: str) -> str: Allows you to specify a dynamic path for the source.

  • target_path(self, table: str) -> str: Allows you to specify a dynamic path for the target.

options

Kwargs; any additional options provided to the class

Type:

Dict[str, Any]

catalog

Name of the catalog, required

Type:

str

source_schema

Name of the source_schema

Type:

str

target_schema

Name of the target_schema

Type:

str

io_config

IOConfig object from daft; see https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.IOConfig.html and https://www.getdaft.io/projects/docs/en/stable/integrations/

Type:

IOConfig | None

data

Intermediate DataFrame per table based on the specified options before execute()

Type:

Dict[str, daft.DataFrame]

__init__(catalog: str, source_schema: str, target_schema: str, io_config: IOConfig | None = None, config: Dict[str, Any] | None = None, **options: Dict[str, Any]) None

Initializes the Silver class with user-provided options.

Parameters:
  • catalog (str) – Name of the catalog, required

  • source_schema (str) – Name of the source_schema

  • target_schema (str) – Name of the target_schema

  • io_config (IOConfig | None) – IOConfig object from daft

  • config (Dict[str, Any] | None) – Optional configuration dictionary

  • **options – Any additional options provided to the class

cast_column_types(df: DataFrame, table: str) DataFrame

Function to cast column types based on defined config. Can be overwritten.

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

custom_filter(df: DataFrame, table: str) DataFrame

Abstract function which can be overwritten to filter the loaded DataFrame, making use of predicate pushdown.

Filter out rows and columns that are not needed here, before any other transformations are applied in transform().

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

filtered DataFrame
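
To illustrate the intent, a hypothetical override might filter early so that later transformations touch less data (pure-Python stand-in; a real override would filter the daft.DataFrame, and the "orders" table and "year" column are invented examples):

```python
# Hypothetical custom_filter sketch: drop rows as early as possible.
# df is a list-of-dicts stand-in for a daft.DataFrame.
def custom_filter(df, table):
    if table == "orders":
        # keep only recent rows; "year" is an assumed example column
        return [row for row in df if row["year"] >= 2024]
    return df

rows = [{"year": 2023}, {"year": 2024}]
filtered = custom_filter(rows, "orders")
```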

custom_load(table: str) DataFrame

Abstract function to be overwritten to load data based on a custom implementation and return a DataFrame.

Parameters:

table (str) – name of the table

Returns:

loaded data as DataFrame

custom_transform(df: DataFrame, table: str) DataFrame

Function to be optionally overwritten to add custom transformations, only executed if transform() is defined

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

custom_write(df: DataFrame, table: str) None

Abstract function to write a DataFrame.

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

default_transform(df: DataFrame, table: str) DataFrame

Function adding the current timestamp as LH_SilverTS to the transformed data.

Can be overwritten if more internal transformations should be added.

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame with internal transformations
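
The documented default behavior (adding the current timestamp as LH_SilverTS) can be sketched with a plain dict-of-columns stand-in for the DataFrame:

```python
from datetime import datetime, timezone

# Stand-in sketch of the documented default_transform behavior:
# add the current timestamp as an LH_SilverTS column.
# df is a {column: values} dict instead of a daft.DataFrame.
def default_transform(df, table):
    n_rows = len(next(iter(df.values()), []))
    ts = datetime.now(timezone.utc)
    return {**df, "LH_SilverTS": [ts] * n_rows}

out = default_transform({"id": [1, 2]}, "customers")
```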

execute(*tables) None

Executes the ETL process via _execute_one() for one or multiple tables as specified.

Parameters:

*tables – List of tables as args

get_delta_merge_builder(df: Table, delta_table: DeltaTable) TableMerger

Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.

Executes a merge_schema if merge_schema = True.

Example

>>> merge_condition = "target.primary_key = source.primary_key"
>>> builder = delta_table.merge(df, predicate=merge_condition, source_alias="source", target_alias="target")
>>> builder = builder.when_matched_update_all()
>>> builder = builder.when_not_matched_insert_all()
>>> return builder

More examples can be found here: https://delta-io.github.io/delta-rs/api/delta_table/delta_table_merger/

Parameters:
  • df (pyarrow.Table) – DataFrame as pyarrow table to be written

  • delta_table (DeltaTable) – target delta table to write to

get_replace_condition(df: DataFrame, table: str) str

Condition as SQL expression to overwrite specific data.

The input data has to fulfill this condition.

Example

>>> return "sample_col >= sample_value"

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

load(mode: Literal['default', 'custom'] = 'default', filter: Literal['all', 'custom'] = 'all', date_col: str | None = None)

Function to set the loader configs.

Parameters:
  • mode (str) – "default" or "custom"; default: "default". Loads either the table from source_schema.table ("default") or as defined in the custom_load function ("custom"). In Bronze a custom_load function is always needed, so there the default is "custom".

  • filter (str) – "all" or "custom"; default: "all". "custom" allows applying filters directly on the loaded data for predicate pushdown; with "all", all data is loaded. In Bronze all data is always loaded via the custom_load function.

  • date_col (str) – Name of the date column used to determine new data; default: None. Must exist in both the source and target schema and must be defined when filtering for new data.

Returns:

self

rename_columns(df: DataFrame, table: str) DataFrame

Function to rename columns based on defined config. Can be overwritten.

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

select_columns(df: DataFrame, table: str) DataFrame

Function to select columns based on defined config. Can be overwritten.

Parameters:
  • df (daft.DataFrame) – DataFrame

  • table (str) – name of the table

Returns:

transformed DataFrame

source_path(table: str) str

Function to be overwritten to create the table path.

Create a string depending on, e.g., catalog, env, schema and table.

Example

>>> return f"D:/Data/{self.catalog}/{self.source_schema}/{table}"

Parameters:

table (str) – name of the table

Returns:

Path depending on the table and other variables

target_path(table: str) str

Function to be overwritten to create the table path.

Create a string depending on, e.g., catalog, env, schema and table.

Example

>>> return f"D:/Data/{self.catalog}/{self.target_schema}/{table}"

Parameters:

table (str) – name of the table

Returns:

Path depending on the table and other variables

transform(ignore_defaults: bool = False, transformation_order: list[str] = ['rename_columns', 'tbl_transformations', 'select_columns', 'cast_column_types'], tbl_transformations: Dict[str, str] = {}, rename_columns: Dict[str, Dict[str, str]] = {}, select_columns: Dict[str, list[str]] = {}, cast_column_types: Dict[str, Dict[str, str]] = {})

Function to set the transformer configs.

Parameters:
  • ignore_defaults (bool) – default: False. If True, skips the default transformations defined in the default_transform function. Usually used during debugging.

  • transformation_order (List[str]) – default: ["rename_columns", "tbl_transformations", "select_columns", "cast_column_types"], Allows you to define the order in which the transformations are executed.

  • tbl_transformations (Dict[str, str]) – default: {}, Allows you to define custom transformations per table by specifying the table name as key and the function name as value. For tables where it is not defined, custom_transform is used.

  • rename_columns (Dict[str, Dict[str, str]]) – default: {}, Allows to define column renaming per table by specifying the table name as key and the column renaming as value. The value is a dictionary with the old column name as key and the new column name as value.

  • select_columns (Dict[str, list[str]]) – default: {}, Allows to define column selection per table by specifying the table name as key and the column names as value.

  • cast_column_types (Dict[str, Dict[str, str]]) – default: {}, Allows to define column casting per table by specifying the table name as key and the column casting as value. The value is a dictionary with the column name as key and the cast type as value.

Returns:

self
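
How the per-table configs compose along transformation_order can be sketched in pure Python, with a {column: values} dict standing in for a daft.DataFrame (the column names and configs below are invented examples, not the real implementation):

```python
# Stand-in implementations of the three config-driven transformations.
def rename_columns(df, mapping):
    # rename columns per the {old_name: new_name} config
    return {mapping.get(col, col): vals for col, vals in df.items()}

def select_columns(df, columns):
    # keep only the configured columns, in the configured order
    return {col: df[col] for col in columns}

def cast_column_types(df, casts):
    # cast configured columns per the {column: type_name} config
    py_types = {"int": int, "str": str, "float": float}
    return {
        col: [py_types[casts[col]](v) for v in vals] if col in casts else vals
        for col, vals in df.items()
    }

df = {"CustID": ["1", "2"], "Name": ["Ann", "Bob"], "tmp": [0, 0]}
df = rename_columns(df, {"CustID": "customer_id"})   # rename first
df = select_columns(df, ["customer_id", "Name"])     # then project
df = cast_column_types(df, {"customer_id": "int"})   # then cast
```

Running the steps in the default order means the rename mapping uses source column names, while selection and casting use the renamed ones.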

write(mode: Literal['overwrite', 'append', 'replace', 'merge', 'custom'] = 'append', overwrite_schema: bool = False)

Function to set the writer configs.

Parameters:
  • mode (str) – overwrite, append, replace, merge or custom; default: append. Defines the mode of writing the data: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), or custom (define the custom_write() function).

  • overwrite_schema (bool) – True if the schema should be overwritten; default: False. Only available for mode overwrite.

Returns:

self
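
The semantics of the main write modes can be sketched on a list-of-rows target keyed by a primary key (conceptual only; the real writes go through Delta, and the "id" key and row values are invented examples):

```python
# Conceptual write-mode semantics on a list-of-dicts table.
def write(target, source, mode, key="id"):
    if mode == "overwrite":
        return list(source)            # replace the whole table
    if mode == "append":
        return target + source         # keep everything, may duplicate keys
    if mode == "merge":
        # upsert: update rows with matching keys, insert the rest,
        # mirroring when_matched_update_all / when_not_matched_insert_all
        by_key = {row[key]: row for row in target}
        for row in source:
            by_key[row[key]] = row
        return list(by_key.values())
    raise ValueError(f"unsupported mode: {mode}")

target = [{"id": 1, "v": "old"}]
source = [{"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
merged = write(target, source, "merge")
```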