Python Monthly Topics

AWS CloudWatchのログをpandasで解析し⁠エラーの状況を可視化する

寺田 学@terapyonです。2023年12月の「Python Monthly Topics」は、CloudWatch Logsに蓄積したログをpandasで状況を確認したり、可視化する方法を解説します。

目的⁠モチベーション

AWSでサーバの運用を行っていると、さまざまなログがCloudWatch Logsに保存されます。保存されたログから、動作状況の確認やエラーなどを特定、傾向を掴むために外部のツールと組み合わせることがあります。

pandasというPythonでデータ分析を行っている方には馴染みのあるツールを使えば、ログを処理できます。さらに、Pythonコードで実行できることで、定期的な実行もできます。

本記事を読む上での予備知識

実際の加工や可視化を行う前に、本記事の前提となる用語の解説やツールの説明をします。

AWS CloudWatch Logsとは

パブリッククラウドであるAmazon Web Service(AWS)のマネージドサービスを使っていると、⁠CloudWatch Logs」というログ収集機能でログが保存されます。たとえば、⁠AWS Lambda」や今回紹介する「AWS WAF」などの動作ログやエラーログがCloudWatch Logsに保存されます。

よくある分析方法

CloudWatch Logsでよくあるログ分析方法を説明します。

Webコンソールで見る
CloudWatch Logsでは、AWS Webコンソール経由でほぼリアルタイムでログを閲覧できたり、フィルタフィングしたりする機能があります。しかし、詳細な分析をするにはAWS Webコンソールでは不十分なことがあります。
CloudWatch Logs Insights機能を使う
Webコンソールから「Logs Insights」を利用することで、ある程度の分析や傾向を掴むことができます。
S3にデータ転送し⁠複数ファイルをダウンロードして加工する
AWSのストレージサービスである、⁠S3」にログデータを転送し、ローカルPCにダウンロードすることができます。ただし、何かしらのツールでデータをS3に転送する処理を実行する必要があります。また、CloudWatchのログは時間によって自動的に分割されるため、ひと手間作業しないとログをまとめて分析することができません。
Amazon Athenaに連携して分析
AWSのマネージドサービスであるAmazon Athenaを利用するという方法があります。常にログの監視を行っている場合には、導入されているケースもあるかと思います。追加の費用が必要だったり、事前の設定が必要というところがネックになる場合があります。

pandasでの時系列データの扱い

pandasはPythonのサードパーティ製パッケージで、データ分析を行う上でよく使われているライブラリです。pandasには、データ分析を行う上で豊富な機能が備わっています。特に「DataFrame」というデータ形式で、二次元のデータをIndex付きで扱います。

pandasでは時系列のIndexを付与したDataFrameが作成できるので、ログデータを分析しやすくすることができます。時系列データの処理ができるので、時間を指定しての検索や、時間ごとの統計的な分析、可視化ができます。

AWS CloudWatch Logsにおけるログデータの扱いが大変

CloudWatch Logsのデータは一般のログデータと違い、JSON形式で扱われることがあります。さらに、テキスト形式で扱われる場合もあり、その時にはログ内の改行によってレコードが分割されることがあります。また、ダウンロードしたファイルは複数ファイルに分かれていて、ダウンロード時には圧縮されていることがあり、多種多様な形式になっています。

JSONで保存されるWAFのログの例
WAFのCloudWatch Logsのログを確認
テキストで保存されるLambdaのログの例
LambdaのCloudWatch Logsのコンソール表示

Pythonの標準ライブラリを使ってログデータの解析を行うことはできますが、この記事ではCloudWatch Logsを簡単に扱うためのツールを紹介します。

AWS SDK for pandasのインストールと環境の準備

AWS SDK for pandasは、AWSが開発し公開しているオープンソースのライブラリで、CloudWatch LogsやAthena、S3データなどのAWSのリソースを、pandasで効率的に扱うことができます。

項目 内容
ライブラリ名 AWS SDK for pandas(awswrangler)
開発元 AWS
ライセンス Apache License 2.0
Python バージョン 3.8 以上
公式サイト https://aws-sdk-pandas.readthedocs.io/en/stable/
PyPI https://pypi.org/project/awswrangler/
GitHub https://github.com/aws/aws-sdk-pandas
執筆時点のバージョン 3.4.2

