Apache Airflow
Core
Quickstart
import logging
from datetime import datetime
from typing import Dict
import requests
from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator
API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"
@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    """Daily TaskFlow pipeline: fetch the Bitcoin price, log it, then send an email.

    Task graph: ``extract`` -> ``process_data`` -> ``store_data`` -> ``email_notification``.
    ``catchup=False`` means past daily intervals since the start date are not backfilled.
    """

    @task(task_id="extract", retries=2)
    def extract_bitcoin_price() -> Dict[str, float]:
        """Fetch the ``bitcoin`` entry from the CoinGecko endpoint stored in ``API``.

        Returns:
            The JSON object under the ``"bitcoin"`` key of the response.

        Raises:
            requests.HTTPError: if the endpoint answers with a 4xx/5xx status, so the
                task fails (and is retried) with a clear error instead of a KeyError.
            requests.Timeout: if the endpoint does not answer within the timeout.
        """
        # Without a timeout, requests can wait indefinitely and pin a worker slot.
        response = requests.get(API, timeout=30)
        response.raise_for_status()
        return response.json()["bitcoin"]

    @task(multiple_outputs=True)
    def process_data(response: Dict[str, float]) -> Dict[str, float]:
        """Keep only the USD price and its 24h change from the raw API payload.

        ``usd_24h_change`` is present because ``API`` sets include_24hr_change=true.
        """
        logging.info(response)
        return {"usd": response["usd"], "change": response["usd_24h_change"]}

    @task
    def store_data(data: Dict[str, float]):
        """Log the processed price and change; nothing is persisted here."""
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logging.info("Store: %s with change %s", data["usd"], data["change"])

    email_notification = EmailOperator(
        task_id="email_notification",
        # NOTE(review): this recipient looks like an e-mail-obfuscation placeholder
        # left by the page scraper; replace it with the real address.
        to="[email protected]",
        subject="dag completed",
        html_content="the dag has finished",
    )

    # Passing each task's output into the next wires extract -> process -> store;
    # ``>>`` makes the email wait for store_data to finish.
    store_data(process_data(extract_bitcoin_price())) >> email_notification
taskflow()
XCom
Quickstart
Limitations (maximum XCom value size, by metadata database backend)
SQLite: 2 GB
Postgres: 1 GB
MySQL: 64 MB
MSSQL
Connection
Operator
Hook
Oracle
Connection
Operator
Hook
HiveServer2
Connection
Hook
HiveClient
Connection
Operator
Hook
WebHDFS
Connection
Hook
Last updated