airflow

airflow taskflow

DanielZZI 2024. 3. 6. 20:41
728x90

airflow taskflow 예시

 

 

다양한 종류의 데이터별로 쿼리를 수행하고 그 결과를 파싱한 후에,

 

일부는 DB에 담아 continuous learning 용도로 쓰고,

일부는 pivot해서 Spotfire 등의 visualization 용도로 사용한다면?

 

 

빅데이터 레이크에 직접 여러 쿼리를 날릴 경우, 이를 병렬로 수행하면 DAG의 구동 시간을 줄이고 효율적인 workflow를 만들 수 있을 것이다.

 

이때 TaskFlow API를 사용해 DAG를 작성하면 직관적이면서도 쉽게 동시성(병렬) 코드를 작성할 수 있다.

 

아래는 예시 코드이다. 자세한 설명은 다음 기회에...

from airflow.utils.dates import days_ago
from airflow.decorators import dag, task, task_group

import pandas as pd

import datetime
# Default task arguments, intended to be handed to the DAG through the
# ``default_args`` parameter of the ``@dag`` decorator below.
default_args = dict(
    owner="test",
    # NOTE(review): days_ago() produces a start_date that moves with each parse
    # and is deprecated in recent Airflow releases; a fixed pendulum datetime is
    # the recommended replacement -- confirm before changing scheduling.
    start_date=days_ago(1),
    # NOTE(review): provide_context is an Airflow 1.x PythonOperator flag; under
    # Airflow 2 it is presumably ignored here -- confirm and consider removing.
    provide_context=True,
)

