Skip to content

API Reference

Auto-generated documentation from source code docstrings.

Configuration

poor_man_lakehouse.config.Settings

Bases: BaseSettings

Settings class for application settings and secrets management.

Docs: https://docs.pydantic.dev/latest/concepts/pydantic_settings/

WAREHOUSE_BUCKET property

Compute warehouse bucket URI from BUCKET_NAME.

SETTINGS_PATH property

Compute settings path from REPO_PATH.

LOG_FILE_PATH property

Compute log file path from LOG_FOLDER and LOG_FILE_NAME.

poor_man_lakehouse.config.get_settings() cached

Generate and get the settings.

Returns:

Type Description
Settings

Configured Settings instance.

Raises:

Type Description
SettingsError

If settings initialization fails.

Source code in src/poor_man_lakehouse/config.py
@cache
def get_settings() -> Settings:
    """Build the application settings once and memoize the result.

    Returns:
        Configured Settings instance.

    Raises:
        SettingsError: If settings initialization fails.
    """
    try:
        loaded = Settings()
        # Logger setup is part of initialization; failures here are fatal too.
        loaded._setup_logger()
    except Exception as e:
        logger.error(f"Error: impossible to get the settings: {e}")
        raise SettingsError(f"Error importing settings: {e}") from e
    return loaded

poor_man_lakehouse.config.reload_settings()

Reload base settings.

Source code in src/poor_man_lakehouse/config.py
def reload_settings() -> Settings:
    """Reload base settings.

    Clears the memoized instance so the next lookup rebuilds Settings
    from the current environment.
    """
    get_settings.cache_clear()
    return get_settings()

Catalog Factory

poor_man_lakehouse.catalog.get_catalog(catalog_type=None)

Create a PyIceberg Catalog for the specified or configured catalog type.

Parameters:

Name Type Description Default
catalog_type LakehouseCatalogType | None

Catalog backend to use. Defaults to settings.CATALOG.

None

Returns:

Type Description
Catalog

A configured PyIceberg Catalog instance.

Raises:

Type Description
ValueError

If the catalog type is not supported.

Source code in src/poor_man_lakehouse/catalog.py
def get_catalog(
    catalog_type: LakehouseCatalogType | None = None,
) -> Catalog:
    """Create a PyIceberg Catalog for the specified or configured catalog type.

    Args:
        catalog_type: Catalog backend to use. Defaults to settings.CATALOG.
            Case-insensitive; the value is lower-cased before validation.

    Returns:
        A configured PyIceberg Catalog instance.

    Raises:
        ValueError: If the catalog type is not supported.
    """
    # Lower-case for consistency with LakehouseConnection.__init__, which
    # normalizes before delegating here; direct callers get the same behavior.
    resolved_type = (catalog_type or settings.CATALOG).lower()
    if resolved_type not in _VALID_CATALOG_TYPES:
        raise ValueError(f"Unsupported catalog type: '{resolved_type}'. Supported: {sorted(_VALID_CATALOG_TYPES)}")

    catalog_name = settings.CATALOG_NAME

    # Dispatch to the backend-specific configuration builder.
    if resolved_type == "glue":
        config = _build_glue_config()
    elif resolved_type == "postgres":
        config = _build_postgres_config()
    else:
        config = _build_rest_config(resolved_type)

    logger.debug(f"Creating PyIceberg catalog '{catalog_name}' (type={resolved_type})")
    return load_catalog(catalog_name, **config)

poor_man_lakehouse.catalog.LakehouseCatalogType = Literal['nessie', 'lakekeeper', 'postgres', 'glue'] module-attribute


Connectors

LakehouseConnection

poor_man_lakehouse.lakehouse.LakehouseConnection

Unified connection manager for Iceberg table access.

Provides catalog browsing, native Polars/Arrow scans, DuckDB engine access, and Ibis multi-engine wrappers. All operations go through a single PyIceberg catalog instance created by get_catalog().

Supports catalogs: lakekeeper, nessie, postgres, glue.

Example

>>> conn = LakehouseConnection()
>>> conn.list_namespaces()
['default', 'staging']
>>> lf = conn.scan_polars(
...     "default",
...     "users",
... )
>>> duck = conn.duckdb_connection

