SQLAlchemy
import pandas as pd
import sqlalchemy  # the bare package name is needed for the version lookup below
from sqlalchemy import create_engine, text, inspect

# Notebook-style display of the installed SQLAlchemy version.
sqlalchemy.__version__
pip install sqlalchemy-cockroachdb
Create engine
from sqlalchemy import create_engine, text, inspect

# Connection URL for the SQLite file used throughout these snippets.
database_url = 'sqlite:///mydb.db'
engine = create_engine(database_url)
Inspector
from sqlalchemy import create_engine, text, inspect

engine = create_engine('sqlite:///mydb.db')
inspector = inspect(engine)

# List the available schemas and keep the first one for the lookups below.
schemas = inspector.get_schema_names()
schemas
schema = schemas[0]

# Tables inside that schema; the first entry is used as the example table.
table_names = inspector.get_table_names(schema=schema)
table_names
table_name = table_names[0]

# Column metadata for the chosen table.
columns = inspector.get_columns(table_name, schema=schema)
columns

# The existence check goes through a newly created inspector.
table_exists = inspect(engine).has_table(table_name)
if table_exists:
    print("Has table")
# `query` is already a TextClause, so it is handed to execute() unchanged;
# wrapping it in text() a second time fails because text() expects a str.
# NOTE(review): SQLite has no CREATE DATABASE statement, so this snippet
# presumably targets a server database -- confirm which engine is intended.
query = text("""
CREATE DATABASE mydb
""")

with engine.connect() as conn:
    conn.execute(query)
# Download the house-prices sample CSV into a DataFrame.
url = "https://raw.githubusercontent.com/ywchiu/riii/master/data/house-prices.csv"
df = pd.read_csv(url)
df

# Store the frame as the `house_prices` table, replacing any existing table
# of that name and leaving the DataFrame index out of the stored columns.
df.to_sql(name='house_prices', con=engine, if_exists='replace', index=False)
# Read the first 20 rows of house_prices back into a DataFrame.
query = text("""
SELECT
*
FROM
house_prices
LIMIT
20
""")
df = pd.read_sql(sql=query, con=engine)
df
Session
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# NOTE: username, password, port and database in the URL are placeholders.
engine = create_engine('postgresql+psycopg2://username:password@localhost:port/database')
Session = sessionmaker(engine)

# Template for a unit of work: commit on success, roll back on any error,
# and always close the session.
# `User` and `obj1` are placeholders for a mapped class and an instance
# that must come from elsewhere.
session = Session()
try:
    user1 = User(name="user1")
    session.add(user1)
    # Session.execute() requires a statement; this one is only a placeholder.
    session.execute(text("SELECT 1"))
    session.delete(obj1)
    session.commit()
except Exception:
    session.rollback()
    # Bare raise re-raises the active exception unchanged.
    raise
finally:
    session.close()
Object Relational Mapping (ORM)
from sqlalchemy import create_engine
from sqlalchemy import func, or_, not_, and_
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base

# NOTE: username, password, port and database in the URL are placeholders.
engine = create_engine('postgresql+psycopg2://username:password@localhost:port/database')

# Reflect the existing tables and generate mapped classes for them.
# `autoload_with` replaces the `prepare(engine, reflect=True)` calling form,
# which has been deprecated since SQLAlchemy 1.4.
Base = automap_base()
Base.prepare(autoload_with=engine)

# map existing table
User = Base.classes.user
Acnt = Base.classes.account
Txn = Base.classes.transaction

Session = sessionmaker(engine)
session = Session()
def get_user(user_id: int):
    """Return the first ``User`` row whose id equals *user_id*.

    The lookup runs on the module-level ``session``; ``Query.first()``
    yields ``None`` when no row matches.
    """
    matching_users = session.query(User).filter(User.id == user_id)
    return matching_users.first()
from datetime import datetime

# Highest transaction id per user. Grouping on Txn.user_id keeps the
# subquery on the transaction table alone; grouping on User.id would pull
# an unjoined `user` table into its FROM clause.
last_txn_id = session.query(func.max(Txn.id)).group_by(Txn.user_id).subquery()

# Active users joined to their non-deleted, non-expired accounts and to the
# transactions selected by the subquery above.
last_txn_by_users = session.query(User, Acnt, Txn).filter(
    User.id == Txn.user_id,
    User.id == Acnt.user_id,
    User.is_active == 1,
    Acnt.deleted_at.is_(None),
    # An account with no expiry date never expires; otherwise the expiry
    # date must still lie after the current timestamp.
    or_(
        Acnt.expire_date.is_(None),
        func.date(Acnt.expire_date) > str(datetime.now()),
    ),
    Txn.id.in_(last_txn_id),
).all()
# session.add(row)
# session.commit()
# session.refresh(row)
# .offset(skip) .limit
Last updated