このツールでは、以下に対応しています(公式サイトより⁠⁠。

Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL)

今回は、CloudWatch Logsの解析に着目して紹介します。

具体的には、CloudWatch Logsのログを読み込むawswrangler.cloudwatch.read_logs関数があります。この関数を使用することで、CloudWatch LogsのログをpandasのDataFrameに変換できます。詳細については後述します。

インストールと基本設定

インストール方法は以下の通りです。

pip install awswrangler

awswranglerをインストールすると、同時にpandasがインストールされます。また、AWS SDK for Pythonであるboto3も同時にインストールさます。

この記事で紹介するコードをJupyterLabで実行可能にし、可視化するためにグラフライブラリのMatplotlibもインストールしてください。JupyterLabはWebブラウザで対話型にプログラムを実行できる環境です。

pip install jupyterlab matplotlib

awswranglerでcloudwatch.read_logsを使ってログを取得するには、AWSのクレデンシャル設定が必要です。

AWSのクレデンシャル設定の前に、AWS IAMにてCloudWatch Logsの読み取り権限を持つユーザを作ります。IAMユーザに幅広い権限を付けると不用意な操作ができるため、専用のユーザを作ることをお勧めします。

IAMユーザの設定情報
項目 内容
ユーザ名 gihyo-monthly-test(任意の名前)
マネジメントコンソール ユーザーアクセスを提供しない(チェックなし)
ポリシー CloudWatchLogsReadOnlyAccess
アクセスキー 作成する
IAMのポリシー設定画面
IAMのポリシー設定画面

IAMユーザの作成方法の詳細は、IAM公式ドキュメント、または他の参考資料をご確認ください。

IAMユーザを作成する時に、⁠アクセスキー」「シークレットアクセスキー」が作成時のみ画面に表示されます。これらのキーは後ほどログを取得する際に利用しますので、大切に保管してください。

AWSプロファイルを手元のPCに設定する方法もありますが、今回はプロファイルを生成せずに環境変数でコードに渡す方法で説明します。AWSプロファイルの設定を行いたい場合は、AWS公式ドキュメント(Configure the AWS CLI) を確認してください。

環境変数への登録と確認

「アクセスキー」「シークレットアクセスキー」を環境変数に設定します。

$ export AWS_ACCESS_KEY=****************JUMP  # 作成したアクセスキー
$ export AWS_SECRET_KEY=**************************1C53n  # 作成したアクセスキーののシークレット
$ jupyter lab

環境変数を設定して、JupyterLabを立ち上げ、PythonのOSモジュールを使って環境変数の取得を行います。以下のように設定した「アクセスキー」「シークレットアクセスキー」が取得できたら、ここでの確認は完了となります。

JupyterLabで環境変数の取得確認
JupyterLabで環境変数の取得確認

cloudwatch.read_logsの基本的な使い方

ここでは、cloudwatch.read_logsを使ってAWS WAFのログを取得し、JSON形式のログをDataFrameの列に展開し、必要な項目を抽出します。

ログのグループ名⁠期間⁠クエリーを設定して取得

CloudFrontに設定したファイアウォールAWS WAFのログを取得します。AWS WAFはCloudFrontの前面に配置できるファイアウォールです。今回の例では、ログインを伴うURLに対してIPアドレス制限をし、設定したIP以外からのログインを禁止するように設定されています。

WAFのCloudWatch Logsのデータを読み込む概念図
WAFのCloudWatch Logsのデータを読み込む

WAFのログをCloudWatch Logsに保存するには、以下のようにAWSコンソールで設定をする必要があります。

WAFのログをCloudWatch Logsに保存する設定
WAFのログをCloudWatch Logsに保存する設定1
WAFのログをCloudWatch Logsに保存する設定2

サンプル用に、ロググループ名はaws-waf-logs-XXXXXXXXXXXXXX-cfとします。なお、今回のログは、CloudFront関連のため、リージョン us-east-1に保存されています。

AWSコンソールでCloudWatch Logsを見ると以下のようになっています。

CloudWatch Logsでログストリームを選択
CloudWatch Logsでログストリームを選択
CloudWatch Logsのログを確認
CloudWatch Logsのログを確認

