To learn PyIceberg, we need a catalog and object storage. In this article I will use Postgres as the catalog database and MinIO for S3-compatible object storage, both set up with Docker Compose.
```yaml
version: "3.9"
services:
  postgres:
    image: postgres
    container_name: pg_catalog
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mycatalog
    restart: unless-stopped
    ports:
      - "5432:5432"
  minio:
    image: quay.io/minio/minio:latest
    command: server /data --console-address ":9001"
    container_name: minio_s3
    # volumes:
    #   - ./data:/data
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    restart: unless-stopped
    ports:
      - "9000:9000"
      - "9001:9001"
```
Start the stack with `docker compose up -d`. Then open the MinIO console at http://localhost:9001 (login `admin` / `password`), create an access key, and put the credentials in a `.env` file:

```
S3_ACCESS_KEY=<Replace Access Key>
S3_SECRET_KEY=<Replace Secret Key>
PG_URL=postgresql+psycopg2://admin:password@localhost:5432/mycatalog
```
Connect to MinIO and create the bucket that will hold the table data:

```python
import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv(override=True)

# Connect to the local MinIO server with the credentials from .env
client = Minio(
    endpoint="localhost:9000",
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False,  # plain HTTP for this local setup
)

# Create the bucket for the Iceberg warehouse if it does not exist yet
bucket_name = "pyiceberg"
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)
```
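As a quick sanity check, you can list the buckets visible with these credentials (`list_buckets` is part of the MinIO Python client):

```python
# Sanity check: the new bucket should show up here
for bucket in client.list_buckets():
    print(bucket.name, bucket.creation_date)
```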
Next, load the Iceberg catalog. PyIceberg infers a SQL catalog from the `postgresql://` URI and uses PyArrow for file IO against MinIO:

```python
import os

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY"),
    },
)
```
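If you prefer not to rely on URI inference, you can pin the catalog type explicitly. A minimal sketch, assuming the `pyiceberg[sql-postgres]` extra is installed:

```python
# Same catalog, with the type stated explicitly instead of
# inferred from the postgresql:// prefix of the URI
catalog = load_catalog(
    "docs",
    **{
        "type": "sql",
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY"),
    },
)
```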
Initialize the catalog tables in Postgres and create a namespace:

```python
# Init catalog: create the catalog's backing tables in Postgres
catalog.create_tables()

# Create the namespace if it does not exist
# (_namespace_exists is a private SqlCatalog helper)
namespace = "docs_example"
if not catalog._namespace_exists(namespace):
    catalog.create_namespace(namespace)

catalog.list_namespaces()
catalog.list_tables("docs_example")
```
Create Schema, Partition & Table
First define the table schema. Note that field IDs must be unique across the whole schema, including nested fields (the catalog assigns fresh IDs on `create_table` anyway):

```python
from pyiceberg.schema import Schema
from pyiceberg.types import (
    DoubleType,
    FloatType,
    IntegerType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            # field IDs are unique across the schema, so the nested field gets 6
            NestedField(
                field_id=6, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)
```
Partition the table by day on the `datetime` column (`source_id` refers to the schema `field_id`):

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)
```
Finally, define a sort order on `symbol` and create the table in the bucket we made earlier:

```python
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform

sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)
```
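The new table is empty, so a scan would return no rows. Here is a minimal sketch of writing one row with `table.append`, assuming a recent PyIceberg version that supports appends to partitioned tables; the sample values are made up:

```python
from datetime import datetime

import pyarrow as pa

# Arrow schema matching the Iceberg schema; required Iceberg fields
# must be non-nullable on the Arrow side
arrow_schema = pa.schema(
    [
        pa.field("datetime", pa.timestamp("us"), nullable=False),
        pa.field("symbol", pa.string(), nullable=False),
        pa.field("bid", pa.float32()),
        pa.field("ask", pa.float64()),
        pa.field("details", pa.struct([pa.field("created_by", pa.string())])),
    ]
)

# Hypothetical sample row
rows = pa.Table.from_pylist(
    [
        {
            "datetime": datetime(2024, 1, 1, 9, 30),
            "symbol": "AAPL",
            "bid": 195.5,
            "ask": 195.6,
            "details": {"created_by": "admin"},
        }
    ],
    schema=arrow_schema,
)

table = catalog.load_table("docs_example.bids")
table.append(rows)
```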
Load the table and scan it into a pandas DataFrame:

```python
table = catalog.load_table("docs_example.bids")
df = table.scan().to_pandas()
df
```
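Scans can also push down filters and prune columns. A sketch using the string predicate form of `row_filter`:

```python
# Filtered scan: only AAPL rows, only three columns
df = table.scan(
    row_filter="symbol == 'AAPL'",
    selected_fields=("datetime", "bid", "ask"),
).to_pandas()
```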
If you only have a metadata file and no catalog, you can open the table directly as a `StaticTable`:

```python
from pyiceberg.table import StaticTable

# Example metadata path; replace it with one of your own metadata files
static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
```
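A `StaticTable` is read-only: you can scan it just like a catalog-backed table, but you cannot commit changes to it.

```python
# Reading works exactly like a normal table scan
static_table.scan().to_pandas()
```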
Schema Evolution (Update table)
```python
# Add a new integer column; the third argument is the column's doc string
with table.update_schema() as update:
    update.add_column("volume", IntegerType(), "doc")
```
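`update_schema` supports more than adding columns. A short sketch of a rename and a delete; deleting a column is destructive, so PyIceberg requires `allow_incompatible_changes=True` for it:

```python
# Rename a column (a safe, metadata-only change)
with table.update_schema() as update:
    update.rename_column("bid", "bid_price")

# Drop a column; destructive, so it must be allowed explicitly
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("details")
```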