Welcome to Lakehouse-NS

Lakehouse-NS gives you a simple framework to implement your lakehouse based on the Medallion Architecture.

  • It currently supports Spark and Daft as engines. More engines, such as Polars, are on the backlog.

  • Delta Lake is currently the supported lakehouse format.

  • The framework will also be extended step by step with more baseline logic.

  • The Spark engine was tested locally with pyspark 4.0.0 and delta-spark 4.4.0, and on Databricks Serverless v4. The Daft engine was tested with getdaft 0.4.8 and deltalake 0.25.4.

Some important links:

Spark Set-Up and Get Started

Requires one of the following to be installed:

  • pyspark and delta-spark

  • Databricks Connect

  • Spark Connect and Delta Connect

  • a default Spark session on Databricks or Fabric

pip install lakehouse-ns

You also need a catalog set up, with your bronze, silver, and gold schema(s) created in it. You can use the default “spark_catalog”, although this is not recommended.

That’s already it!

Now just import the Bronze, Silver and Gold classes and overwrite the load or transform functions. That’s it.

import requests

from lakehouse.spark import bronze, silver, gold

spark = <Your Spark Session>

# Create your schemas
spark.sql("CREATE SCHEMA IF NOT EXISTS <catalog>.<schema>")

options = {
    "catalog": "<catalog>",
    "target_schema": "<schema>"
}


class StarWarsBronze(bronze.Bronze):
    def custom_load(self, table):
        # Page through the SWAPI REST API and collect all results
        results = []
        query = f"https://swapi.tech/api/{table}"
        json_request = requests.get(query).json()
        results.extend(json_request["results"])

        while json_request["next"]:
            json_request = requests.get(json_request["next"]).json()
            results.extend(json_request["results"])
        return self.spark.createDataFrame(results)


bronze_instance = StarWarsBronze(spark, **options)
bronze_instance.load().write(mode="overwrite").execute("people", "planets")

See detailed samples here: https://github.com/datanikkthegreek/lakehouse-docu/tree/main/spark/samples

The samples also include a notebook showing how easy debugging and testing are, even when using this framework.
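The same pattern applies to the Silver and Gold layers. Below is a minimal sketch, assuming the same chained load().transform().write() API as the Bronze example above and the custom_transform hook described in the Functions section; the schema names are placeholders:

from pyspark.sql import functions as F

# Minimal Silver sketch: custom_transform receives each table's
# DataFrame and returns the transformed one (see the Functions table below).
class StarWarsSilver(silver.Silver):
    def custom_transform(self, df, table):
        # Illustrative transformation only: upper-case the "name"
        # column contained in the SWAPI payload
        return df.withColumn("name", F.upper(F.col("name")))

silver_options = {
    "catalog": "<catalog>",
    "source_schema": "<bronze schema>",
    "target_schema": "<silver schema>"
}

silver_instance = StarWarsSilver(spark, **silver_options)
silver_instance.load().transform().write(mode="overwrite").execute("people", "planets")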

Daft Set-Up and Get Started

pip install getdaft deltalake lakehouse-ns

That’s already it! A real catalog is not yet supported (coming soon!). The catalog and schema options are still required, though; they are used for path construction and consistency.

Now just import the Bronze, Silver and Gold classes and overwrite the load or transform functions. That’s it.

import requests

import daft
from lakehouse.daft import bronze, silver, gold

options = {
    "catalog": "<catalog>",
    "target_schema": "<schema>"
}


class StarWarsBronze(bronze.Bronze):
    def custom_load(self, table):
        # Page through the SWAPI REST API and collect all results
        results = []
        query = f"https://swapi.tech/api/{table}"
        json_request = requests.get(query).json()
        results.extend(json_request["results"])

        while json_request["next"]:
            json_request = requests.get(json_request["next"]).json()
            results.extend(json_request["results"])
        return daft.from_pylist(results)

    def target_path(self, table: str) -> str:
        # Local target path built from the catalog and schema options
        return f"D:/Data/{self.catalog}/{self.target_schema}/{table}"


bronze_instance = StarWarsBronze(**options)
bronze_instance.load().write(mode="overwrite").execute("people", "planets")

See detailed samples here: https://github.com/datanikkthegreek/lakehouse-docu/tree/main/daft/samples

The samples also include a notebook showing how easy debugging and testing are, even when using this framework.
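If your tables live in object storage rather than on a local disk, Daft reads and writes through an IOConfig, which you can pass via the io_config option (see the Options table below). A minimal sketch assuming S3: IOConfig and S3Config are Daft's own classes, while the bucket and region are placeholders:

import daft
from daft.io import IOConfig, S3Config

# Region is a placeholder; credentials are resolved by Daft/AWS as configured
io_config = IOConfig(s3=S3Config(region_name="<region>"))

options = {
    "catalog": "<catalog>",
    "target_schema": "<schema>",
    "io_config": io_config
}

class StarWarsBronzeS3(StarWarsBronze):
    def target_path(self, table: str) -> str:
        # Build an S3 URI instead of a local Windows path
        return f"s3://<bucket>/{self.catalog}/{self.target_schema}/{table}"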

Options

You can (and for some options, must) pass the following options to the Bronze, Silver, and Gold classes. You can also pass any custom options, which you can access via self.options inside your class.

| Option Type | Option | Type | Engine | Default | Accepted Values | Bronze | Silver | Gold | Description |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| class | catalog | String | spark, daft | To be defined | Any String | Required | Required | Required | The name of the catalog, e.g. spark_catalog, hive_metastore, or any other custom catalog |
| class | source_schema | String | spark, daft | To be defined | Any String | Optional | Required | Required | The schema from which the data is loaded |
| class | target_schema | String | spark, daft | To be defined | Any String | Required | Required | Required | The schema to which the data is written |
| class | io_config | IOConfig | daft | None | Daft IOConfig | Optional | Optional | Optional | IOConfig object from Daft, see https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.IOConfig.html and https://www.getdaft.io/projects/docs/en/stable/integrations/ |
| class | config | Dict | spark, daft | None | Dict with the options below | Options | Options | Options | Optionally pass the stage configs here instead of passing them during execute. Keys are load, transform, write, optimize, and tblproperties |
| class | any | Any | spark, daft | To be defined | Any | Optional | Optional | Optional | Pass any other options to be used within the class |
| load | mode | String | spark, daft | default | default, custom | N/A | Optional | Optional | Loads the table either from source_schema.table (default) or as defined in the custom_load function (custom). Bronze always needs a custom_load function, so its mode is effectively custom |
| load | filter | String | spark, daft | all | all, custom, new | N/A | Optional | Optional | Applies filters directly on the loaded data for predicate pushdown when set to custom; with all, all data is loaded; with new, only new data determined via date_col is loaded. In Bronze, all data is always loaded via the custom_load function |
| load | source_tbl | String | spark, daft | None | None, Any String | N/A | N/A | Optional | Name of the source table. If provided, it is used instead of the target table to load data |
| load | date_col | String | spark, daft | None | None, Any String | N/A | Optional | Optional | Name of the date column used to determine new data. It must exist in the source and target schema and must be defined if filter is new |
| transform | ignore_defaults | boolean | spark, daft | FALSE | TRUE, FALSE | Optional | Optional | Optional | Skips the default transformations defined in the default_transform function. Usually used during debugging |
| transform | transformation_order | list[str] | spark, daft | default funcs in docstring | any function defined by the user, including the defaults | Optional | Optional | Optional | Defines the order of transformations, using function names as values. If not defined, the default order is used |
| transform | tbl_transformations | Dict | spark, daft | {} | Dict[str, str] | Optional | Optional | Optional | Defines custom transformations per table, with the table name as key and the function name as value. For tables not defined here, custom_transform is used |
| transform | rename_columns | Dict | spark, daft | {} | Dict[str, Dict[str, str]] | Optional | Optional | Optional | Defines column renaming per table, with the table name as key and a renaming dict as value (old column name as key, new column name as value) |
| transform | select_columns | Dict | spark, daft | None | Dict[str, list[str]] | Optional | Optional | Optional | Defines column selection per table, with the table name as key and the list of column names as value |
| transform | cast_column_types | Dict | spark, daft | {} | Dict[str, Dict[str, str]] | Optional | Optional | Optional | Defines column casting per table, with the table name as key and a casting dict as value (column name as key, cast type as value) |
| write | mode | String | spark, daft | append | overwrite, append, replace, merge, stream, custom | Optional | Optional | Optional | Defines the write mode: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), stream, or custom (define a custom_write() function) |
| write | overwrite_schema | boolean | daft | FALSE | TRUE, FALSE | Optional | Optional | Optional | Whether the schema should be overwritten; only available for mode overwrite |
| write | merge_schema | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | Whether the schema should be automatically evolved/merged |
| write | external | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | If True, tables are saved as external tables at the path defined in the target_path function |
| optimize | optimize | boolean | spark, daft | FALSE | TRUE, FALSE | Optional | Optional | Optional | Executes the OPTIMIZE command on the given tables |
| optimize | zorder_cols | list[str] | daft | None | any column list | Optional | Optional | Optional | Z-order columns to optimize on, based on frequently used columns; requires optimize=True |
| optimize | optimize_full | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | Executes OPTIMIZE in FULL mode (requires Liquid Clustering) |
| optimize | vacuum | boolean | spark, daft | FALSE | TRUE, FALSE | Optional | Optional | Optional | Executes the VACUUM command on the given tables with the defined retention |
| optimize | vacuum_lite | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | Executes VACUUM in LITE mode |
| optimize | retention | int | spark, daft | None | any int | Optional | Optional | Optional | Retention time in hours for files to be considered by VACUUM. It must be greater than or equal to the retention threshold, which defaults to 7 days (168 hours). If no value is provided, the threshold is used. The threshold can be changed with the Delta property delta.deletedFileRetentionDuration |
| optimize | analyze | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | Executes the ANALYZE command on the given tables to compute statistics. Does not work with all Spark sessions/catalogs; Hive Metastore and Unity Catalog on Databricks work |
| optimize | excl_cols | list[str] | spark | None | any list[str] | Optional | Optional | Optional | List of columns to exclude when computing statistics |
| tblproperties | clusterby | list[str] | spark | None | any list[str] | Optional | Optional | Optional | List of columns to apply as Liquid Clustering columns |
| tblproperties | deletion_vectors | boolean | spark | TRUE | TRUE, FALSE | Optional | Optional | Optional | Enables deletion vectors (delta.enableDeletionVectors), see https://docs.delta.io/latest/delta-deletion-vectors.html |
| tblproperties | auto_compact | boolean | spark | TRUE | TRUE, FALSE | Optional | Optional | Optional | Enables auto compaction (delta.autoOptimize.autoCompact), see https://docs.delta.io/latest/optimizations-oss.html#auto-compaction |
| tblproperties | optimize_write | boolean | spark | TRUE | TRUE, FALSE | Optional | Optional | Optional | Enables optimized writes (delta.autoOptimize.optimizeWrite), see https://docs.delta.io/latest/optimizations-oss.html#optimized-write |
| tblproperties | change_data_feed | boolean | spark | TRUE | TRUE, FALSE | Optional | Optional | Optional | Enables the change data feed (delta.enableChangeDataFeed), see https://docs.delta.io/latest/delta-change-data-feed.html |
| tblproperties | row_tracking | boolean | spark | TRUE | TRUE, FALSE | Optional | Optional | Optional | Enables row tracking (delta.enableRowTracking), see https://docs.delta.io/latest/delta-row-tracking.html |
| tblproperties | type_widening | boolean | spark | FALSE | TRUE, FALSE | Optional | Optional | Optional | Enables type widening (delta.enableTypeWidening), see https://docs.delta.io/latest/delta-type-widening.html |
| tblproperties | tblproperties | dict[str, str] | spark | None | any key-value pairs | Optional | Optional | Optional | Dict of any Delta tblproperties as per https://docs.delta.io/ |
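As a hedged sketch of the config option (the key names follow the table above; the exact nesting of the per-stage settings is an assumption), bundling all stage settings in one place could look like this:

# Hypothetical values; keys follow the Options table above
config = {
    "load": {"mode": "default", "filter": "new", "date_col": "<date column>"},
    "transform": {"rename_columns": {"people": {"name": "person_name"}}},
    "write": {"mode": "append", "merge_schema": True},
    "optimize": {"optimize": True, "vacuum": True, "retention": 168},
    "tblproperties": {"clusterby": ["person_name"]}
}

options = {
    "catalog": "<catalog>",
    "source_schema": "<bronze schema>",
    "target_schema": "<silver schema>",
    "config": config
}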

Functions

The following functions can be overwritten in the Bronze, Silver, and Gold classes:

| Type | Function | Engine | Description | Bronze | Silver & Gold |
| --- | --- | --- | --- | --- | --- |
| load | custom_load(self, table: str) -> DataFrame | spark, daft | Customizes the way the source data is loaded | required | required if load(mode="custom"), else ignored |
| load | custom_filter(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Applies a custom filter after data loading for predicate pushdown | N/A | required if load(filter="custom"), else ignored |
| load | source_path(self, table: str) -> str | daft | Allows you to specify a dynamic path for the source | N/A | required |
| load | target_path(self, table: str) -> str | daft | Allows you to specify a dynamic path for the target | required | required |
| transform | custom_transform(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Can be overwritten to add custom transformations. Only executed if transform() is defined | Optional | Optional |
| transform | rename_columns(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Renames columns based on the defined config. Can be overwritten | Optional | Optional |
| transform | select_columns(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Selects columns based on the defined config. Can be overwritten | Optional | Optional |
| transform | cast_column_types(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Casts column types based on the defined config. Can be overwritten | Optional | Optional |
| transform | default_transform(self, df: DataFrame, table: str) -> DataFrame | spark, daft | Can be overwritten to change the default transformations executed after the custom transformations. By default, it creates a timestamp column with the current transformation timestamp. Only executed if transform(ignore_defaults=False) | Optional | Optional |
| write | target_path(self, table: str) -> str | spark, daft | Allows you to specify a dynamic path when using external tables or Daft | required if write(external=True); always required for daft | required if write(external=True); always required for daft |
| write | get_replace_condition(self, df: DataFrame, table: str) -> str | spark, daft | Defines the filter used for the replaceWhere overwrite operation | required if write(mode="replace") | required if write(mode="replace") |
| write | get_delta_merge_builder(self, df: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder | spark, daft | Defines the merge builder for the merge write into Delta | required if write(mode="merge") | required if write(mode="merge") |
| write | custom_write(self, df: DataFrame, table: str) -> None | spark, daft | Defines a custom write operation | required if write(mode="custom") | required if write(mode="custom") |
| write | checkpoint_path(self, table: str) -> str | daft | Creates the checkpoint path used for a streaming write | required if write(mode="stream") | required if write(mode="stream") |
| optimize | target_path(self, table: str) -> str | daft | Allows you to specify a dynamic path when using external tables or Daft | required | required |
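To make the write hooks concrete, here is a hedged sketch of a merge write with the Spark engine. The hook name and signature come from the table above; whenMatchedUpdateAll() and whenNotMatchedInsertAll() are standard delta-spark DeltaMergeBuilder methods, and the uid join key is only an assumption based on the SWAPI sample data:

class StarWarsSilverMerge(silver.Silver):
    def get_delta_merge_builder(self, df, delta_table):
        # Upsert on uid: update matching rows, insert new ones
        return (
            delta_table.alias("target")
            .merge(df.alias("source"), "target.uid = source.uid")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
        )

# options as in the Get Started examples above
merge_instance = StarWarsSilverMerge(spark, **options)
merge_instance.load().transform().write(mode="merge").execute("people")

get_replace_condition works the same way for mode="replace", except that it returns the replaceWhere filter as a SQL string.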

Lakehouse-NS API Reference