拋棄混亂無章的工作排程-使用 Airflow 管理

你是否有許多工作依靠 crontab 來管理,結果除了四散各處難以管理外,許多有相依性的功能又沒辦法保證順序?明明前一個工作失敗了,後一個依舊開始執行,結果使修復工作更加複雜?

這個時候 Apache Airflow 也許就是你的好選擇。


什麼是 Airflow

Airflow 是原先由 Airbnb 開發,而後開源出來的工作排程管理平台,提供了多樣的 operator 可以使用,例如 Bash Operator、Python Operator,甚至可以直接對 GCP、S3、Slack 等等進行操作,少了自己刻輪子的麻煩。

再來是我們會將有相關的工作整合為一個有向無循環圖 DAG(Directed Acyclic Graph),顧名思義就是有方向性且無回向造成循環的結構,而且整個 DAG 就是一個 Python 程式,達到大家所追求的 Infrastructure as code,減少維運上的複雜度。

在先前文章《論文選讀 — Real-Time Personalization using Embeddings for Search Ranking at Airbnb》中亦有提到,他們在處理訓練資料與訓練模型的流程也是依靠 Airflow 來管理

完整的 Airflow 包含了三個主要的元件

Airflow Webserver

Airflow 原本就提供了方便的圖形化頁面,可以快速地看到排程執行的狀態,以及在上面查看 Log 或是手動觸發等。

Airflow Scheduler

顧名思義這就是負責排程的東西,會持續監控所有的 DAG 與工作,當有符合條件的工作就會觸發使其執行。

Airflow Worker

實際的排程工作就是交由 Worker 來執行,同一個 Airflow cluster 中可以有多個 Worker,並且可通過指定 worker queue 使工作能在特定的資源上運作。(如訓練模型就由擁有 GPU 的 worker queue 來執行)


安裝 Airflow

使用 pip 安裝

雖然我很想跳過這一段,不過某方面來說也是挺重要的,而 Airflow 最簡單的安裝方法莫過於使用 pip 了,只要使用以下指令

# 安裝完整相關套件
$ pip3 install apache-airflow

# 如果只要安裝部分相關套件(例如只要 celery, slack, redis 三個)
$ pip3 install apache-airflow[celery,slack,redis]

值得注意的是,在較舊的教學中可能用 pip3 install airflow,但是從 1.8.1 版開始以後就已經改用apache-airflow,如果貿然使用舊的 package name 可能會讓程式炸掉。 XD

安裝完畢以後可以先執行初始化資料庫的指令,即可使用最簡單的 sqlite 來儲存設定與 log 等方式。

$ airflow initdb

或是也可以透過編輯預設路徑中 $HOME/airflow/airflow.cfg 的設定,改為使用 MySQL 等資料庫。

# airflow.cfg 內修改
sql_alchemy_conn = mysql://DB_USERNAME:DB_PASSWORD@DB_HOST:DB_PORT/DB_database

# 更改完再初始化一次資料庫
$ airflow initdb

接著應該可以用 command line 指令來啟動,也可以使用 systemd 等方式來管理,這些之後有機會再提。

$ airflow webserver
$ airflow scheduler
$ airflow worker

如果三個服務正常起來了,就可以正式開始用囉。

使用 docker compose

如果已經會使用 docker compose 的人,可以參考一下 puckel/docker-airflow 這個 repo。

docker run -d -p 8080:8080 puckel/docker-airflow webserver

第一個 DAG

如果你之前是使用 crontab 來管理三個相依的工作的話,那你可能已經有了類似以下的設定

10 * * * * /path/to/script/one.sh
20 * * * * /path/to/script/two.sh
30 * * * * /path/to/script/three.sh

這三個工作每個小時都會依序執行一次,但是因為各種其他原因只好間隔十分鐘排程一個,如果改用 Airflow 管理的話,只需要使用最基本的 Bash Operator 來執行就好。首先我們來看一下整個 DAG 的程式會是什麼樣子