@dag(
    default_args=default_args,
    schedule="0 */8 * * *",
    dag_id="sample_taskflow",
    tags=["test"],
    catchup=False,
)
def main(**kwargs):
    """Build the ``sample_taskflow`` DAG (cron ``0 */8 * * *``, no catchup).

    Three independent pipelines are wired up side by side:

    * ``data1_parse``: three query tasks. Each result goes through an extract
      task that appends it to the rows accumulated by the previous extract.
      The accumulated frame is passed to ``postgres_add`` (table
      ``data1_all``) and to ``data1_pivot``. Mapped over ``step_list``.
    * ``data2_parse``: one query task and one extract task, mapped over
      ``step_list``.
    * ``data3_parse``: one query task and one extract task, not mapped.

    The query tasks currently return hard-coded sample frames, and
    ``postgres_add`` / ``data1_pivot`` are stubs that only print.
    ``**kwargs`` is accepted but not used.

    NOTE(review): DataFrames are handed between tasks as return values
    (XCom). This presumably needs an Airflow version / XCom backend that can
    serialize ``pd.DataFrame`` -- confirm for the target deployment.
    """
    from typing import Optional

    # Shared settings for every query task: retry up to 3 times and cap each
    # attempt at 60 minutes.
    query_task_args = {
        "retries": 3,
        "execution_timeout": datetime.timedelta(minutes=60),
    }

    @task(**query_task_args)
    def data1_query1(day, step):
        """Run data1 query 1 (placeholder: returns a fixed sample frame)."""
        print('data1 query1')
        df = pd.DataFrame({'a': [1, 2, 3], 'b': [1, 2, 3], 'c': [1, 2, 3]})
        print(day)
        print('step:', step)
        print(df)
        return df

    @task(**query_task_args)
    def data1_query2(day, step):
        """Run data1 query 2 (placeholder: returns a fixed sample frame)."""
        print('data1 query2')
        df = pd.DataFrame({'a': [4, 5, 6], 'b': [4, 5, 6], 'c': [4, 5, 6]})
        print(day)
        print('step:', step)
        print(df)
        return df

    @task(**query_task_args)
    def data1_query3(day, step):
        """Run data1 query 3 (placeholder: returns a fixed sample frame)."""
        print('data1 query3')
        df = pd.DataFrame({'a': [7, 8, 9], 'b': [7, 8, 9], 'c': [7, 8, 9]})
        print(day)
        print('step:', step)
        print(df)
        return df

    @task()
    def data1_extract1(PARA: pd.DataFrame):
        """Start the data1 accumulation: print and return ``PARA`` unchanged."""
        print('data1 ex1')
        print(PARA)
        return PARA

    @task()
    def data1_extract2(PARA: pd.DataFrame, before_pivot: Optional[pd.DataFrame] = None):
        """Stack ``PARA`` below the rows accumulated so far.

        Args:
            PARA: frame returned by data1 query 2.
            before_pivot: accumulated frame from the previous extract step;
                when None, ``PARA`` is returned as-is.

        Returns:
            ``pd.concat([before_pivot, PARA])`` (original indexes are kept).
        """
        print('data1 ex2')
        if before_pivot is not None:
            PARA = pd.concat([before_pivot, PARA])
        print(PARA)
        return PARA

    @task()
    def data1_extract3(PARA: pd.DataFrame, before_pivot: Optional[pd.DataFrame] = None):
        """Stack ``PARA`` below the rows accumulated so far.

        Args:
            PARA: frame returned by data1 query 3.
            before_pivot: accumulated frame from the previous extract step;
                when None, ``PARA`` is returned as-is.

        Returns:
            ``pd.concat([before_pivot, PARA])`` (original indexes are kept).
        """
        print('data1 ex3')
        if before_pivot is not None:
            PARA = pd.concat([before_pivot, PARA])
        print(PARA)
        return PARA

    @task()
    def postgres_add(df, table, dbname):
        """Stub: only prints a message; ``df``, ``table``, ``dbname`` are unused.

        NOTE(review): presumably meant to insert ``df`` into ``table`` of
        database ``dbname`` -- implement.
        """
        print('insert table')

    @task()
    def data1_pivot(before_pivot):
        """Stub: only prints a message; ``before_pivot`` is unused.

        NOTE(review): presumably meant to pivot the accumulated data1 frame
        for visualization -- implement.
        """
        print('data1 pivot')

    @task(**query_task_args)
    def data2_query1(day, step):
        """Run the data2 query (placeholder: returns a fixed sample frame)."""
        print('data2 query1')
        df = pd.DataFrame({'d': [1, 2, 3], 'e': [1, 2, 3], 'f': [1, 2, 3]})
        print(day)
        print('step:', step)
        print(df)
        return df

    @task()
    def data2_extract1(PARA: pd.DataFrame):
        """Print and return the data2 query result unchanged."""
        print('data2 ex1')
        print(PARA)
        return PARA

    @task(**query_task_args)
    def data3_query1(day):
        """Run the data3 query (placeholder: returns a fixed sample frame)."""
        print('data3 query1')
        df = pd.DataFrame({'d': [1, 2, 3], 'e': [1, 2, 3], 'f': [1, 2, 3]})
        print(day)
        print(df)
        # Return the frame so data3_extract1 receives data instead of None.
        return df

    @task()
    def data3_extract1(PARA: pd.DataFrame):
        """Print and return the data3 query result unchanged."""
        print('data3 ex1')
        print(PARA)
        return PARA

    @task_group
    def data1_parse(day, step):
        """Query data1 three times, accumulate the extracts, then store and pivot."""
        data1_q_result1 = data1_query1(day, step)
        data1_e_result1 = data1_extract1(PARA=data1_q_result1)

        data1_q_result2 = data1_query2(day, step)
        data1_e_result2 = data1_extract2(PARA=data1_q_result2, before_pivot=data1_e_result1)

        data1_q_result3 = data1_query3(day, step)
        data1_e_result3 = data1_extract3(PARA=data1_q_result3, before_pivot=data1_e_result2)

        # Store the accumulated extract of all three queries (not only the raw
        # result of query 3) in the "data1_all" table.
        postgres_add(df=data1_e_result3, table="data1_all", dbname='testdb')

        data1_pivot(before_pivot=data1_e_result3)

    @task_group
    def data2_parse(day, step):
        """Query data2 and pass the result through its extract task."""
        data2_extract1(PARA=data2_query1(day, step))

    @task_group
    def data3_parse(day):
        """Query data3 and pass the result through its extract task."""
        data3_extract1(data3_query1(day))

    # NOTE(review): the meaning of ``day`` ([2, 0]) is not visible here; the
    # query tasks only print it -- document once real queries use it.
    day = [2, 0]
    step_list = ['step1_123', 'step1_1234']

    # Mapped task groups (Airflow 2.5+): one data1/data2 group instance per
    # entry of step_list, all sharing the same ``day`` value. The three
    # pipelines have no dependencies on each other, so they can run in parallel.
    data1_parse.partial(day=day).expand(step=step_list)
    data2_parse.partial(day=day).expand(step=step_list)
    data3_parse(day=day)

# Build the DAG at import time and keep it in a module-level name so the DAG
# file's globals expose it (DagBag discovery without relying on
# auto-registration, and direct use from tests, e.g. sample_taskflow_dag.test()).
sample_taskflow_dag = main()