Apache Airflow

Core

Quickstart

import logging
from datetime import datetime
from typing import Dict

import requests
from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    """Fetch the Bitcoin price daily, reshape it, log it, then send an email.

    Task order: extract -> process_data -> store_data -> email_notification.
    """

    @task(task_id="extract", retries=2)
    def extract_bitcoin_price() -> Dict[str, float]:
        """Call the URL in ``API`` and return the ``"bitcoin"`` entry of its JSON."""
        # The timeout stops a stalled connection from blocking the task forever;
        # raise_for_status() turns an HTTP error into a task failure so that
        # ``retries`` applies instead of failing later on a missing key.
        response = requests.get(API, timeout=30)
        response.raise_for_status()
        return response.json()["bitcoin"]

    @task(multiple_outputs=True)
    def process_data(response: Dict[str, float]) -> Dict[str, float]:
        """Log the raw payload and keep only the price and the 24h change."""
        logging.info(response)
        return {"usd": response["usd"], "change": response["usd_24h_change"]}

    @task
    def store_data(data: Dict[str, float]):
        """Log the processed values (this example does not persist anything)."""
        logging.info(f"Store: {data['usd']} with change {data['change']}")

    email_notification = EmailOperator(
        task_id="email_notification",
        to="noreply@astronomer.io",
        subject="dag completed",
        html_content="the dag has finished",
    )

    # Calling the decorated functions creates the tasks and passes results via
    # XCom; ``>>`` makes the email task run after store_data.
    store_data(process_data(extract_bitcoin_price())) >> email_notification


taskflow()

XCOM

Quickstart

from airflow.decorators import dag, task
from datetime import datetime


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def my_dag():
    """Show two tasks exchanging a value through XCom under an explicit key."""

    @task
    def task1(ti=None):
        """Push ``'iphone'`` to XCom under the key ``'mobile_phone'``."""
        ti.xcom_push(key='mobile_phone', value='iphone')

    @task
    def task2(ti=None):
        """Pull the value stored by ``task1`` and print it."""
        phone = ti.xcom_pull(task_ids='task1', key='mobile_phone')
        print(phone)

    # A decorated function only becomes a task when it is called; task2 must
    # run after task1 so the pushed value exists when it is pulled.
    task1() >> task2()


# Calling the decorated function is what creates the DAG object.
my_dag()

Limitation

  • SQLite: 2GB

  • Postgres: 1GB

  • MySQL: 64 KB

MSSQL

Connection

Connection Type: MsSQL

Operator

Hook

from airflow.decorators import dag, task
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

@task
def get_data():
    """Run a SELECT against MSSQL and return the result as a dict.

    The rows are fetched into a pandas DataFrame and converted with
    ``DataFrame.to_dict()`` before being returned.
    """
    # "mssql_conn" is the id of the Airflow connection (Connection Type: MsSQL).
    hook = MsSqlHook(mssql_conn_id="mssql_conn")

    sql = """
    SELECT * FROM db.tbl
    """
    df = hook.get_pandas_df(sql)
    return df.to_dict()
    
@task
def insert_data():
    """Insert one sample row into ``db.tbl`` through the MSSQL hook."""
    # "mssql_conn" is the id of the Airflow connection (Connection Type: MsSQL).
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    table_name = "db.tbl"
    # Each inner list is one row; values line up with target_fields by position.
    rows = [
        [1, "Bob", "Doe"],
    ]
    target_fields = ["id", "first_name", "last_name"]
    hook.insert_rows(table=table_name, rows=rows, target_fields=target_fields)
@task
def create_table():
    """Create ``db.tbl2`` as a copy of ``db.tbl1`` using ``SELECT ... INTO``."""
    # "mssql_conn" is the id of the Airflow connection (Connection Type: MsSQL).
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    sql = """
    SELECT * INTO db.tbl2 FROM db.tbl1
    """
    hook.run(sql)

Oracle

Connection

Connection Type: Oracle
Extra: {"thick_mode": "true"}

Operator

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.oracle.operators.oracle import (
    OracleOperator,
    OracleStoredProcedureOperator,
)