ここでは、PythonコードでCloudWatch Logsのデータを取得し、pandasのDataFrameに変換する方法を紹介します。

get_logs.py - ログを取得してDataFrame化
import os
from datetime import datetime
from zoneinfo import ZoneInfo
import awswrangler as wr
import boto3


ASIA_TOKYO = ZoneInfo("Asia/Tokyo")  # タイムゾーンをJSTに設定
start = datetime(2023, 12, 4, 17, 55, 0, tzinfo=ASIA_TOKYO)  # 2023年12月4日17時55分
end = datetime(2023, 12, 4, 18, 5, 0, tzinfo=ASIA_TOKYO)  # 2023年12月4日18時05分
limit = 2000  # データ件数の上限
region_name = "us-east-1"  # リージョン名
log_name = "aws-waf-logs-XXXXXXXXXXXXXX-cf"  # ロググループ名

# boto3のセッションを作成
boto3_session = boto3.session.Session(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
    region_name=region_name,
)

# CloudWatch Logsからログを取得
df_waf_raw = wr.cloudwatch.read_logs(
    log_group_names=[log_name],  # ロググループ名
    query="fields @timestamp, @message | sort @timestamp asc",  # クエリ
    limit=limit,  # データ件数の上限
    start_time=start,  # 開始時刻
    end_time=end,  # 終了時刻
    boto3_session=boto3_session,  # boto3のセッション
)

print(df_waf_raw.shape)  # (1338, 3)

このコードでは、2023年12月4日18時の前後5分のログを取得しています。ログを取得した結果、1338件のログが記録されていることがわかりました。

ログの取得にfields @timestamp, @message | sort @timestamp ascとクエリを設定しました。これは、timestampmessageのフィールドを取得し、timestampの昇順で取得する意味のクエリです。AWSコンソールで見た通り、CloudWatch Logsにはタイムスタンプ(timestamp)とメッセージ(message)の組み合わせで保存されています。

CloudWatch Logsを検索するクエリ文法については、次の公式サイトを参照してください。

JSON形式のログを展開

AWS WAFのログは、message列がJSON形式で以下のように出力されています。

{
  "timestamp": 1701680117244,
  "formatVersion": 1,
  "webaclId": "arn:aws:wafv2:us-east-1:227xxxxxx242:global/webacl/xxxxxxxxxxxxcf/baed1c94-xxxxxxxxxxxxxxxx-55de7",
  "terminatingRuleId": "Default_Action",
  "terminatingRuleType": "REGULAR",
  "action": "ALLOW",
  "terminatingRuleMatchDetails": [],
  "httpSourceName": "CF",
  "httpSourceId": "EOXXXXXXXXXXXXMW",
  "ruleGroupList": [],
  "rateBasedRuleList": [],
  "nonTerminatingMatchingRules": [],
  "requestHeadersInserted": null,
  "responseCodeSent": null,
  "httpRequest": {
    "clientIp": "52.xxx.xxx.247",
    "country": "JP",
    "headers": [
      { "name": "Host", "value": "www.xxxxxxx.jp" },
      { "name": "Accept-Encoding", "value": "gzip, deflate" },
      { "name": "User-Agent", "value": "xxxxxxxxxxxxxx" },
      { "name": "x-forwarded-for", "value": "10.x.xx.210, 127.0.0.1" },
      { "name": "x-forwarded-host", "value": "10.x.xx.75" },
      { "name": "Accept", "value": "application/json" },
      { "name": "Connection", "value": "close" }
    ],
    "uri": "/",
    "args": "",
    "httpVersion": "HTTP/1.1",
    "httpMethod": "GET",
    "requestId": "pWxlfUPbfcztylxxxxxxxxxxxxxxxxxxxxxxxxxxyd4ll_CYPDXA=="
  },
  "ja3Fingerprint": "398430069xxxxxxxxxxxxxxxxxxxxx58d77"
}

このままではpandasで解析しにくいので、pandasのjson_normalizeを用いてJSONのキーごとにDataFrameの列に分割します。

json_to_columns.py - message列のJSONをjson_normalizeで列分割
import pandas as pd
import json

