*****************************
Welcome to Lakehouse-NS
*****************************

.. contents:: Table of Contents

Lakehouse-NS gives you a simple framework to implement your lakehouse based on the Medallion Architecture.

- It currently supports Spark and Daft; more engines like Polars are in the backlog.
- It currently supports Delta Lake as the lakehouse format.
- The framework will be extended step by step with more baseline logic.
- The Spark engine was tested locally with pyspark 4.0.0 and delta-spark 4.4.0 and on Databricks Serverless v4. The Daft engine was tested with getdaft 0.4.8 and deltalake 0.25.4.

Some important links:

- Homepage: https://github.com/datanikkthegreek/lakehouse-docu
- Samples: https://github.com/datanikkthegreek/lakehouse-docu/tree/main/samples
- Source: https://github.com/datanikkthegreek/lakehouse
- Issues: https://github.com/datanikkthegreek/lakehouse-docu/issues
- Project Planning: https://github.com/users/datanikkthegreek/projects/1/views/1
- Download: https://pypi.org/project/lakehouse-ns/
- Get in touch: https://www.linkedin.com/in/dr-nikolaos-servos-nikk-the-greek-a29137b3/

*****************************
Spark Set-Up and Get Started
*****************************

Requires one of the following to be installed:

- pyspark and delta-spark
- Databricks Connect
- Spark Connect and Delta Connect
- a default Spark session on Databricks or Fabric

``pip install lakehouse-ns``

You also need a catalog set up, with your bronze, silver and gold schema(s) created. You can of course use the default "spark_catalog", though this is not recommended.

That's already it! Now just import the Bronze, Silver and Gold classes and overwrite the load or transform functions.

.. code-block:: python

    import requests

    from lakehouse.spark import bronze, silver, gold

    spark = ...  # create or obtain your Spark session here

    # Create your schemas
    spark.sql("CREATE SCHEMA IF NOT EXISTS <your_catalog>.<your_schema>")

    options = {
        "catalog": "<your_catalog>",
        "target_schema": "<your_schema>",
    }

    class StarWarsBronze(bronze.Bronze):
        def custom_load(self, table):
            results = []
            query = f"https://swapi.tech/api/{table}"
            json_request = requests.get(query).json()
            results.extend(json_request["results"])
            while json_request["next"]:
                json_request = requests.get(json_request["next"]).json()
                results.extend(json_request["results"])
            return self.spark.createDataFrame(results)

    bronze_instance = StarWarsBronze(spark, **options)
    bronze_instance.load().write(mode="overwrite").execute("people", "planets")

See detailed samples here: https://github.com/datanikkthegreek/lakehouse-docu/tree/main/spark/samples

The samples also include a notebook showing how easy debugging and testing remain despite using this framework.
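Building on the bronze tables above, the silver layer usually only needs a ``custom_transform``; loading from the source schema is the default. The following is a minimal sketch, assuming the chained ``transform()`` step and the ``source_schema`` option behave as described in the Options and Functions sections below; the column logic is purely illustrative.

.. code-block:: python

    import pyspark.sql.functions as F

    from lakehouse.spark import silver

    silver_options = {
        "catalog": "<your_catalog>",
        "source_schema": "<your_bronze_schema>",
        "target_schema": "<your_silver_schema>",
    }

    class StarWarsSilver(silver.Silver):
        def custom_transform(self, df, table):
            # Illustrative clean-up; adjust to the columns your bronze
            # tables actually contain.
            return df.withColumn("name", F.trim(F.col("name")))

    silver_instance = StarWarsSilver(spark, **silver_options)

    # The default load reads source_schema.<table>; transform() applies
    # custom_transform plus the default transformations.
    silver_instance.load().transform().write(mode="overwrite").execute("people", "planets")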
*****************************
Daft Set-Up and Get Started
*****************************

``pip install getdaft deltalake lakehouse-ns``

That's already it! A catalog is not yet supported (coming soon!). A catalog and schema name are still required; they are used for path construction and consistency.

Now just import the Bronze, Silver and Gold classes and overwrite the load or transform functions.

.. code-block:: python

    import requests

    import daft
    from lakehouse.daft import bronze, silver, gold

    options = {
        "catalog": "<your_catalog>",
        "target_schema": "<your_schema>",
    }

    class StarWarsBronze(bronze.Bronze):
        def custom_load(self, table):
            results = []
            query = f"https://swapi.tech/api/{table}"
            json_request = requests.get(query).json()
            results.extend(json_request["results"])
            while json_request["next"]:
                json_request = requests.get(json_request["next"]).json()
                results.extend(json_request["results"])
            return daft.from_pylist(results)

        def target_path(self, table: str) -> str:
            return f"D:/Data/{self.catalog}/{self.target_schema}/{table}"

    bronze_instance = StarWarsBronze(**options)
    bronze_instance.load().write(mode="overwrite").execute("people", "planets")

See detailed samples here: https://github.com/datanikkthegreek/lakehouse-docu/tree/main/daft/samples

The samples also include a notebook showing how easy debugging and testing remain despite using this framework.
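Since the Daft engine writes to plain paths, pointing ``target_path`` at object storage instead of a local disk mostly means supplying credentials. A minimal sketch, assuming the ``io_config`` class option (see the Options section below) is forwarded to Daft's Delta Lake reads and writes; the bucket, region and path layout are placeholders.

.. code-block:: python

    from daft.io import IOConfig, S3Config

    from lakehouse.daft import bronze

    # Hypothetical S3 configuration; fill in your own region and credentials.
    io_config = IOConfig(s3=S3Config(region_name="eu-central-1"))

    options = {
        "catalog": "<your_catalog>",
        "target_schema": "<your_schema>",
        "io_config": io_config,
    }

    class S3Bronze(bronze.Bronze):
        def target_path(self, table: str) -> str:
            # Placeholder bucket; catalog and schema are reused for the layout.
            return f"s3://<your-bucket>/{self.catalog}/{self.target_schema}/{table}"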
*****************************
Options
*****************************

The following options can or must be passed to the Bronze, Silver and Gold classes. In addition, you can specify any custom options, which you can access via ``self.options`` in your class. The option type indicates where an option is passed: class options go to the class constructor, while the other option types are passed at the corresponding step (or bundled via the ``config`` class option). A usage sketch follows the table.

.. list-table::
   :header-rows: 1

   * - Option Type
     - Option
     - Type
     - Engine
     - Default
     - Accepted Values
     - Bronze / Silver / Gold
     - Description
   * - class
     - catalog
     - String
     - spark, daft
     - To be defined
     - Any String
     - Required / Required / Required
     - The name of the catalog, e.g. spark_catalog, hive_metastore or any other custom catalog
   * - class
     - source_schema
     - String
     - spark, daft
     - To be defined
     - Any String
     - Optional / Required / Required
     - The schema from which the data is loaded
   * - class
     - target_schema
     - String
     - spark, daft
     - To be defined
     - Any String
     - Required / Required / Required
     - The schema to which the data is written
   * - class
     - io_config
     - IOConfig
     - daft
     - None
     - Daft IOConfig
     - Optional / Optional / Optional
     - IOConfig object from Daft, see https://www.getdaft.io/projects/docs/en/stable/api_docs/doc_gen/io_configs/daft.io.IOConfig.html and https://www.getdaft.io/projects/docs/en/stable/integrations/
   * - class
     - config
     - Dict
     - spark, daft
     - None
     - Dict with the options below
     - Optional / Optional / Optional
     - Optionally pass the step configs here instead of passing them during execute. Keys are load, transform, write, optimize and tblproperties
   * - class
     - any
     - Any
     - spark, daft
     - To be defined
     - Any
     - Optional / Optional / Optional
     - Pass any other options to use within the class (accessible via self.options)
   * - load
     - mode
     - String
     - spark, daft
     - default
     - default, custom
     - N/A / Optional / Optional
     - Loads the table either from source_schema.table ("default") or as defined in the custom_load function ("custom"). Bronze always needs a custom_load function, so its default is "custom"
   * - load
     - filter
     - String
     - spark, daft
     - all
     - all, custom, new
     - N/A / Optional / Optional
     - Allows applying filters directly on the loaded data for predicate pushdown with "custom"; otherwise "all" data is loaded. In Bronze, all data is always loaded based on the custom_load function
   * - load
     - source_tbl
     - String
     - spark, daft
     - None
     - None, any String
     - N/A / N/A / Optional
     - Name of the source table. If provided, the source table is used instead of the target table to load data
   * - load
     - date_col
     - String
     - spark, daft
     - None
     - None, any String
     - N/A / Optional / Optional
     - Name of the date column used to determine new data. Must exist in the source and target schema and must be defined if filter is "new"
   * - transform
     - ignore_defaults
     - boolean
     - spark, daft
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Skips the default transformations defined in the default_transform function. Usually used during debugging
   * - transform
     - transformation_order
     - list[str]
     - spark, daft
     - default functions as listed in the docstring
     - any function defined by the user, including the defaults
     - Optional / Optional / Optional
     - Allows defining the order of transformations. If not defined, the default order is used
   * - transform
     - tbl_transformations
     - Dict
     - spark, daft
     - {}
     - Dict[str, str]
     - Optional / Optional / Optional
     - Custom transformations per table: the table name is the key, the function name the value. For tables not defined here, custom_transform is used
   * - transform
     - rename_columns
     - Dict
     - spark, daft
     - {}
     - Dict[str, Dict[str, str]]
     - Optional / Optional / Optional
     - Column renaming per table: the table name is the key; the value is a dict with the old column name as key and the new column name as value
   * - transform
     - select_columns
     - Dict
     - spark, daft
     - None
     - Dict[str, list[str]]
     - Optional / Optional / Optional
     - Column selection per table: the table name is the key, the list of column names the value
   * - transform
     - cast_column_types
     - Dict
     - spark, daft
     - {}
     - Dict[str, Dict[str, str]]
     - Optional / Optional / Optional
     - Column casting per table: the table name is the key; the value is a dict with the column name as key and the cast type as value
   * - write
     - mode
     - String
     - spark, daft
     - append
     - overwrite, append, replace, merge, stream, custom
     - Optional / Optional / Optional
     - Write mode: overwrite, append, replace (define the replace filter with get_replace_condition()), merge (define the merge builder with get_delta_merge_builder()), stream, or custom (define a custom_write() function)
   * - write
     - overwrite_schema
     - boolean
     - daft
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Whether the schema should be overwritten; only available for mode overwrite
   * - write
     - merge_schema
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Whether the schema should be automatically evolved/merged
   * - write
     - external
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - If True, tables are saved as external tables based on the path defined in the target_path function
   * - optimize
     - optimize
     - boolean
     - spark, daft
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Executes the optimize command on the given tables
   * - optimize
     - zorder_cols
     - list[str]
     - daft
     - None
     - any column list
     - Optional / Optional / Optional
     - Z-order columns to optimize on, based on frequently used columns; requires optimize=True
   * - optimize
     - optimize_full
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Executes optimize in full mode (needs liquid clustering)
   * - optimize
     - vacuum
     - boolean
     - spark, daft
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Executes the vacuum command on the given tables according to the defined retention
   * - optimize
     - vacuum_lite
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Executes vacuum in lite mode
   * - optimize
     - retention
     - int
     - spark, daft
     - None
     - any int
     - Optional / Optional / Optional
     - Retention time for files to be considered for vacuum. Must be higher than or equal to the default retention threshold of 7 days (168 hours). If no value is provided, the threshold is used. The threshold can be changed with the delta property delta.deletedFileRetentionDuration
   * - optimize
     - analyze
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Executes the analyze command on the given tables to compute statistics. Does not work with all Spark sessions/catalogs; Hive Metastore and Unity Catalog on Databricks work
   * - optimize
     - excl_cols
     - list[str]
     - spark
     - None
     - any list[str]
     - Optional / Optional / Optional
     - List of columns to exclude when computing statistics
   * - tblproperties
     - clusterby
     - list[str]
     - spark
     - None
     - any list[str]
     - Optional / Optional / Optional
     - List of columns to apply as liquid clustering columns
   * - tblproperties
     - deletion_vectors
     - boolean
     - spark
     - TRUE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables deletion vectors (delta.enableDeletionVectors); see https://docs.delta.io/latest/delta-deletion-vectors.html
   * - tblproperties
     - auto_compact
     - boolean
     - spark
     - TRUE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables auto compaction (delta.autoOptimize.autoCompact); see https://docs.delta.io/latest/optimizations-oss.html#auto-compaction
   * - tblproperties
     - optimize_write
     - boolean
     - spark
     - TRUE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables optimized write (delta.autoOptimize.optimizeWrite); see https://docs.delta.io/latest/optimizations-oss.html#optimized-write
   * - tblproperties
     - change_data_feed
     - boolean
     - spark
     - TRUE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables change data feed (delta.enableChangeDataFeed); see https://docs.delta.io/latest/delta-change-data-feed.html
   * - tblproperties
     - row_tracking
     - boolean
     - spark
     - TRUE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables row tracking (delta.enableRowTracking); see https://docs.delta.io/latest/delta-row-tracking.html
   * - tblproperties
     - type_widening
     - boolean
     - spark
     - FALSE
     - TRUE, FALSE
     - Optional / Optional / Optional
     - Enables type widening (delta.enableTypeWidening); see https://docs.delta.io/latest/delta-type-widening.html
   * - tblproperties
     - tblproperties
     - dict[str, str]
     - spark
     - None
     - any key-value pairs
     - Optional / Optional / Optional
     - Dict of any Delta tblproperties as per https://docs.delta.io/
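For example, instead of passing step options at each chained call, they can be bundled up front via the ``config`` class option. A minimal sketch, assuming the keys are resolved as described in the table above; the table and column names are illustrative only.

.. code-block:: python

    from lakehouse.spark import silver

    # Step configs bundled via the "config" class option.
    # Keys follow the table above: load, transform, write, optimize, tblproperties.
    config = {
        "load": {"filter": "new", "date_col": "created_at"},
        "transform": {
            "rename_columns": {"people": {"name": "person_name"}},
            "cast_column_types": {"people": {"height": "int"}},
        },
        "write": {"mode": "append"},
        "optimize": {"optimize": True, "vacuum": True},
    }

    options = {
        "catalog": "<your_catalog>",
        "source_schema": "<your_bronze_schema>",
        "target_schema": "<your_silver_schema>",
        "config": config,
    }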
*****************************
Functions
*****************************

The following functions can be overwritten in the Bronze, Silver and Gold classes; a sketch of the write hooks follows the table.

.. list-table::
   :header-rows: 1

   * - Type
     - Function
     - Engine
     - Description
     - Bronze
     - Silver & Gold
   * - load
     - custom_load(self, table: str) -> DataFrame
     - spark, daft
     - Customizes how, or from which source, the data is loaded
     - required
     - required if load(mode="custom"), else ignored
   * - load
     - custom_filter(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Applies a custom filter after data loading for predicate pushdown
     - N/A
     - required if load(filter="custom"), else ignored
   * - load
     - source_path(self, table: str) -> str
     - daft
     - Allows specifying a dynamic path for the source
     - N/A
     - required
   * - load
     - target_path(self, table: str) -> str
     - daft
     - Allows specifying a dynamic path for the target
     - required
     - required
   * - transform
     - custom_transform(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Can be overwritten to add custom transformations. Only executed if transform() is defined
     - Optional
     - Optional
   * - transform
     - rename_columns(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Renames columns based on the defined config. Can be overwritten
     - Optional
     - Optional
   * - transform
     - select_columns(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Selects columns based on the defined config. Can be overwritten
     - Optional
     - Optional
   * - transform
     - cast_column_types(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Casts column types based on the defined config. Can be overwritten
     - Optional
     - Optional
   * - transform
     - default_transform(self, df: DataFrame, table: str) -> DataFrame
     - spark, daft
     - Can be overwritten to change the default transformations executed after the custom transformations. The default creates a timestamp column with the current timestamp of the transformation. Only executed if transform(ignore_defaults=False)
     - Optional
     - Optional
   * - write
     - target_path(self, table: str) -> str
     - spark, daft
     - Allows specifying a dynamic path when using external tables or Daft
     - required if write(external=True); for Daft always required
     - required if write(external=True); for Daft always required
   * - write
     - get_replace_condition(self, df: DataFrame, table: str) -> str
     - spark, daft
     - Defines the filter used for the replace-where overwrite operation
     - required if write(mode="replace")
     - required if write(mode="replace")
   * - write
     - get_delta_merge_builder(self, df: DataFrame, delta_table: DeltaTable) -> DeltaMergeBuilder
     - spark, daft
     - Defines the merge builder for the merge write into Delta
     - required if write(mode="merge")
     - required if write(mode="merge")
   * - write
     - custom_write(self, df: DataFrame, table: str) -> None
     - spark, daft
     - Allows defining a custom write operation
     - required if write(mode="custom")
     - required if write(mode="custom")
   * - write
     - checkpoint_path(self, table: str) -> str
     - daft
     - Creates the checkpoint path when performing a streaming write; can be overwritten
     - required if write(mode="stream")
     - required if write(mode="stream")
   * - optimize
     - target_path(self, table: str) -> str
     - daft
     - Allows specifying a dynamic path when using external tables or Daft
     - required
     - required
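To illustrate the write hooks above, the sketch below overrides ``get_replace_condition`` and ``get_delta_merge_builder`` for a Spark silver table. It assumes the standard delta-spark merge builder API; the ``id`` and ``load_date`` columns are placeholders for your own schema.

.. code-block:: python

    import pyspark.sql.functions as F

    from lakehouse.spark import silver

    class MergeSilver(silver.Silver):
        def get_replace_condition(self, df, table):
            # Used with write(mode="replace"): only rows matching this
            # predicate are overwritten. "load_date" is illustrative.
            min_date = df.agg(F.min("load_date")).collect()[0][0]
            return f"load_date >= '{min_date}'"

        def get_delta_merge_builder(self, df, delta_table):
            # Used with write(mode="merge"): a plain delta-spark merge
            # builder keyed on an illustrative "id" column.
            return (
                delta_table.alias("target")
                .merge(df.alias("source"), "target.id = source.id")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
            )

    # spark is the session from the Get Started section above.
    merge_instance = MergeSilver(
        spark,
        catalog="<your_catalog>",
        source_schema="<your_bronze_schema>",
        target_schema="<your_silver_schema>",
    )
    merge_instance.load().transform().write(mode="merge").execute("people")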
*****************************
Lakehouse-NS API Reference
*****************************

.. toctree::
   :maxdepth: 2

   lakehouse