
Pyiceberg

To learn PyIceberg, we need a catalog and an object storage backend. In this article, I will use Postgres for the catalog and MinIO for S3-compatible storage. Both tools are set up with docker-compose.

docker-compose.yml
version: "3.9"
services:
  postgres:
    image: postgres
    container_name: pg_catalog
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mycatalog
    restart: unless-stopped
    ports:
      - 5432:5432

  minio:
    image: quay.io/minio/minio:latest
    command: server /data --console-address ":9001"
    container_name: minio_s3
    # volumes:
    #   - ./data:/data
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    restart: unless-stopped
    ports:
      - 9000:9000
      - 9001:9001
.env
S3_ACCESS_KEY=<Replace Access Key>
S3_SECRET_KEY=<Replace Secret Key>
PG_URL=postgresql+psycopg2://admin:password@localhost:5432/mycatalog

Create an Access Key and Secret Key in the MinIO web console (http://localhost:9001) and replace the placeholders in the .env file.

Code

Connect S3

import os
from minio import Minio
from dotenv import load_dotenv
load_dotenv(override=True)

# connect to the MinIO server started by docker-compose
client = Minio(
    endpoint="localhost:9000",
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False
)

# create the bucket that will hold the Iceberg warehouse
bucket_name = "pyiceberg"
if not client.bucket_exists(bucket_name):
    client.make_bucket(bucket_name)
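
As a quick sanity check, you can list the buckets on the server with the same client (a sketch using the minio client API):

# sanity check: the new bucket should show up here
for bucket in client.list_buckets():
    print(bucket.name, bucket.creation_date)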

Connect Catalog & S3

import os
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "docs",
    **{
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY")
    }
)
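
load_catalog picks the catalog implementation from the uri scheme, so the postgresql+psycopg2 URL above selects the SQL catalog. A minimal sketch of the equivalent explicit construction, assuming the same properties:

from pyiceberg.catalog.sql import SqlCatalog

# explicit SQL catalog construction; equivalent to load_catalog above
catalog = SqlCatalog(
    "docs",
    **{
        "uri": os.getenv("PG_URL"),
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": os.getenv("S3_ACCESS_KEY"),
        "s3.secret-access-key": os.getenv("S3_SECRET_KEY"),
    },
)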

Create Namespace

# create the catalog's backing tables in Postgres (SqlCatalog only; no-op if they already exist)
catalog.create_tables()

# create the namespace if it does not exist
# (_namespace_exists is a private SqlCatalog helper; see the public-API sketch below)
namespace = "docs_example"
if not catalog._namespace_exists(namespace):
    catalog.create_namespace(namespace)
    
catalog.list_namespaces()
catalog.list_tables("docs_example")
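
Since _namespace_exists is a private helper, it may change between releases. A sketch that sticks to the public API by catching the already-exists error instead:

from pyiceberg.exceptions import NamespaceAlreadyExistsError

# public-API alternative to the private _namespace_exists check
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    pass  # namespace was created on a previous run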

Create Schema & Partition & Table

from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    FloatType,
    DoubleType,
    StringType,
    NestedField,
    StructType,
    IntegerType
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                # field IDs must be unique within the schema; the catalog
                # reassigns fresh IDs on table creation anyway
                field_id=6, name="created_by", field_type=StringType(), required=False
            ),
        ),
        required=False,
    ),
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
)
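
With the table created, data can be written by appending a PyArrow table whose schema lines up with the Iceberg schema. A minimal sketch with a made-up sample row, assuming a PyIceberg release recent enough to support PyArrow writes to partitioned tables:

import pyarrow as pa
from datetime import datetime

# required Iceberg fields must be non-nullable on the Arrow side
arrow_schema = pa.schema([
    pa.field("datetime", pa.timestamp("us"), nullable=False),
    pa.field("symbol", pa.string(), nullable=False),
    pa.field("bid", pa.float32()),
    pa.field("ask", pa.float64()),
    pa.field("details", pa.struct([pa.field("created_by", pa.string())])),
])

rows = pa.Table.from_pylist(
    [{"datetime": datetime(2024, 1, 1, 9, 30), "symbol": "BTCUSDT",
      "bid": 42000.5, "ask": 42001.0, "details": {"created_by": "admin"}}],
    schema=arrow_schema,
)

table = catalog.load_table("docs_example.bids")
table.append(rows)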

Load Table

Catalog

table = catalog.load_table("docs_example.bids")
df = table.scan().to_pandas()
df
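
Scans can also push filters and column projection down instead of materializing the whole table; a sketch using PyIceberg's string filter syntax:

# read only matching rows and selected columns
filtered = table.scan(
    row_filter="symbol == 'BTCUSDT'",
    selected_fields=("datetime", "bid", "ask"),
    limit=100,
).to_pandas()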

Direct metadata file

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
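
A StaticTable points at a single metadata file and is read-only, so it is useful for inspecting a table without a catalog but cannot commit changes. The path above is only an example; replace it with a real metadata.json from your own warehouse.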

Schema Evolution (Update table)

# add a new column; the third argument is the column's doc string
with table.update_schema() as update:
    update.add_column("volume", IntegerType(), "doc")

Note: with the PyIceberg version used here, committing the schema update through the SQL catalog raised NotImplementedError; newer releases implement this commit path.
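
update_schema supports more than adding columns. A sketch of a couple of other operations, assuming a release where the SQL catalog implements commits; destructive changes such as dropping a column must be opted into with allow_incompatible_changes:

# rename a column (each context manager commits one change set)
with table.update_schema() as update:
    update.rename_column("bid", "bid_price")

# dropping a column is incompatible for readers, so it must be opted into
with table.update_schema(allow_incompatible_changes=True) as update:
    update.delete_column("details")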

Reference



https://github.com/minio/minio/blob/master/docs/docker/README.md
https://hub.docker.com/_/postgres