Apache Airflow の HTTP Operator の使い方

この記事のまとめ:
  • Apache Airflow のワークフローのタスクを定義するオペレーターの一つである HTTP Operator の使い方を紹介します。
背景

私が Apache Airflow を使用する際、Docker コンテナーで Airflow 自身を立ち上げて使用するのですが、処理させたいタスクの実行が Airflow のコンテナー内で行われることが気持ち悪かったので、処理させたいタスクを別のコンテナーで実行するために、処理用のコンテナーを REST API を持つコンテナーとして立ち上げ、 Airflow からは REST API でそのコンテナーにキックできるようにしようとしました。

HTTP Operator の使い方

Airflow のタスクは OperatorSensor を使って定義することができ、HTTP Operator は タスクのアクションとして HTTP システムのエンドポイント(もしくは URI )を呼び出すことができます。

公式の GitHub で HTTP Operator のサンプルコードが公開されていますが、本記事では公式のサンプルコードには書いていない使い方を説明していこうと思います。

サンプルコード

サンプルとして用意した DAG の定義ファイルは下記の通りです。

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta
import pendulum, logging, json
 
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": pendulum.today("Asia/Tokyo"),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "catchup": False
}
 
def response_check_example(response):
    result = response.json()
    if result['status'] == 'success':
        return True
    else:
        return False
 
dag1 = DAG("http_operator_example", default_args=default_args, schedule_interval=timedelta(hours=1))
task1 = SimpleHttpOperator(
    task_id='example',
    http_conn_id='',
    endpoint='http://myapi:5000',
    data=json.dumps({'input1':1, 'input2':2}),
    headers={"Content-Type": "application/json"},
    response_check=response_check_example,
    log_response=True,
    dag=dag1,
)

HTTP Operator としてデフォルトで用意されている SimpleHttpOperator クラスを使用します。この Operator を使うにあたって、重要なコンストラクターとして渡しているパラメーターについて見ていきます。

http_conn_id と endpoint

http_conn_idendpoint はセットになっており、この2つでリクエスト先の URI を指定します。

Airflow では、サービスごとのホストアドレスがあらかじめ定義されており、それを利用することで Operator を作成するごとにアクセス先のすべての URI を記載するのでなく、エンドポイントのみで指定しようとしています。つまり、http_conn_id で定義されるホストアドレス + endpoint で URI が決まるということです。

SimpleHttpOperator では http_operator.py において、デフォルト値として、http_conn_id = ['](http://http_conn_id: str = 'http_default',)``http_default``' として設定されています。また、 http_default は、db.py において、 https://google.com として定義されていますので、公式の HTTP Operator のサンプルコードではすべて Google の API を使用しているということがわかります。

今回のサンプルコードでは Google 以外の API を使用するために、http_conn_id=``'' とすることで http_conn_id で決定されるホストアドレスを空文字にし、endpoint で URI を定義しています。

data と headers

これらは HTTP リクエストのヘッダーと JSON リクエストとして送る JSON データを指定します。なお、 SimpleHttpOperator ではデフォルトで HTTP メソッドとして POST が設定されています。

response_check

Airflow のタスクが無事完了したかどうかの判定を response_check で行います。判定方法は lambda 式で行うか、定義した関数を呼び出して行うかを選べます。何も指定しない場合、(ソースコード上)無条件でタスクの実行は成功したことになるはずです。

公式のサンプルコードでは lambda 式を使用していますので、本サンプルコードでは response_check_example という関数を定義して使用しています。

この関数は単純に HTTP レスポンスの JSON ファイルの中身を見て、status という項目が success かどうかを判定しています。なお、HTTP レスポンスの JSON ファイルの中身がそのような構成になっている前提です。このように簡単にレスポンスの中身を見た判定が行えます。

log_response

このパラメーターを True にしておくと Airflow の Web GUI 上でログとして HTTP レスポンスの中身を出力してくれるようになります。



ここまでで HTTP Operator の使い方の概要はわかったのではないでしょうか?

もし、もっと処理を工夫したいとなると、SimpleHttpOperator を使うのではなく、SimpleHttpOperator を継承させて新たなクラスを定義すると良いかもしれませんのでぜひチャレンジしてみてください。


今回は以上です。 最後まで読んでいただき、ありがとうございます。
関連記事



コメント

このブログの人気の投稿

ネットワーク越しの RTL-SDR で SDR# を使う方法

PythonでPinterestのPin (画像)の検索結果を取得する