lakehouse.daft.bronze
- class lakehouse.daft.bronze.Bronze(catalog: str, target_schema: str, io_config: IOConfig | None = None, config: Dict[str, Any] | None = None, **options: Dict[str, Any])
Bases:
ETLA generic class building a framework how to process data in the Bronze layer in a Medallion architecture.
Use the functions load(), transform() and write() to specify configs. Use execute() to execute the defined steps.
- Overwrite functions as required:
custom_load(self, table: str) -> daft.DataFrame: Function to customize the way or the source data is loaded. required, if load(mode=”custom”) else ignored.
custom_filter(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to filter the loaded dataframe and making use of predicate pushdown. required, if load(filter=”custom”) else ignored.
custom_transform(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to be optionally overwritten to add custom transformations, only executed if transform() is defined
rename_columns(self, df: daft.DataFrame, table: str) -> daft.DataFrame:
select_columns(self, df: daft.DataFrame, table: str) -> daft.DataFrame:
cast_column_types(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Function to cast column types based on defined config. Can be overwritten.
default_transform(self, df: daft.DataFrame, table: str) -> daft.DataFrame: Can be overwritten to add default transformations executed after the the custom transformations. Defaults create a timestamp column with the current timestamp of transformations. Only executed if transform(ignore_defaults=False)
get_replace_condition(self, df: daft.DataFrame, table: str) -> str: Allows you to define the filter used for the replace where overwrite operation. required if write(mode=”replace”).
get_delta_merge_builder(self, df: daft.DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder: Allows you to define the merge builder for the merge write into delta. required if write(mode=”merge”)
custom_write(self, df: daft.DataFrame, table: str) -> None: Allows to define a custom write operation. required if write(mode=”custom”)
source_path(self, table: str) -> str: Allows you to specify a dynamic path for the target
target_path(self, table: str) -> str: Allows you to specify a dynamic path for the target
- options
Kwargs, Any options provided into the class
- Type:
Dict[str, Any]
- catalog
Name of the catalog, required
- Type:
str
- target_schema
Name of the target_schema
- Type:
str
- io_config
IOConfig object from daft as to https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.IOConfig.html and https://www.getdaft.io/projects/docs/en/stable/integrations/
- Type:
IOConfig | None
- data
Intermediate DataFrame per table based on the specified options before execute()
- Type:
Dict[str, daft.DataFrame]
- __init__(catalog: str, target_schema: str, io_config: IOConfig | None = None, config: Dict[str, Any] | None = None, **options: Dict[str, Any]) None
Initializes the Bronze class with user-provided options.
- Parameters:
catalog (str) – Name of the catalog, required
target_schema (str) – Name of the target_schema, required
io_config (IOConfig | None) – IOConfig object from daft as to https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.IOConfig.html and https://www.getdaft.io/projects/docs/en/stable/integrations/
config (Dict[str, Any] | None) – config to optionally pass the configs here instead of passing it during execute. Keys are load, transform, write, optimize and tblproperties
**options (Dict[str, Any] | None) – Kwargs, Any options provided into the class
- cast_column_types(df: DataFrame, table: str) DataFrame
Function to cast column types based on defined config. Can be overwritten.
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- custom_filter(df: DataFrame, table: str) DataFrame
Abstract function which can be overwritten to filter the loaded dataframe and making use of predicate pushdown.
Filter rows and columns not needed here before applying any other transformations in transform()
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
filtered DataFrame
- custom_load(table: str) DataFrame
Abstract function to be overwritten to load data based on custom implemenatation and return a DataFrame
- Parameters:
table (str) – name of the table
- Returns:
loaded data as DataFrame
- custom_transform(df: DataFrame, table: str) DataFrame
Function to be optionally overwritten to add custom transformations, only executed if transform() is defined
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- custom_write(df: DataFrame, table: str) None
Abstract function to write a DataFrame.
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- default_transform(df: DataFrame, table: str) DataFrame
Function adding the current timestamp as LH_BronzeTS to the transformed data.
Can be overwritten if more internal transformations should be added
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame with internal transformations
- execute(*tables) None
Executes the elt process via _execute_one() for one or multiple tables as specified.
- Parameters:
*tables – List of tables as args
- get_delta_merge_builder(df: Table, delta_table: DeltaTable) TableMerger
Abstract function to define a DeltaMergeBuilder to perform the merge in the write function.
Executes a merge_schema if merge_schema = True.
Example
>>> merge_condition = "target.primary_key = source.primary_key" >>> builder = delta_table.merge(df, predicate=merge_condition, source_alias="source", target_alias="target") >>> builder = builder.when_matched_update_all() >>> builder = builder.when_not_matched_insert_all() >>> return builder
More examples can be found here: https://delta-io.github.io/delta-rs/api/delta_table/delta_table_merger/
- Parameters:
df (pyarrow.Table) – DataFrame as pyarrow table to be written
delta_table (DeltaTable) – target delta table to write to
- get_replace_condition(df: DataFrame, table: str) str
Condition as SQL expression to overwrite specific data.
Input data has to fullfill this condition
Example
>>> return "sample_col >= sample_value"
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- load()
Function to set the loader configs.
- Returns:
self
- rename_columns(df: DataFrame, table: str) DataFrame
Function to rename columns based on defined config. Can be overwritten.
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- select_columns(df: DataFrame, table: str) DataFrame
Function to select columns based on defined config. Can be overwritten.
- Parameters:
df (daft.DataFrame) – DataFrame
table (str) – name of the table
- Returns:
transformed DataFrame
- source_path(table: str) str
Function to be overwritten to create the table path.
Create a string depending e.g. on catalog, env, schema and table
Example
>>> return f"D:/Data/{self.catalog}/{self.source_schema}/{table}"
- Parameters:
table (str) – name of the table
- Returns:
Path depening on table and other variables
- target_path(table: str) str
Function to be overwritten to create the table path.
Create a string depending e.g. on catalog, env, schema and table
Example
>>> return f"D:{self.catalog}/{self.target_schema}/{table}"
- Parameters:
table (str) – name of the table
- Returns:
Path depening on table and other variables
- transform(ignore_defaults: bool = False, transformation_order: list[str] = ['rename_columns', 'tbl_transformations', 'select_columns', 'cast_column_types'], tbl_transformations: Dict[str, str] = {}, rename_columns: Dict[str, Dict[str, str]] = {}, select_columns: Dict[str, list[str]] = {}, cast_column_types: Dict[str, Dict[str, str]] = {})
Function to set the transformer configs.
- Parameters:
ignore_defaults (bool) – default: False, Ignores executing default transformations as defined in default_transform function if True. Usually used during debugging
transformation_order (List[str]) – default: [“rename_columns”, “tbl_transformations”, “select_columns”, “cast_column_types”], Allows to define the order of transformations to be executed. Default is
tbl_transformations (Dict[str, str]) – default: {}, Allows to define custom transformations per table by specifying the table name as key and the function name as value. For the tables it is not defined custom_transform is used.
rename_columns (Dict[str, Dict[str, str]]) – default: {}, Allows to define column renaming per table by specifying the table name as key and the column renaming as value. The value is a dictionary with the old column name as key and the new column name as value.
select_columns (Dict[str, list[str]]) – default: {}, Allows to define column selection per table by specifying the table name as key and the column names as value.
cast_column_types (Dict[str, Dict[str, str]]) – default: {}, Allows to define column casting per table by specifying the table name as key and the column casting as value. The value is a dictionary with the column name as key and the cast type as value.
- Returns:
self
- write(mode: Literal['overwrite', 'append', 'replace', 'merge', 'custom'] = 'append', overwrite_schema: bool = False)
Function to set the writer configs.
- Parameters:
mode (str) – overwrite, append, replace, merge or custom, default: append, Defines the mode of writing the data as overwrite, append, replace (define replace filter with function get_replace_condition(), merge (define merge builder with function get_delta_merge_builder(), custom (define custom_write() function)
overwrite_schema (bool) – True if schema should be overwritten, default: False, only available for mode overwrite
- Returns:
self