from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default setting
default_args = {
    'owner': 'OWNER_NAME',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(3),
    'email': ['OWNER_EMAIL'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# dag setting
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='Example airflow dag.',
    schedule_interval='10 * * * *')

# define tasks
task1 = BashOperator(
    task_id='task_1',
    bash_command='/path/to/script/one.sh',
    dag=dag)

task2 = BashOperator(
    task_id='task_2',
    bash_command='/path/to/script/two.sh',
    dag=dag)

task3 = BashOperator(
    task_id='task_3',
    bash_command='/path/to/script/three.sh',
    dag=dag)

# set upstream, downstream
task1.set_downstream(task2)
task2.set_downstream(task3)
# task1 >> task2 >> task3

接著讓我們一步一步拆解說明

基本設定

# default setting
default_args = {
    'owner': 'OWNER_NAME', # dag 管理者名稱
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(3),
    'email': ['OWNER_EMAIL'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# dag setting
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='Example airflow dag.',
    schedule_interval='10 * * * *'
)

這邊是設定共用的預設值,要特別注意的地方是 start_dateschedule_interval 這兩個值的意義。

start_date 開始時間可以使用明確的日期,也可以像是範例中的使用相對時間。而 schedule_interval 排程週期最簡單的方式就是按照 cron 的時間格式,因此如果排程是從 crontab 上搬過來的,就可以原封不動的直接使用。

第一次的執行這 DAG 的時間會是 start_date + schedule_interval 。舉例來說如果開始時間設為 2019-01-01週期設為每天零點整一次,那麽第一次執行將會是 2019-01-02 的零點整,而且此次執行的 execution_date 將會是 2019-01-01

乍看之下這個邏輯非常的多餘、麻煩且不直觀,但是對於「每天早上執行昨天一整天的資料」、「每個小時的十分時處理前一小時的資料」等需求下,反而是更為輕鬆的設計。

Task 定義

# define tasks
task1 = BashOperator(
    task_id='task_1',
    bash_command='/path/to/script/one.sh',
    dag=dag
)

task2 = BashOperator(
    task_id='task_2',
    bash_command='/path/to/script/two.sh',
    dag=dag)

task3 = BashOperator(
    task_id='task_3',
    bash_command='/path/to/script/three.sh',
    dag=dag)

# set upstream, downstream
task1.set_downstream(task2)
task2.set_downstream(task3)

# 也可以將上面兩行替換為
task1 >> task2 >> task3

這邊只使用到了最基礎的 BashOperator ,其餘除了 DockerOperator``PythonOperator``SlackOperator 以外也有整合許多雲端服務商,可以說是相當方便。

分別定義完每個 task 的內容以後,最後就差設定這些項目的順序。 task1.set_downstream(task2) 這個代表在執行完 task1 以後才會執行 task2 ,而這個如果改寫為 task2.set_upstream(task1) 也可以達成一樣效果。

但是如果你覺得這樣設定實在太囉嗦了,在 Airflow 中也可以使用 >>``<< 這兩個來定義順序,例如將本來要分別呼叫 set_downstream() 的合併在一行 task1 >> task2 >> task3就好了,是不是方便了非常多呢 XD

啟用 DAG

完成了 DAG 設定直接放入 airflow.cfg 指定的 DAG 資料夾,並且在介面上點選啟用即可。


總結

當初會導入 Airflow 主要原因就是當系統規模逐漸成長,本來使用 Cron 來排程的工作就變得十分破碎以及難以維護。再加上整個訓練 ML 模型流程的時間難以預估,因此一套適當的管理系統就非常重要。

我認為 Airflow 包含以下優缺點

優點

  1. 清晰易懂的管理介面
  2. 現成的 Operator 方便串接各式系統
  3. 較容易管理複雜的工作流程
  4. Workflow as Code

缺點

  1. 多一套系統需維護
  2. 需設定一些相依服務(資料庫、RabbitMQ 之類的)
  3. 部署多組 Worker 時較麻煩

後記

這篇文章在我草稿夾裡浮浮沉沉了好久,一直不知道該怎麼寫、怎麼收尾。因為發現自己有太多的坑還沒填了,刪刪改改後決定趁著記憶還沒完全消失,就先寫到這邊一段落結束。

有什麼想討論的歡迎提出。