# Every operator needs a task_id, and OracleOperator also needs the SQL to run.
# NOTE(review): OracleOperator is the legacy operator and may be missing from
# newer Oracle provider releases -- confirm against the installed version;
# SQLExecuteQueryOperator below is the generic replacement.
ora_task = OracleOperator(
    task_id="ora_task",
    oracle_conn_id='oracle_default',
    sql="SELECT 1 FROM DUAL",
)

# Generic SQL operator pointed at the Oracle connection.
opr_sql = SQLExecuteQueryOperator(
    task_id="task_sql", conn_id="oracle", sql="SELECT 1 FROM DUAL", autocommit=True
)

# Stored-procedure call with parameters passed as a dict; "val_out" is given
# as a Python type rather than a value (presumably marking an OUT parameter --
# confirm against the provider documentation).
opr_stored_procedure_with_dict_input_output = OracleStoredProcedureOperator(
    task_id="opr_stored_procedure_with_dict_input_output",
    oracle_conn_id="oracle",
    procedure="TEST_PROCEDURE",
    parameters={"val_in": 3, "val_out": int},
)

Hook

HiveServer2

Connection

Connection Type: Hive Server 2 Thrift
Host: localhost
Login: admin
Port: 10000
Extra: {"auth_mechanism": "LDAP"}

Hook

from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

@task
def get_data():
    """Query HiveServer2 and return up to 100 rows of ``db.tbl`` as a dict.

    The rows are fetched into a pandas DataFrame and converted with
    ``DataFrame.to_dict()`` before being returned.
    """
    # "hs2_conn" is the id of the Airflow connection
    # (Connection Type: Hive Server 2 Thrift).
    hook = HiveServer2Hook(hiveserver2_conn_id="hs2_conn")

    hql = """
    SELECT * FROM db.tbl limit 100
    """
    # Passed positionally: the query lives in ``hql``.
    df = hook.get_pandas_df(hql)
    return df.to_dict()

HiveClient

Connection

Connection Type: Hive Client Wrapper
Host: localhost
Login: admin
Port: 10000
Extra: {"use_beeline": "true", "auth": "LDAP"}

Operator

from airflow.decorators import dag, task
from airflow.operators.hive_operator import HiveOperator

# HiveQL executed by the operator: create db.tbl2 from db.tbl1 if it is missing.
hql = """
CREATE TABLE IF NOT EXISTS db.tbl2 AS
SELECT * FROM db.tbl1
"""

# Every operator needs a task_id; 'hc_conn' is the id of the Hive Client
# Wrapper connection.
# NOTE(review): `airflow.operators.hive_operator` looks like the legacy import
# path; the provider package exposes HiveOperator under
# `airflow.providers.apache.hive.operators.hive` -- confirm for your version.
hive_task = HiveOperator(task_id="hive_task", hql=hql, hive_cli_conn_id='hc_conn')

Hook

from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveCliHook

@task
def create_table():
    """Create ``db.tbl2`` from ``db.tbl1`` (if missing) through the Hive CLI hook."""
    # "hc_conn" is the id of the Airflow connection
    # (Connection Type: Hive Client Wrapper).
    hook = HiveCliHook(hive_cli_conn_id="hc_conn")

    hql = """
    CREATE TABLE IF NOT EXISTS db.tbl2 AS
    SELECT * FROM db.tbl1
    """

    # Passed positionally: the statement lives in ``hql``.
    hook.run_cli(hql)

WebHDFS

Connection

Hook

from airflow.decorators import dag, task
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

@task
def check_path():
    # Ask WebHDFS, via the 'webhdfs_conn_id' connection, whether the path
    # exists, and return the hook's answer unchanged.
    return WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id').check_for_path(
        hdfs_path='hdfs://storage'
    )
    
@task
def load_file():
    # Upload the local CSV through the 'webhdfs_conn_id' connection, with
    # overwrite enabled for the destination.
    webhdfs = WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id')
    webhdfs.load_file(
        source='./data.csv',
        destination='hdfs://storage/data.csv',
        overwrite=True,
    )

Last updated