InfluxDB用Pythonライブラリの使い方

この記事のまとめ:
  • 時系列データベースのオープンソースソフトウェアであるInfluxDBに対して、Pythonからデータベースを操作するための基本的な使い方の説明を行います。
背景

時系列データの機械学習をやってみようと思い、InfluxDBを使い始めましたので、今回はPythonを使ってInfluxDBにデータの挿入等を行ってみました。

InfluxDBのユーザー認証とアクセス制御

まずはInfluxDBに外部から接続できるようにユーザー設定をアクセス設定を行います。

ユーザー認証の設定

外部からの接続をユーザー認証によってアクセス制限します。 (※注意※初期設定ではIPアドレスとポート番号だけでアクセス可能になっていましたのですぐに変更したほうがよいです)

/etc/influxdb/influxdb.confをエディターで開き、下記のように変更します。

[http]
  enabled = true
  bind-address = ":8086"
  auth-enabled = true
  log-enabled = true
  write-tracing = false
  pprof-enabled = false
  https-enabled = false
  https-certificate = "/etc/ssl/influxdb.pem"

設定を変更したら、InfluxDBを再起動します。

$ sudo systemctl restart influxdb
管理者ユーザーの作成

再度、influxコマンドでInfluxDBに接続し、admin ユーザーを作成します。 下記のコマンドでadminユーザーを作成できます。

> CREATE USER admin WITH PASSWORD '********' WITH ALL PRIVILEGES

なお、adminユーザーができていない場合、どんなコマンドを打っても下記のようなエラーが出ます。

ERR: error authorizing query: create admin user first or disable authentication

一度InfluxDBとの接続を切り、先ほど作ったadminユーザーで接続し直します。

$ influx -precision rfc3339 -username admin -password *********

ちなみにadminユーザーを作った後に、何かしようとしても次のようにエラーが出るのでadminユーザーでの再接続は避けようがありません。

ERR: unable to parse authentication credentials
PythonからInfluxDBを操作する方法

InfluxDB用のPythonライブラリが提供されていますので、それを利用すれば大体のことは何でもできるはずです。

InfluxDB用のPythonライブラリに関しては、下記の公式のGithubとドキュメントにすべて記載されていますので、それらの一部を紹介していきます。

InfluxDB用Pythonライブラリ

InfluxDB用Pythonライブラリのインストール、アップデート、アンインストールは下記のコマンドでできます。

$ pip install influxdb
$ pip install --upgrade influxdb
$ pip uninstall influxdb

サポートされているPythonバージョンは、2.7, 3.5, 3.6, PyPy and PyPy3のようです。

サンプルコード

いくつか簡単な動作を実行しながら動作を見ていきたいと思います。


ライブラリのインポート

まずはPythonを起動し、ライブラリをインポートしましょう。

$ python
>>> import influxdb


クライアントインスタンスの生成

クライアントインスタンスを作ります。

>>> client = influxdb.InfluxDBClinet(host='aaa', port='8086', username='bbb', password='ccc')

初期化パラメーターとして次の引数を持つことができます。今回は最低限の引数のみでインスタンスを作りました。

  • host=u'localhost', port=8086, username=u'root', password=u'root', database=None, ssl=False, verify_ssl=False, timeout=None, retries=3, use_udp=False, udp_port=4444, proxies=None

また、次のようにDSN (Data Source Name)でもインスタンスを生成することもできます。

>>> client = influxdb.InfluxDBClinet.from_dsn('influxdb://[username]:[password]@[host]:[port]/[database]')


データベースの操作

作成済みのデータベースを表示してみます。次のように作成済みのデータベース名を表示できます。

>>> client.get_list_database()
>>> [{'name': '_internal'}, {'name': 'testdb'}]

使用するデータベースを選択・切替してみます。なお、データベースの選択は、クライアントインスタンス生成時にあらかじめ行っておくこともできます。

>>> client.switch_database('testdb')

その他、次の関数を使うことでデータベースの作成や削除もできます。

  • create_database(dbname)
  • drop_database(dbname)

クエリの実行

DBの操作のためのクエリの実行は次のようにできます。

>>> client.query('SELECT * FROM test_measurement')
>>> [{'time': '2018-10-06T13:04:43.515Z', 'host': 'cpu_load_short', 'value': 0.64}]

クエリの実行は、query関数を使うほか、request関数を使ってHTTP APIを使う方法や、send_packet関数でUDPを使う方法などありそうです。

  • query(query, params=None, epoch=None, expected_response_code=200, database=None, raise_errors=True, chunked=False, chunk_size=0)
  • request(url, method=u'GET', params=None, data=None, expected_response_code=200, headers=None)
  • send_packet(packet, protocol=u'json')

pointの挿入

pointの挿入は、queryを使うこともできますが、専用の関数が用意されています。

>>> json_body = [
    {
        "measurement": "cpu_load_short",
        "tags": {
            "host": "server01",
            "region": "us-west"
        },
        "time": "2009-11-10T23:00:00Z",
        "fields": {
            "value": 0.64
        }
    }
]
>>> client.write_points(json_body)

その他、HTTP APIを使って書き込みを行う関数(?)もありそうです。

  • write(data, params=None, expected_response_code=204, protocol=u'json')
  • write_points(points, time_precision=None, database=None, retention_policy=None, tags=None, batch_size=None, protocol=u'json')

その他の操作

measurementの取得、削除

  • get_list_measurements()
  • drop_measurement(measurement)

pointの削除

  • delete_series(database=None, measurement=None, tags=None)

ユーザー操作

  • get_list_users()
  • create_user(username, password, admin=False)
  • drop_user(username)
  • set_user_password(username, password)
  • switch_user(username, password)

リテンションポリシー操作

  • get_list_retention_policies(database=None)
  • create_retention_policy(name, duration, replication, database=None, default=False)
  • drop_retention_policy(name, database=None)
  • alter_retention_policy(name, database=None, duration=None, replication=None, default=None)

権限操作

  • get_list_privileges(username)
  • grant_privilege(privilege, database, username)
  • revoke_privilege(privilege, database, username)
  • grant_admin_privileges(username)
  • revoke_admin_privileges(username)

切断

  • close()

今回は以上です。 最後まで読んでいただき、ありがとうございます。
関連記事




コメント

このブログの人気の投稿

ネットワーク越しの RTL-SDR で SDR# を使う方法

PythonでPinterestのPin (画像)の検索結果を取得する