df_waf = pd.json_normalize(df_waf_raw.loc[:, "message"].apply(lambda x: json.loads(x)))

print(df_waf.shape)  # (1338, 25)

新たなDataFrameは、25列になっていることがわかりました。

列名を出力して確認すると、以下のような情報が列に分割されていることがわかりました。

print(df_waf.columns)

Index(['timestamp', 'formatVersion', 'webaclId', 'terminatingRuleId',
       'terminatingRuleType', 'action', 'terminatingRuleMatchDetails',
       'httpSourceName', 'httpSourceId', 'ruleGroupList', 'rateBasedRuleList',
       'nonTerminatingMatchingRules', 'requestHeadersInserted',
       'responseCodeSent', 'ja3Fingerprint', 'httpRequest.clientIp',
       'httpRequest.country', 'httpRequest.headers', 'httpRequest.uri',
       'httpRequest.args', 'httpRequest.httpVersion', 'httpRequest.httpMethod',
       'httpRequest.requestId', 'requestBodySize',
       'requestBodySizeInspectedByWAF'],
      dtype='object')

先頭の2行を出力します。

df_waf.head(2)
df_waf.head(2)の出力 1
df_waf.head(2)の出力1

ここまでで、AWS WAF の CloudWatch ログを pandas の DataFrame に変換することができました。

message内の必要な情報を抽出

ここでは、DataFrameを加工して解析を行うための準備を行います。