Source code in src/poor_man_lakehouse/lakehouse.py
class LakehouseConnection:
    """Unified connection manager for Iceberg table access.

    Provides catalog browsing, native Polars/Arrow scans, DuckDB engine access,
    and Ibis multi-engine wrappers. All operations go through a single PyIceberg
    catalog instance created by get_catalog().

    Supports catalogs: lakekeeper, nessie, postgres, glue.

    Example:
        >>> conn = LakehouseConnection()
        >>> conn.list_namespaces()
        ['default', 'staging']
        >>> lf = conn.scan_polars(
        ...     "default",
        ...     "users",
        ... )
        >>> duck = conn.duckdb_connection
    """

    def __init__(self, catalog_type: LakehouseCatalogType | None = None) -> None:
        """Initialize the connection.

        Args:
            catalog_type: Catalog backend to use. Defaults to settings.CATALOG.
        """
        # Lower-case so mixed-case settings values still match the lowercase
        # literals that get_catalog() validates against.
        self._catalog_type = (catalog_type or settings.CATALOG).lower()
        self.catalog = get_catalog(self._catalog_type)  # type: ignore[arg-type]
        logger.debug(f"LakehouseConnection initialized (catalog_type={self._catalog_type})")

    # -- Catalog browsing --

    def list_namespaces(self) -> list[str]:
        """List all namespaces in the catalog."""
        # Namespaces come back as tuples; multi-level ones are joined with dots.
        raw = self.catalog.list_namespaces()
        return [ns[0] if len(ns) == 1 else ".".join(ns) for ns in raw]

    def create_namespace(self, namespace: str) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: The namespace name to create.
        """
        self.catalog.create_namespace(namespace)
        logger.info(f"Created namespace '{namespace}'")

    def list_tables(self, namespace: str) -> list[str]:
        """List all tables in a namespace.

        Args:
            namespace: The namespace to list tables from.

        Returns:
            List of table names.
        """
        # Each identifier is a tuple; index 1 holds the table name.
        raw = self.catalog.list_tables(namespace)
        return [tbl[1] for tbl in raw]

    def load_table(self, namespace: str, table_name: str) -> Table:
        """Load an Iceberg table object.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            PyIceberg Table object with full metadata access.
        """
        # The catalog expects the dotted identifier "<namespace>.<table_name>".
        return self.catalog.load_table(f"{namespace}.{table_name}")

    def table_schema(self, namespace: str, table_name: str) -> list[dict]:
        """Get the schema of an Iceberg table.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            List of dicts with field_id, name, type, and required for each column.
        """
        table = self.load_table(namespace, table_name)
        return [
            {
                "field_id": field.field_id,
                "name": field.name,
                "type": str(field.field_type),
                "required": field.required,
            }
            for field in table.schema().fields
        ]

    def snapshot_history(self, namespace: str, table_name: str) -> list[dict]:
        """Get the snapshot history of a table.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            List of snapshot dicts with snapshot_id, timestamp_ms, and summary.
        """
        table = self.load_table(namespace, table_name)
        return [
            {
                "snapshot_id": snap.snapshot_id,
                "timestamp_ms": snap.timestamp_ms,
                # summary may be absent; fall back to an empty dict.
                "summary": snap.summary.model_dump() if snap.summary else {},
            }
            for snap in (table.metadata.snapshots or [])
        ]

    # -- Native scans --

    def scan_polars(self, namespace: str, table_name: str) -> pl.LazyFrame:
        """Scan an Iceberg table and return a Polars LazyFrame.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            Polars LazyFrame for lazy evaluation.
        """
        table = self.load_table(namespace, table_name)
        return pl.scan_iceberg(table)

    def scan_arrow(self, namespace: str, table_name: str) -> pa.Table:
        """Scan an Iceberg table and return a PyArrow Table.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            PyArrow Table.
        """
        table = self.load_table(namespace, table_name)
        # No filter or column selection is passed to scan(), so this reads
        # the full table into an Arrow table.
        return table.scan().to_arrow()

    # -- DuckDB engine --

    @cached_property
    def duckdb_connection(self) -> DuckDBBackend:
        """Lazily initialize DuckDB Ibis connection with Iceberg catalog attached."""
        if self._catalog_type == "glue":
            return self._init_duckdb_glue()
        return self._init_duckdb_s3()

    def _init_duckdb_s3(self) -> DuckDBBackend:
        """Initialize DuckDB with S3/MinIO access and REST catalog."""
        import ibis

        logger.debug(f"Initializing DuckDB connection ({self._catalog_type} catalog)...")
        con = ibis.duckdb.connect(database=":memory:", read_only=False, extensions=["iceberg"])

        # DuckDB's S3 secret wants a bare host[:port]; strip the scheme and
        # infer SSL from whether the configured endpoint used https.
        endpoint = settings.AWS_ENDPOINT_URL.replace("https://", "").replace("http://", "")
        use_ssl = "true" if settings.AWS_ENDPOINT_URL.startswith("https://") else "false"
        # Credentials come from trusted application settings and are inlined
        # into the secret definition.
        con.raw_sql(f"""
            CREATE OR REPLACE SECRET s3_secret (
                TYPE S3,
                KEY_ID '{settings.AWS_ACCESS_KEY_ID}',
                SECRET '{settings.AWS_SECRET_ACCESS_KEY}',
                REGION '{settings.AWS_DEFAULT_REGION}',
                ENDPOINT '{endpoint}',
                URL_STYLE 'path',
                USE_SSL {use_ssl}
            );
        """)

        catalog_name = settings.CATALOG_NAME
        # Only the REST-style catalogs get an ATTACH; for "postgres" the
        # connection is returned with just S3 access configured.
        if self._catalog_type in ("lakekeeper", "nessie"):
            uri_map: dict[str, str] = {
                "lakekeeper": settings.LAKEKEEPER_SERVER_URI,
                "nessie": settings.NESSIE_REST_URI,
            }
            con.raw_sql(f"""
                ATTACH OR REPLACE '{settings.BUCKET_NAME}' AS {catalog_name} (
                    TYPE iceberg,
                    ENDPOINT '{uri_map[self._catalog_type]}',
                    TOKEN ''
                );
            """)

        logger.debug(f"DuckDB initialized ({self._catalog_type} catalog)")
        return con

    def _init_duckdb_glue(self) -> DuckDBBackend:
        """Initialize DuckDB with AWS Glue Catalog."""
        import ibis

        logger.debug("Initializing DuckDB connection with Glue catalog...")
        con = ibis.duckdb.connect(database=":memory:", read_only=False, extensions=["iceberg"])

        # Glue auth relies on the ambient AWS credential chain rather than
        # explicit keys.
        con.raw_sql(f"""
            CREATE OR REPLACE SECRET s3_secret (
                TYPE S3,
                PROVIDER credential_chain,
                REGION '{settings.AWS_DEFAULT_REGION}'
            );
        """)

        catalog_name = settings.CATALOG_NAME
        # CATALOG_ID is optional; only emit the clause when configured.
        glue_catalog_id_clause = ""
        if settings.GLUE_CATALOG_ID:
            glue_catalog_id_clause = f",\n                CATALOG_ID '{settings.GLUE_CATALOG_ID}'"
        con.raw_sql(f"""
            ATTACH OR REPLACE '{settings.BUCKET_NAME}' AS {catalog_name} (
                TYPE iceberg,
                CATALOG_TYPE glue,
                REGION '{settings.AWS_DEFAULT_REGION}'{glue_catalog_id_clause}
            );
        """)

        logger.debug(f"DuckDB attached to Glue catalog as '{catalog_name}'")
        return con

    # -- Ibis engine access --

    def ibis_duckdb(self) -> DuckDBBackend:
        """Get the DuckDB Ibis backend with catalog attached.

        Returns:
            DuckDB Ibis backend connection.
        """
        # Thin accessor over the lazily created, cached backend.
        return self.duckdb_connection

    def ibis_polars(self, namespace: str, table_name: str) -> PolarsBackend:
        """Get a Polars Ibis backend with a table registered.

        Args:
            namespace: The namespace containing the table.
            table_name: The table name.

        Returns:
            Polars Ibis backend with the table registered.
        """
        import ibis

        lazyframe = self.scan_polars(namespace, table_name)
        # A fresh backend is created per call; the table is registered under
        # the dotted name "<namespace>.<table_name>".
        con = ibis.polars.connect()
        con.create_table(f"{namespace}.{table_name}", lazyframe, overwrite=True)
        return con

    def ibis_pyspark(self) -> PySparkBackend:
        """Get the PySpark Ibis backend.

        Returns:
            PySpark Ibis backend connection.
        """
        import ibis

        from poor_man_lakehouse.spark_connector.builder import retrieve_current_spark_session

        logger.info("Initializing PySpark Ibis connection...")
        return ibis.pyspark.connect(session=retrieve_current_spark_session())

    # -- SQL & write operations --

    def sql(self, query: str, engine: SQLEngine = "duckdb") -> ir.Table:
        """Execute a SQL query using the specified engine.

        Args:
            query: The SQL query string.
            engine: The engine to use ("duckdb" or "pyspark").

        Returns:
            Ibis table expression with query results.

        Raises:
            ValueError: If engine is not supported for SQL.
        """
        if engine not in _SQL_ENGINES:
            raise ValueError(f"SQL execution only supports {_SQL_ENGINES}, got: '{engine}'")

        if engine == "duckdb":
            return self.duckdb_connection.sql(query)

        return self.ibis_pyspark().sql(query)

    def write_table(
        self,
        namespace: str,
        table_name: str,
        *,
        data: ir.Table | None = None,
        query: str | None = None,
        mode: WriteMode = "append",
    ) -> None:
        """Write data to an Iceberg table via DuckDB.

        Args:
            namespace: The namespace name.
            table_name: The table name.
            data: Ibis table expression to write. Mutually exclusive with query.
            query: SQL query whose results to write. Mutually exclusive with data.
            mode: Write mode — "append" or "overwrite".

        Raises:
            ValueError: If mode is invalid or neither data nor query is provided.
        """
        if mode not in _WRITE_MODES:
            raise ValueError(f"Unsupported write mode: '{mode}'. Supported: {_WRITE_MODES}")
        if data is None and query is None:
            raise ValueError("Either 'data' or 'query' must be provided")

        # NOTE(review): catalog/namespace/table names are interpolated into raw
        # SQL without quoting or escaping — callers must pass trusted identifiers.
        catalog_name = settings.CATALOG_NAME
        fqn = f"{catalog_name}.{namespace}.{table_name}"
        con = self.duckdb_connection

        con.raw_sql(f"USE {catalog_name}.{namespace};")

        # NOTE(review): overwrite is DELETE-then-INSERT, not atomic; a failure
        # between the two statements can leave the table emptied.
        if mode == "overwrite":
            con.raw_sql(f"DELETE FROM {fqn} WHERE true")  # noqa: S608

        if query is not None:
            con.raw_sql(f"INSERT INTO {fqn} {query}")  # noqa: S608
        elif data is not None:
            # NOTE(review): the staging view is not dropped if the INSERT raises.
            con.raw_sql(f"CREATE OR REPLACE TEMP VIEW _write_staging AS {data.compile()}")  # noqa: S608
            con.raw_sql(f"INSERT INTO {fqn} SELECT * FROM _write_staging")  # noqa: S608
            con.raw_sql("DROP VIEW IF EXISTS _write_staging")

        logger.info(f"Wrote to {fqn} (mode={mode}) via DuckDB")

    def create_table(self, namespace: str, table_name: str, schema_sql: str) -> None:
        """Create an Iceberg table via DuckDB.

        Args:
            namespace: The namespace name.
            table_name: The table name.
            schema_sql: Column definitions, e.g. "id INTEGER, name VARCHAR".
        """
        # NOTE(review): identifiers and schema_sql are interpolated into raw
        # SQL without quoting — pass trusted values only.
        catalog_name = settings.CATALOG_NAME
        fqn = f"{catalog_name}.{namespace}.{table_name}"
        self.duckdb_connection.raw_sql(f"CREATE TABLE IF NOT EXISTS {fqn} ({schema_sql})")  # noqa: S608
        logger.info(f"Created table {fqn}")

    # -- Lifecycle --

    def close(self) -> None:
        """Close all active connections and clear cached properties."""
        # Dropping the cached_property value forces a rebuild on next access;
        # no explicit disconnect is issued on the underlying DuckDB connection.
        for prop in ("duckdb_connection",):
            if prop in self.__dict__:
                del self.__dict__[prop]
        logger.debug("LakehouseConnection closed")

    def __enter__(self) -> Self:
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
        """Exit context manager and close connections."""
        self.close()

    def __repr__(self) -> str:
        """String representation."""
        return f"LakehouseConnection(catalog_type='{self._catalog_type}')"

duckdb_connection cached property

Lazily initialize DuckDB Ibis connection with Iceberg catalog attached.

__init__(catalog_type=None)

Initialize the connection.

Parameters:

Name Type Description Default
catalog_type LakehouseCatalogType | None

Catalog backend to use. Defaults to settings.CATALOG.

None
Source code in src/poor_man_lakehouse/lakehouse.py
def __init__(self, catalog_type: LakehouseCatalogType | None = None) -> None:
    """Initialize the connection.

    Args:
        catalog_type: Catalog backend to use. Defaults to settings.CATALOG.
    """
    # Lower-case so mixed-case settings values still match the lowercase
    # literals that get_catalog() validates against.
    self._catalog_type = (catalog_type or settings.CATALOG).lower()
    self.catalog = get_catalog(self._catalog_type)  # type: ignore[arg-type]
    logger.debug(f"LakehouseConnection initialized (catalog_type={self._catalog_type})")

list_namespaces()

List all namespaces in the catalog.

Source code in src/poor_man_lakehouse/lakehouse.py
def list_namespaces(self) -> list[str]:
    """Return every namespace known to the catalog.

    Namespace identifiers arrive as tuples; multi-level ones are joined
    with dots, single-level ones collapse to their only element.
    """
    names: list[str] = []
    for parts in self.catalog.list_namespaces():
        names.append(parts[0] if len(parts) == 1 else ".".join(parts))
    return names

list_tables(namespace)

List all tables in a namespace.

Parameters:

Name Type Description Default
namespace str

The namespace to list tables from.

required

Returns:

Type Description
list[str]

List of table names.

Source code in src/poor_man_lakehouse/lakehouse.py
def list_tables(self, namespace: str) -> list[str]:
    """List all tables in a namespace.

    Args:
        namespace: The namespace to list tables from.

    Returns:
        List of table names (the second element of each identifier tuple).
    """
    return [identifier[1] for identifier in self.catalog.list_tables(namespace)]

load_table(namespace, table_name)

Load an Iceberg table object.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
Table

PyIceberg Table object with full metadata access.

Source code in src/poor_man_lakehouse/lakehouse.py
def load_table(self, namespace: str, table_name: str) -> Table:
    """Load an Iceberg table object.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        PyIceberg Table object with full metadata access.
    """
    # The catalog expects the dotted identifier "<namespace>.<table_name>".
    return self.catalog.load_table(f"{namespace}.{table_name}")

table_schema(namespace, table_name)

Get the schema of an Iceberg table.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
list[dict]

List of dicts with field_id, name, type, and required for each column.

Source code in src/poor_man_lakehouse/lakehouse.py
def table_schema(self, namespace: str, table_name: str) -> list[dict]:
    """Get the schema of an Iceberg table.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        List of dicts with field_id, name, type, and required for each column.
    """
    fields = self.load_table(namespace, table_name).schema().fields
    described = []
    for column in fields:
        described.append(
            {
                "field_id": column.field_id,
                "name": column.name,
                "type": str(column.field_type),
                "required": column.required,
            }
        )
    return described

snapshot_history(namespace, table_name)

Get the snapshot history of a table.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
list[dict]

List of snapshot dicts with snapshot_id, timestamp_ms, and summary.

Source code in src/poor_man_lakehouse/lakehouse.py
def snapshot_history(self, namespace: str, table_name: str) -> list[dict]:
    """Get the snapshot history of a table.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        List of snapshot dicts with snapshot_id, timestamp_ms, and summary.
    """
    snapshots = self.load_table(namespace, table_name).metadata.snapshots or []
    history = []
    for snapshot in snapshots:
        # A missing summary becomes an empty dict.
        summary = snapshot.summary.model_dump() if snapshot.summary else {}
        history.append(
            {
                "snapshot_id": snapshot.snapshot_id,
                "timestamp_ms": snapshot.timestamp_ms,
                "summary": summary,
            }
        )
    return history

scan_polars(namespace, table_name)

Scan an Iceberg table and return a Polars LazyFrame.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
LazyFrame

Polars LazyFrame for lazy evaluation.

Source code in src/poor_man_lakehouse/lakehouse.py
def scan_polars(self, namespace: str, table_name: str) -> pl.LazyFrame:
    """Scan an Iceberg table and return a Polars LazyFrame.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        Polars LazyFrame for lazy evaluation.
    """
    # Delegates to Polars' native Iceberg scan; nothing is read eagerly here.
    table = self.load_table(namespace, table_name)
    return pl.scan_iceberg(table)

scan_arrow(namespace, table_name)

Scan an Iceberg table and return a PyArrow Table.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
Table

PyArrow Table.

Source code in src/poor_man_lakehouse/lakehouse.py
def scan_arrow(self, namespace: str, table_name: str) -> pa.Table:
    """Scan an Iceberg table and return a PyArrow Table.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        PyArrow Table.
    """
    table = self.load_table(namespace, table_name)
    # No filter or column selection is passed to scan(), so this reads the
    # full table into memory.
    return table.scan().to_arrow()

ibis_duckdb()

Get the DuckDB Ibis backend with catalog attached.

Returns:

Type Description
Backend

DuckDB Ibis backend connection.

Source code in src/poor_man_lakehouse/lakehouse.py
def ibis_duckdb(self) -> DuckDBBackend:
    """Get the DuckDB Ibis backend with catalog attached.

    Returns:
        DuckDB Ibis backend connection.
    """
    # Thin accessor over the lazily initialized, cached duckdb_connection.
    return self.duckdb_connection

ibis_polars(namespace, table_name)

Get a Polars Ibis backend with a table registered.

Parameters:

Name Type Description Default
namespace str

The namespace containing the table.

required
table_name str

The table name.

required

Returns:

Type Description
Backend

Polars Ibis backend with the table registered.

Source code in src/poor_man_lakehouse/lakehouse.py
def ibis_polars(self, namespace: str, table_name: str) -> PolarsBackend:
    """Get a Polars Ibis backend with a table registered.

    Args:
        namespace: The namespace containing the table.
        table_name: The table name.

    Returns:
        Polars Ibis backend with the table registered.
    """
    import ibis

    lazyframe = self.scan_polars(namespace, table_name)
    # A fresh in-memory backend is created per call; the table is registered
    # under the dotted name "<namespace>.<table_name>".
    con = ibis.polars.connect()
    con.create_table(f"{namespace}.{table_name}", lazyframe, overwrite=True)
    return con

ibis_pyspark()

Get the PySpark Ibis backend.

Returns:

Type Description
Backend

PySpark Ibis backend connection.

Source code in src/poor_man_lakehouse/lakehouse.py
def ibis_pyspark(self) -> PySparkBackend:
    """Get the PySpark Ibis backend.

    Returns:
        PySpark Ibis backend connection.
    """
    # Imports are deferred to call time — presumably so pyspark is only
    # pulled in when this engine is actually used.
    import ibis

    from poor_man_lakehouse.spark_connector.builder import retrieve_current_spark_session

    logger.info("Initializing PySpark Ibis connection...")
    return ibis.pyspark.connect(session=retrieve_current_spark_session())

sql(query, engine='duckdb')

Execute a SQL query using the specified engine.

Parameters:

Name Type Description Default
query str

The SQL query string.

required
engine SQLEngine

The engine to use ("duckdb" or "pyspark").

'duckdb'

Returns:

Type Description
Table

Ibis table expression with query results.

Raises:

Type Description
ValueError

If engine is not supported for SQL.

Source code in src/poor_man_lakehouse/lakehouse.py
def sql(self, query: str, engine: SQLEngine = "duckdb") -> ir.Table:
    """Execute a SQL query using the specified engine.

    Args:
        query: The SQL query string.
        engine: The engine to use ("duckdb" or "pyspark").

    Returns:
        Ibis table expression with query results.

    Raises:
        ValueError: If engine is not supported for SQL.
    """
    # Validate before dispatching; only the engines in _SQL_ENGINES run SQL.
    if engine not in _SQL_ENGINES:
        raise ValueError(f"SQL execution only supports {_SQL_ENGINES}, got: '{engine}'")

    if engine == "duckdb":
        return self.duckdb_connection.sql(query)

    # Any other validated engine falls through to PySpark.
    return self.ibis_pyspark().sql(query)

write_table(namespace, table_name, *, data=None, query=None, mode='append')

Write data to an Iceberg table via DuckDB.

Parameters:

Name Type Description Default
namespace str

The namespace name.

required
table_name str

The table name.

required
data Table | None

Ibis table expression to write. Mutually exclusive with query.

None
query str | None

SQL query whose results to write. Mutually exclusive with data.

None
mode WriteMode

Write mode — "append" or "overwrite".

'append'

Raises:

Type Description
ValueError

If mode is invalid or neither data nor query is provided.

Source code in src/poor_man_lakehouse/lakehouse.py
def write_table(
    self,
    namespace: str,
    table_name: str,
    *,
    data: ir.Table | None = None,
    query: str | None = None,
    mode: WriteMode = "append",
) -> None:
    """Write data to an Iceberg table via DuckDB.

    Args:
        namespace: The namespace name.
        table_name: The table name.
        data: Ibis table expression to write. Mutually exclusive with query.
        query: SQL query whose results to write. Mutually exclusive with data.
        mode: Write mode — "append" or "overwrite".

    Raises:
        ValueError: If mode is invalid or neither data nor query is provided.
    """
    if mode not in _WRITE_MODES:
        raise ValueError(f"Unsupported write mode: '{mode}'. Supported: {_WRITE_MODES}")
    if data is None and query is None:
        raise ValueError("Either 'data' or 'query' must be provided")

    # NOTE(review): catalog/namespace/table names are interpolated into raw
    # SQL without quoting or escaping — callers must pass trusted identifiers.
    catalog_name = settings.CATALOG_NAME
    fqn = f"{catalog_name}.{namespace}.{table_name}"
    con = self.duckdb_connection

    con.raw_sql(f"USE {catalog_name}.{namespace};")

    # NOTE(review): overwrite is DELETE-then-INSERT, not atomic; a failure
    # between the two statements can leave the table emptied.
    if mode == "overwrite":
        con.raw_sql(f"DELETE FROM {fqn} WHERE true")  # noqa: S608

    if query is not None:
        con.raw_sql(f"INSERT INTO {fqn} {query}")  # noqa: S608
    elif data is not None:
        # NOTE(review): the staging view is not dropped if the INSERT raises.
        con.raw_sql(f"CREATE OR REPLACE TEMP VIEW _write_staging AS {data.compile()}")  # noqa: S608
        con.raw_sql(f"INSERT INTO {fqn} SELECT * FROM _write_staging")  # noqa: S608
        con.raw_sql("DROP VIEW IF EXISTS _write_staging")

    logger.info(f"Wrote to {fqn} (mode={mode}) via DuckDB")

create_table(namespace, table_name, schema_sql)

Create an Iceberg table via DuckDB.

Parameters:

Name Type Description Default
namespace str

The namespace name.

required
table_name str

The table name.

required
schema_sql str

Column definitions, e.g. "id INTEGER, name VARCHAR".

required
Source code in src/poor_man_lakehouse/lakehouse.py
def create_table(self, namespace: str, table_name: str, schema_sql: str) -> None:
    """Create an Iceberg table via DuckDB.

    Args:
        namespace: The namespace name.
        table_name: The table name.
        schema_sql: Column definitions, e.g. "id INTEGER, name VARCHAR".
    """
    # NOTE(review): identifiers and schema_sql are interpolated into raw SQL
    # without quoting — pass trusted values only.
    catalog_name = settings.CATALOG_NAME
    fqn = f"{catalog_name}.{namespace}.{table_name}"
    self.duckdb_connection.raw_sql(f"CREATE TABLE IF NOT EXISTS {fqn} ({schema_sql})")  # noqa: S608
    logger.info(f"Created table {fqn}")

close()

Close all active connections and clear cached properties.

Source code in src/poor_man_lakehouse/lakehouse.py
def close(self) -> None:
    """Close all active connections and clear cached properties."""
    # Dropping the cached value forces a rebuild on next access; no explicit
    # disconnect is issued on the underlying DuckDB connection.
    for prop in ("duckdb_connection",):
        if prop in self.__dict__:
            del self.__dict__[prop]
    logger.debug("LakehouseConnection closed")

Spark

SparkBuilder

poor_man_lakehouse.spark_connector.builder.SparkBuilder

Bases: ABC

Abstract base class for Spark session builders.

Provides common configuration for S3/Minio access and shared packages. Subclasses only need to implement catalog-specific configuration.

catalog_name property

Return the catalog name for this builder.

Default implementation uses settings.CATALOG_NAME. Subclasses can override for custom catalog names.

get_spark_session()

Build and return a configured Spark session with Iceberg and Delta support.

Returns:

Type Description
SparkSession

A configured SparkSession instance with both Iceberg catalog access
and Delta Lake path-based access enabled.

poor_man_lakehouse.spark_connector.builder.CatalogType

Bases: str, Enum

Supported catalog types.

Source code in src/poor_man_lakehouse/spark_connector/builder.py
class CatalogType(str, Enum):
    """Supported catalog types.

    Inherits from str so members compare equal to their plain string
    values (e.g. CatalogType.GLUE == "glue").
    """

    POSTGRES = "postgres"
    NESSIE = "nessie"
    LAKEKEEPER = "lakekeeper"
    GLUE = "glue"

poor_man_lakehouse.spark_connector.builder.get_spark_builder(catalog_type)

Get the appropriate Spark builder for the given catalog type.

Parameters:

Name Type Description Default
catalog_type CatalogType | str

The catalog type (enum or string).

required

Returns:

Type Description
SparkBuilder

An instance of the appropriate SparkBuilder subclass.

Raises:

Type Description
ValueError

If the catalog type is not supported.

Source code in src/poor_man_lakehouse/spark_connector/builder.py
def get_spark_builder(catalog_type: CatalogType | str) -> SparkBuilder:
    """Get the appropriate Spark builder for the given catalog type.

    Args:
        catalog_type: The catalog type (enum or string).

    Returns:
        An instance of the appropriate SparkBuilder subclass.

    Raises:
        ValueError: If the catalog type is not supported.
    """
    # Coerce a plain string into the enum before looking up the builder.
    if isinstance(catalog_type, str):
        try:
            catalog_type = CatalogType(catalog_type)
        except ValueError as e:
            raise ValueError(
                f"Unsupported catalog: {catalog_type}. Supported: {[c.value for c in CatalogType]}"
            ) from e

    builder_cls = _CATALOG_BUILDERS.get(catalog_type)
    # Defensive: the registry may lack an entry for a valid enum member.
    if builder_cls is None:
        raise ValueError(f"Unsupported catalog: {catalog_type}. Supported: {[c.value for c in CatalogType]}")

    return builder_cls()

poor_man_lakehouse.spark_connector.builder.retrieve_current_spark_session()

Retrieve a Spark session configured for the current catalog setting.

Uses the CATALOG setting from configuration to determine which catalog implementation to use.

Returns:

Type Description
SparkSession

A configured SparkSession instance.

Raises:

Type Description
ValueError

If the configured catalog is not supported.

Source code in src/poor_man_lakehouse/spark_connector/builder.py
def retrieve_current_spark_session() -> SparkSession:
    """Retrieve a Spark session configured for the current catalog setting.

    Uses the CATALOG setting from configuration to determine which
    catalog implementation to use.

    Returns:
        A configured SparkSession instance.

    Raises:
        ValueError: If the configured catalog is not supported.
    """
    logger.debug(f"Setting up Spark session with catalog: {settings.CATALOG}")
    return get_spark_builder(settings.CATALOG).get_spark_session()