To learn PyIceberg, we need two things: a catalog and object storage. In this article, I will use Postgres as the catalog and MinIO as the S3-compatible object store. Both will be set up with Docker Compose:
version: "3.9"
services:
  postgres:
    image: postgres
    container_name: pg_catalog
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mycatalog
    restart: unless-stopped
    ports:
      - 5432:5432

  minio:
    image: quay.io/minio/minio:latest
    command: server /data --console-address ":9001"
    container_name: minio_s3
    # volumes:
    #   - ./data:/data
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    restart: unless-stopped
    ports:
      - 9000:9000
      - 9001:9001
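Start both containers with docker compose up -d. The MinIO console is then available at http://localhost:9001 (log in with admin / password), where you can create an access key under Access Keys. Put those credentials, together with the Postgres connection string, in a .env file: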
S3_ACCESS_KEY=<Replace Access Key>
S3_SECRET_KEY=<Replace Secret Key>
PG_URL=postgresql+psycopg2://admin:password@localhost:5432/mycatalog
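With the credentials in place, create the bucket that will hold the Iceberg data and metadata files: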
import os
from minio import Minio
from dotenv import load_dotenv

load_dotenv(override=True)

# Connect to the local MinIO container over plain HTTP
client = Minio(
    endpoint="localhost:9000",
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False,
)

# Create the bucket once; bucket_exists avoids an error on re-runs
bucket_name = "pyiceberg"
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)
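Next, configure PyIceberg. The uri property points at the Postgres catalog, while the s3.* properties point the PyArrow file IO at MinIO: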
import os
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY"),
    }
)
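Because the URI uses the postgresql+psycopg2 scheme, load_catalog infers a SQL catalog backed by Postgres. Before first use, the catalog's backing tables and our namespace have to be created: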
# Create the catalog's backing tables in Postgres (SqlCatalog only)
catalog.create_tables()

# Create the namespace if it does not exist yet;
# _namespace_exists is a private SqlCatalog helper
namespace = "docs_example"
if not catalog._namespace_exists(namespace):
    catalog.create_namespace(namespace)

catalog.list_namespaces()
catalog.list_tables("docs_example")
Create Schema, Partition & Table
from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
    IntegerType,
)

# Field IDs must be unique across the schema, including nested fields
schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=6, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)
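The table is partitioned by day on the datetime column: source_id=1 refers to that column's field ID in the schema, and DayTransform buckets rows by calendar day: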
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)
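Finally, define a sort order on the symbol column (source_id=2) and create the table in the docs_example namespace, stored in the pyiceberg bucket: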
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)
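With the table registered, we can append rows. The sketch below uses PyArrow with made-up values, and assumes a PyIceberg release recent enough to write to partitioned tables (older versions reject such writes). The Arrow schema is built explicitly so that column types (float32 for bid) and nullability match the Iceberg schema:

import pyarrow as pa
from datetime import datetime

# Required Iceberg fields must map to non-nullable Arrow fields
arrow_schema = pa.schema(
    [
        pa.field("datetime", pa.timestamp("us"), nullable=False),
        pa.field("symbol", pa.string(), nullable=False),
        pa.field("bid", pa.float32()),
        pa.field("ask", pa.float64()),
        pa.field("details", pa.struct([pa.field("created_by", pa.string())])),
    ]
)

rows = pa.Table.from_pylist(
    [
        {
            "datetime": datetime(2024, 1, 1, 9, 30),
            "symbol": "AAPL",
            "bid": 185.20,
            "ask": 185.25,
            "details": {"created_by": "etl-job"},
        },
    ],
    schema=arrow_schema,
)

table = catalog.load_table("docs_example.bids")
table.append(rows)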
# Read the table back into a pandas DataFrame (full scan)
table = catalog.load_table("docs_example.bids")
df = table.scan().to_pandas()
df
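Scans can also push down a row filter and a column projection, so only matching data files are read. A small sketch with illustrative filter values:

# Prune columns and rows at scan time using the table metadata
df_aapl = table.scan(
    row_filter="symbol == 'AAPL'",
    selected_fields=("datetime", "symbol", "bid"),
).to_pandas()

PyIceberg can also open a table straight from a metadata file, without any catalog. The path below is the example from the PyIceberg documentation; substitute a metadata file from your own warehouse: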
from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
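A StaticTable is read-only: it reflects the table as of that one metadata file and cannot be written to.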
Schema Evolution (Update table)
# add_column takes the column name, its type, and an optional doc string
with table.update_schema() as update:
    update.add_column("volume", IntegerType(), doc="Traded volume")
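Other operations on update_schema follow the same pattern; for example, renaming one of the columns defined above (a metadata-only change, so existing data files are untouched):

with table.update_schema() as update:
    update.rename_column("bid", "bid_price")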