最初に、時系列データを扱うために、timestamp列を使ってインデックスに「aware(タイムゾーン付き⁠⁠」な日時を設定します。元のデータは、UTC時間でタイムゾーンなしの日時がtimestamp列に入っています。DataFrameのdtアクセッサーからto_localizeメソッドを呼び出し、awareに変換します。さらに、dtアクセッサーからtz_convertを使ってJSTに変換しています。

df_waf.index = df_waf_raw.iloc[:, 0].dt.tz_localize("UTC").dt.tz_convert("Asia/Tokyo")

データの先頭2行を出力して、インデックスがタイムゾーン付きのDataFrameになっていることを確認します。

df_waf.head(2)
df_waf.head(2)の出力 2
df_waf.head(2)の出力2

次に、分析に利用したい列を抽出して、新たなDataFrameを生成します。

今回は、WAFのログから以下を抽出します。

  • action:WAFの動作(ALLOW / BLOCK)
  • terminatingRuleId:WAFのルール
  • httpRequest.clientIp:接続元IPアドレス
  • httpRequest.uri:リクエストURL
df = df_waf.loc[:, ["action", "terminatingRuleId", "httpRequest.clientIp", "httpRequest.uri"]]
df.head(2)の出力結果
df.head(2)の出力結果

boolインデックスで目的の件数を調査

ここでは、WAFの動作でBLOCKされた件数を調べたいと思います。

action列に、ALLOWまたはBLOCKが記録されています。BLOCKは何かしらのルールに従って、WAF(ファイアウォール)がアクセスを拒否したものです。BLOCKされた件数が多いということは、何かしらの攻撃を受けているか、ルールが厳しすぎてアクセスできない場合が多いことを示します。

df_blocked = df.loc[df.loc[:, "action"] == "BLOCK", :]
print(df_blocked.shape)  # (4, 4)

今回は、10分間に4件のBLOCKが発生したことがわかりました。pandasのboolインデックスの機能を使い、action列がBLOCKのものを抽出したdf_blockedを作りました。

df_blockedの出力結果
df_blockedの出力結果

適切にBLOCKされているのであれば問題ありませんが、本来アクセスできるはずなのに、何かしらの要因でBLOCKされたものを調べたい時には、このようにBLOCKされているログのみを抽出します。抽出したログを参照して、適用されたWAFのルール、接続元IPアドレスやURLなどを確認することで、原因を特定していきます。

今回はBLOCKされた件数が4件と少なかったので、全データを確認することができました。件数が多い場合や傾向を知りたい場合は、次のセクションの「ログの集計と可視化、分析方法」を参照してください。

ログの集計と可視化⁠分析方法

ここでは、pandasを利用してログからBLOCKされた件数を集計したり、可視化する方法について解説します。

1分単位でBLOCKされた状況を確認

今回は10分間のログデータを使っているので、1分単位でALLOW/BLOCKの件数を確認します。

df["round_min"] = df.index.round("min")  # Indexの日時から1分に丸めた列を作る
print(df.groupby(["round_min", "action"]).size())

出力結果は以下のようになりました。

round_min                  action
2023-12-04 17:55:00+09:00  ALLOW      10
2023-12-04 17:56:00+09:00  ALLOW     277
2023-12-04 17:57:00+09:00  ALLOW     395
2023-12-04 17:58:00+09:00  ALLOW     140
2023-12-04 17:59:00+09:00  ALLOW     270
                           BLOCK       4
2023-12-04 18:00:00+09:00  ALLOW     106
2023-12-04 18:01:00+09:00  ALLOW      20
2023-12-04 18:02:00+09:00  ALLOW      61
2023-12-04 18:03:00+09:00  ALLOW      20
2023-12-04 18:04:00+09:00  ALLOW      25
2023-12-04 18:05:00+09:00  ALLOW      10
dtype: int64

17:59の前後に4件のBLOCKが発生していることがわかります。

pandasのplot機能(内部的にはMatplotlibが使われている)を使い可視化します。

df.groupby(["round_min", "action"]).size().unstack().plot(kind="bar", stacked=True)
1分ごとのALLOW/BLOCKの可視化
1分ごとのALLOW/BLOCKの可視化

可視化することで、どのタイミングでBLOCKが発生しているのかが一目瞭然となりました。

BLOCKされたIPアドレスの確認

BLOCKされたIPアドレスを抽出し、それぞれの件数を確認します。

print(df_blocked.loc[:, "httpRequest.clientIp"].value_counts())

結果は以下の通りです(IPアドレスの一部をxxxでマスクしています⁠⁠。

httpRequest.clientIp
52.xxx.xxx.247                          2
1.xxx.xxx.213                            1
240b:11:600:xxxx:xxxx:xxxx:xxxx:75e4    1
Name: count, dtype: int64

3ヵ所のIPアドレスからアクセスがあり、1つは2回アクセスがあり、他は1回のみのアクセスとなっていました。この結果から同じ場所から大量にアクセスされていることはなく、何かしらの間違いでこのURLにアクセスされたのではないかと予想できます。

また、1つのIPアドレスはIPv6となっていました。アクセス制御をIPv4で行っている場合、接続元のネットワークがIPv6をサポートしていて想定していない接続ルートになっている可能性があると考えられます。

BLOCKされた前後のログを確認する

ここでは、IPアドレス52.xxx.xxx.247からのアクセスでBLOCKされた、17:58:45前後のログを抽出します。

df_one_ip = df.loc[df.loc[:, "httpRequest.clientIp"] == "52.xxx.xxx.247", :]
df_one_ip.loc[
  "2023-12-04 17:58:40+09:00":"2023-12-04 17:58:50+09:00",
  ["action", "httpRequest.clientIp", "httpRequest.uri"]
]
指定のIPアドレスでBLOCKされた前後
指定のIPアドレスでBLOCKされた前後

URLの一部をマスクしています。

前後のURLを見たところ、不正にアクセスしている様子は見受けられず、なにかしらの原因でloginに関係するURLへのアクセスがされた模様です。URLを見て正常かどうかを判断するには、サイトの特性やどのようなアクセスが正常なのかを知っている必要があります。

Lambdaから出力されるログを解析

ここからはAWS Lambdaの動作ログを確認します。定期実行されるLambdaのログをCloudWatch Logsで確認します。構成は以下の通りです。

LambdaのCloudWatch Logsのデータを読み込む概念図
LambdaのCloudWatch Logsのデータを読み込む

AWS Webコンソールでログを確認すると、以下のように表示されます。WAFの時はJSON形式で出力されましたが、Lambdaはテキスト形式でログ出力されます。

LambdaのCloudWatch Logsのコンソール表示
LambdaのCloudWatch Logsのコンソール表示

Lambdaの起動時に出力されるログと、コード内で出力しているログが混ざって出力されます。

ここでは、メッセージ(message)内に「error」「warning」という文字列の件数をそれぞれ確認します。AWS WAFで行ったのと同じように必要なライブラリをインポートし、期間を指定して取得します。今回は、Lambdaが東京リージョンで動いているので、region_nameを変更しています。

次のコードでCloudWatch Logsからデータを取得し、DataFrame化します。

lambda-logs.py - Lambdaのログを取得してDataFrame化
import os
from datetime import datetime
from zoneinfo import ZoneInfo
import awswrangler as wr
import boto3


ASIA_TOKYO = ZoneInfo("Asia/Tokyo")
start = datetime(2023, 12, 4, 9, 0, 0, tzinfo=ASIA_TOKYO)
end = datetime(2023, 12, 4, 9, 15, 0, tzinfo=ASIA_TOKYO)
limit = 1000
region_name = "ap-northeast-1"  # Lmabdaが実行されるリージョン 今回は東京
log_name = "/aws/lambda/dummy-xxxx-xxxxx"

boto3_session = boto3.session.Session(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
    region_name=region_name,
)

df_lambda_raw = wr.cloudwatch.read_logs(
    log_group_names=[log_name],
    query="fields @timestamp, @message | sort @timestamp asc",
    limit=limit,
    start_time=start,
    end_time=end,
    boto3_session=boto3_session,
)

print(df_lambda_raw.shape)  # (37, 3)

AWS WAFと同様に、Indexにawareな日時を設定します。

df = df_lambda_raw.loc[:, ["message"]]
df.index = df_lambda_raw.iloc[:, 0].dt.tz_localize("UTC").dt.tz_convert("Asia/Tokyo")

先頭の5件を表示すると以下のようになります。

LambdaのログをDataFrameに変換
LambdaのログをDataFrameに変換

指定の文字列が含まれる件数を調べる

「error」「warning」という文字列の件数を出力します。

print(df.loc[:, "message"].str.contains("error", case=False).sum())  # 出力結果「0件」
print(df.loc[:, "message"].str.contains("warning", case=False).sum())  # 出力結果「2件」

今回の例だと、⁠error」は0件、⁠warning」は2件ありました。pandasのDataFrameの.strアクセサを使って、contains()メソッドで指定の文字列が含まれている行を抽出しています。その際に、case=Falseとし、大文字・小文字を区別しないようにしています。contains()メソッドの戻り値は、以下のようにboolインデックスとなります。

print(df.loc[:, "message"].str.contains("warning", case=False).iloc[:3])
timestamp
2023-12-04 09:05:02.715000+09:00    False
2023-12-04 09:05:06.623000+09:00     True
2023-12-04 09:05:06.623000+09:00     True
Name: message, dtype: boolean

今回は件数を知りたかったので、sum()メソッドを使いTrueの数を出力しました。

メッセージ内の文字列の表示

今回のデータは、warningが2件含まれていました。この内容を確認していきます。

df.loc[df.loc[:, "message"].str.contains("warning", case=False), "message"]
timestamp
2023-12-04 09:05:06.623000+09:00    /var/task/lxxxxxxxn/__init__.py:34: UserWarnin...
2023-12-04 09:05:06.623000+09:00                                      warnings.warn(

Name: message, dtype: string

2行のデータを、先ほどcontains()メソッドで確認したboolインデックスを使い抽出しました。

ここでは、1件目のデータの文字列を確認します。

print(df.loc[df.loc[:, "message"].str.contains("warning", case=False), "message"].iloc[0])
/var/task/lxxxxxxxn/__init__.py:34: UserWarning: Importing xxxxxxxx from xxxxxxxx root module is no longer supported. Please use xxxxxx.xxx.xxxxxxxxxx instead.

内容はマスクしましたが、利用できなくなっているモジュールがあるので別の物を利用するように警告が出ていました。

まとめ

今月は、AWS SDK for pandas(awswrangler)を使って、CloudWach Logsで出力されたログを、pandasでDataFrame化して解析する例を紹介しました。

pandasは時系列データを取り扱う機能が豊富に揃っています。AWSのツールでログを解析する方法もありますが、Pythonやpandasに慣れている方からするとさまざまなデータの取扱ができ、可視化や自動化など応用範囲が広がるのではないかと思っています。

みなさんもpandasでのログの分析に挑戦してみましょう。

おすすめ記事

記事・ニュース一覧