# Apache Airflow

## Core

### Quickstart

```python
import logging
from datetime import datetime
from typing import Dict

import requests
from airflow.decorators import dag, task
from airflow.operators.email import EmailOperator

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def taskflow():
    """Daily DAG: fetch the Bitcoin price, keep the USD fields, log them, then send an email.

    Values flow between the TaskFlow tasks through their return values (XCom).
    """

    @task(task_id="extract", retries=2)
    def extract_bitcoin_price() -> Dict[str, float]:
        # A timeout keeps a stalled connection from blocking the worker forever.
        # raise_for_status() turns an HTTP error into an explicit task failure
        # (and thus a retry) instead of failing later while parsing the error body.
        response = requests.get(API, timeout=30)
        response.raise_for_status()
        return response.json()["bitcoin"]

    @task(multiple_outputs=True)
    def process_data(response: Dict[str, float]) -> Dict[str, float]:
        logging.info(response)
        # multiple_outputs=True pushes each returned key as its own XCom.
        return {"usd": response["usd"], "change": response["usd_24h_change"]}

    @task
    def store_data(data: Dict[str, float]):
        logging.info("Store: %s with change %s", data["usd"], data["change"])

    email_notification = EmailOperator(
        task_id="email_notification",
        to="noreply@astronomer.io",
        subject="dag completed",
        html_content="the dag has finished",
    )

    # Calling the TaskFlow functions wires extract -> process -> store; the
    # email only runs after store_data.
    store_data(process_data(extract_bitcoin_price())) >> email_notification


taskflow()
```

### XCOM

#### Quickstart

```python
from airflow.decorators import dag, task
from datetime import datetime


@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False)
def my_dag():
    """Demonstrate an explicit XCom push/pull between two TaskFlow tasks."""

    @task
    def task1(ti=None):
        # `ti` (the running TaskInstance) is injected from the task context.
        ti.xcom_push(key='mobile_phone', value='iphone')

    @task
    def task2(ti=None):
        # task1's task_id defaults to its function name, "task1".
        phone = ti.xcom_pull(task_ids='task1', key='mobile_phone')
        print(phone)

    # TaskFlow tasks only become part of the DAG when called. The dependency
    # guarantees task1 has pushed the value before task2 pulls it.
    task1() >> task2()


# The @dag-decorated function must be called for the DAG to be registered.
my_dag()
```

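#### Limitations

XComs are stored in the Airflow metadata database, so the maximum size of a single XCom value depends on the database backend:

* SQLite: 2 GB
* Postgres: 1 GB
* MySQL: 64 MB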

## MSSQL

### Connection

```
Connection Type: MsSQL
```

### Operator

```
```

### Hook

```python
from airflow.decorators import dag, task
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

@task
def get_data():
    """Read all rows of ``db.tbl`` from SQL Server.

    Returns:
        The rows as a plain dict produced by ``DataFrame.to_dict()``
        (``{column: {index: value}}``), returned instead of the DataFrame itself.
    """
    # The connection ID is a placeholder; use the ID configured under Admin -> Connections.
    hook = MsSqlHook(mssql_conn_id="mssql_conn")

    sql = """
    SELECT * FROM db.tbl
    """
    df = hook.get_pandas_df(sql)
    return df.to_dict()
    
@task
def insert_data():
    """Insert one sample row into ``db.tbl`` on SQL Server."""
    # The connection ID is a placeholder; use the ID configured under Admin -> Connections.
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    table_name = "db.tbl"
    # One inner list per row; values line up positionally with target_fields.
    rows = [
        [1, "Bob", "Doe"],
    ]
    target_fields = ["id", "first_name", "last_name"]
    hook.insert_rows(table=table_name, rows=rows, target_fields=target_fields)


@task
def create_table():
    """Create ``db.tbl2`` on SQL Server as a copy of ``db.tbl1``.

    ``SELECT ... INTO`` creates the target table itself, so the statement
    fails if ``db.tbl2`` already exists.
    """
    # The connection ID is a placeholder; use the ID configured under Admin -> Connections.
    hook = MsSqlHook(mssql_conn_id="mssql_conn")
    sql = """
    SELECT * INTO db.tbl2 FROM db.tbl1
    """
    hook.run(sql)
```

## Oracle

### Connection

```
Connection Type: Oracle
Extra: {"thick_mode": "true"}
```

### Operator

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.oracle.operators.oracle import (
    OracleOperator,
    OracleStoredProcedureOperator,
)

# Every operator needs a task_id; OracleOperator also requires the SQL to run.
# NOTE: OracleOperator is deprecated in favour of SQLExecuteQueryOperator (below);
# check that your installed Oracle provider version still ships it.
ora_task = OracleOperator(
    task_id="ora_task",
    oracle_conn_id="oracle_default",
    sql="SELECT 1 FROM DUAL",
)

# Generic operator from the common-sql provider; conn_id points at an Oracle connection.
opr_sql = SQLExecuteQueryOperator(
    task_id="task_sql", conn_id="oracle", sql="SELECT 1 FROM DUAL", autocommit=True
)

# Calls TEST_PROCEDURE. A Python type as a value (val_out: int) marks that entry
# as an OUT parameter of the expected type.
opr_stored_procedure_with_dict_input_output = OracleStoredProcedureOperator(
    task_id="opr_stored_procedure_with_dict_input_output",
    oracle_conn_id="oracle",
    procedure="TEST_PROCEDURE",
    parameters={"val_in": 3, "val_out": int},
)
```

### Hook

## HiveServer2

### Connection

```
Connection Type: Hive Server 2 Thrift
Host: localhost
Login: admin
Port: 10000
Extra: {"auth_mechanism": "LDAP"}
```

### Hook

```python
from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

@task
def get_data():
    """Read up to 100 rows of ``db.tbl`` through HiveServer2.

    Returns:
        The rows as a plain dict produced by ``DataFrame.to_dict()``.
    """
    # The connection ID is a placeholder; use the ID configured under Admin -> Connections.
    hook = HiveServer2Hook(hiveserver2_conn_id="hs2_conn")

    hql = """
    SELECT * FROM db.tbl limit 100
    """
    df = hook.get_pandas_df(hql)
    return df.to_dict()
```

## HiveClient

### Connection

```
Connection Type: Hive Client Wrapper
Host: localhost
Login: admin
Port: 10000
Extra: {"use_beeline": "true", "auth": "LDAP"}
```

### Operator

```python
from airflow.decorators import dag, task
from airflow.providers.apache.hive.operators.hive import HiveOperator

# Create db.tbl2 as a copy of db.tbl1 (CTAS); a no-op if db.tbl2 already exists.
hql = """
CREATE TABLE IF NOT EXISTS db.tbl2 AS
SELECT * FROM db.tbl1
"""

hive_task = HiveOperator(task_id="hive_task", hql=hql, hive_cli_conn_id="hc_conn")
```

### Hook

```python
from airflow.decorators import dag, task
from airflow.providers.apache.hive.hooks.hive import HiveCliHook

@task
def create_table():
    """Create ``db.tbl2`` as a copy of ``db.tbl1`` via the Hive CLI, if it does not exist."""
    # The connection ID is a placeholder; use the ID configured under Admin -> Connections.
    hook = HiveCliHook(hive_cli_conn_id="hc_conn")

    hql = """
    CREATE TABLE IF NOT EXISTS db.tbl2 AS
    SELECT * FROM db.tbl1
    """

    hook.run_cli(hql)
```

## WebHDFS

### Connection

```
```

### Hook

<pre class="language-python"><code class="lang-python">from airflow.decorators import dag, task
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

@task
def check_path():
    """Return whether ``hdfs://storage`` exists, checked through WebHDFS."""
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id')
    # NOTE(review): the WebHDFS client normally takes plain paths such as
    # '/storage'; confirm that the 'hdfs://' URI form resolves as intended.
    path_exists = hook.check_for_path(hdfs_path='hdfs://storage')
    return path_exists


@task
def load_file():
    """Upload the local ``./data.csv`` to HDFS, overwriting any existing file."""
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_conn_id')
    hook.load_file(source='./data.csv', destination='hdfs://storage/data.csv', overwrite=True)
</code></pre>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.datahungry.dev/library/apache-airflow.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
