Pyiceberg

to learn pyiceberg. we need to have catalog and object storage. in this article, I will use postgres for catalog and minio for s3. Those tools will be setup using docker-compose

docker-compose.yml
version: "3.9"
services:
  postgres:
    image: postgres
    container_name: pg_catalog
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mycatalog
    restart: unless-stopped
    ports:
      - 5432:5432

  minio:
    image: quay.io/minio/minio:latest
    command: server /data --console-address ":9001"
    container_name: minio_s3
    # volumes:
    #   - ./data:/data
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    restart: unless-stopped
    ports:
      - 9000:9000
      - 9001:9001

Create Access and Secret key on minio website http://localhost:9001 and replace in .env file

.env
S3_ACCESS_KEY=<Replace Access Key>
S3_SECRET_KEY=<Replace Secret Key>
PG_URL=postgresql+psycopg2://admin:password@localhost:5432/mycatalog

Code

Connect S3

import os
from minio import Minio
from minio.error import S3Error
from dotenv import load_dotenv
load_dotenv(override=True)

client = Minio(
    endpoint="localhost:9000",
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False
)

bucket_name = "pyiceberg"
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)

Connect Catalog & S3

import os
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY")
    }
)

Create Namespace

# init catalog
catalog.create_tables()

# create name space if not exists
namespace = "docs_example"
if not catalog._namespace_exists(namespace):
    catalog.create_namespace(namespace)
    
catalog.list_namespaces()
catalog.list_tables("docs_example")

Create Schema & Partition & Table

from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
    IntegerType
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=4, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)

Load Table

Catalog

table = catalog.load_table("docs_example.bids")
df = table.scan().to_pandas()
df

direct metadata file

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)

Schema Evolution (Update table)

with table.update_schema() as update:
    update.add_column("volume", IntegerType(), "doc")

update_schema: NotImplementedError

Reference

Last updated