Apache Airflow
Core
Quickstart
import logging
from datetime import datetime
from typing import Dict

import requests

from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    @task(task_id="extract", retries=2)
    def extract_bitcoin_price() -> Dict[str, float]:
        # Call the CoinGecko API and return only the "bitcoin" payload.
        return requests.get(API).json()["bitcoin"]

    @task(multiple_outputs=True)
    def process_data(response: Dict[str, float]) -> Dict[str, float]:
        logging.info(response)
        return {"usd": response["usd"], "change": response["usd_24h_change"]}

    @task
    def store_data(data: Dict[str, float]):
        logging.info(f"Store: {data['usd']} with change {data['change']}")

    email_notification = EmailOperator(
        task_id="email_notification",
        to="noreply@astronomer.io",
        subject="dag completed",
        html_content="the dag has finished",
    )

    # extract -> process -> store, then send the notification email.
    store_data(process_data(extract_bitcoin_price())) >> email_notification


taskflow()
XCOM
Quickstart
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def my_dag():
    @task
    def task1(ti=None):
        # Push a value to XCom under an explicit key.
        ti.xcom_push(key='mobile_phone', value='iphone')

    @task
    def task2(ti=None):
        # Pull the value pushed by task1.
        phone = ti.xcom_pull(task_ids='task1', key='mobile_phone')
        print(phone)

    # task2 must run after task1 so the XCom value exists when it is pulled.
    task1() >> task2()


my_dag()
Limitation
Maximum XCom size depends on the metadata database backend:
SQLite: 2 GB
Postgres: 1 GB
MySQL: 64 MB
MSSQL
Connection
Connection Type: MsSQL
Operator
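No operator example was listed here; a minimal sketch using the generic SQLExecuteQueryOperator (the same operator used in the Oracle section below). The connection id "mssql_conn" and the query are assumptions.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Runs a query against the MSSQL connection; "mssql_conn" is an assumed connection id.
mssql_task = SQLExecuteQueryOperator(
    task_id="mssql_task",
    conn_id="mssql_conn",
    sql="SELECT * FROM db.tbl",
)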
Hook
from airflow.decorators import dag, task
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


@task
def get_data():
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    sql = """
        SELECT * FROM db.tbl
    """
    df = hook.get_pandas_df(sql)
    # Return a plain dict so the result can be passed on through XCom.
    return df.to_dict()


@task
def insert_data():
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    table_name = "db.tbl"
    rows = [
        [1, "Bob", "Doe"],
    ]
    target_fields = ["id", "first_name", "last_name"]
    hook.insert_rows(table=table_name, rows=rows, target_fields=target_fields)


@task
def create_table():
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    sql = """
        SELECT * INTO db.tbl2 FROM db.tbl1
    """
    hook.run(sql)
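The hook tasks above can be wired into a DAG just like the quickstart; a sketch, with the schedule and task ordering chosen for illustration only.

from datetime import datetime

from airflow.decorators import dag


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def mssql_dag():
    # Chain the hook tasks so they run in sequence.
    create_table() >> insert_data() >> get_data()


mssql_dag()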
Oracle
Connection
Connection Type: Oracle
Extra: {"thick_mode": "true"}
Operator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.oracle.operators.oracle import OracleOperator, OracleStoredProcedureOperator

# Provider-specific operator (deprecated in favor of SQLExecuteQueryOperator).
ora_task = OracleOperator(
    task_id="ora_task", oracle_conn_id="oracle_default", sql="SELECT 1 FROM DUAL"
)

opr_sql = SQLExecuteQueryOperator(
    task_id="task_sql", conn_id="oracle", sql="SELECT 1 FROM DUAL", autocommit=True
)

opr_stored_procedure_with_dict_input_output = OracleStoredProcedureOperator(
    task_id="opr_stored_procedure_with_dict_input_output",
    oracle_conn_id="oracle",
    procedure="TEST_PROCEDURE",
    parameters={"val_in": 3, "val_out": int},
)
Hook
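No hook example was listed; a minimal sketch mirroring the MsSqlHook tasks above, assuming a connection id of "oracle" and a placeholder query.

from airflow.decorators import task
from airflow.providers.oracle.hooks.oracle import OracleHook


@task
def get_data():
    hook = OracleHook(oracle_conn_id="oracle")
    sql = """
        SELECT * FROM db.tbl
    """
    df = hook.get_pandas_df(sql)
    return df.to_dict()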
HiveServer2
Connection
Connection Type: Hive Server 2 Thrift
Host: localhost
Login: admin
Port: 10000
Extra: {"auth_mechanism": "LDAP"}
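The same connection can also be defined in code and exported as a URI; a sketch using Airflow's Connection model. The connection id "hs2_conn" and the "hiveserver2" conn_type string are assumptions.

from airflow.models.connection import Connection

# Build a connection object from the fields above.
conn = Connection(
    conn_id="hs2_conn",
    conn_type="hiveserver2",  # assumed conn_type string for "Hive Server 2 Thrift"
    host="localhost",
    login="admin",
    port=10000,
    extra='{"auth_mechanism": "LDAP"}',
)

# The URI can be exported as the AIRFLOW_CONN_HS2_CONN environment variable.
print(conn.get_uri())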
Hook
from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook


@task
def get_data():
    hook = HiveServer2Hook(hiveserver2_conn_id="hs2_conn")
    hql = """
        SELECT * FROM db.tbl LIMIT 100
    """
    df = hook.get_pandas_df(hql)
    return df.to_dict()
HiveClient
Connection
Connection Type: Hive Client Wrapper
Host: localhost
Login: admin
Port: 10000
Extra: {"use_beeline": "true", "auth": "LDAP"}
Operator
from airflow.decorators import dag, task
from airflow.providers.apache.hive.operators.hive import HiveOperator

hql = """
    CREATE TABLE IF NOT EXISTS db.tbl2 AS
    SELECT * FROM db.tbl1
"""

hive_task = HiveOperator(task_id="hive_task", hql=hql, hive_cli_conn_id='hc_conn')
Hook
from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveCliHook


@task
def create_table():
    hook = HiveCliHook(hive_cli_conn_id="hc_conn")
    hql = """
        CREATE TABLE IF NOT EXISTS db.tbl2 AS
        SELECT * FROM db.tbl1
    """
    hook.run_cli(hql)
WebHDFS
Connection
Hook
from airflow.decorators import dag, task
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


@task
def check_path():
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id')
    path_exists = hook.check_for_path(hdfs_path='hdfs://storage')
    return path_exists


@task
def load_file():
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id')
    hook.load_file(source='./data.csv', destination='hdfs://storage/data.csv', overwrite=True)