寺田 学
目的・モチベーション
AWSでサーバの運用を行っていると、さまざまなログがCloudWatch Logsに保存されます。保存されたログから、動作状況の確認やエラーなどを特定、傾向を掴むために外部のツールと組み合わせることがあります。
pandasというPythonでデータ分析を行っている方には馴染みのあるツールを使えば、ログを処理できます。さらに、Pythonコードで実行できることで、定期的な実行もできます。
本記事を読む上での予備知識
実際の加工や可視化を行う前に、本記事の前提となる用語の解説やツールの説明をします。
AWS CloudWatch Logsとは
パブリッククラウドであるAmazon Web Service
よくある分析方法
CloudWatch Logsでよくあるログ分析方法を説明します。
- Webコンソールで見る
- CloudWatch Logsでは、AWS Webコンソール経由でほぼリアルタイムでログを閲覧できたり、フィルタフィングしたりする機能があります。しかし、詳細な分析をするにはAWS Webコンソールでは不十分なことがあります。
- CloudWatch Logs Insights機能を使う
- Webコンソールから
「Logs Insights」 を利用することで、ある程度の分析や傾向を掴むことができます。 - S3にデータ転送し、複数ファイルをダウンロードして加工する
- AWSのストレージサービスである、
「S3」 にログデータを転送し、ローカルPCにダウンロードすることができます。ただし、何かしらのツールでデータをS3に転送する処理を実行する必要があります。また、CloudWatchのログは時間によって自動的に分割されるため、ひと手間作業しないとログをまとめて分析することができません。 - Amazon Athenaに連携して分析
- AWSのマネージドサービスである
「Amazon Athena」 を利用するという方法があります。常にログの監視を行っている場合には、導入されているケースもあるかと思います。追加の費用が必要だったり、事前の設定が必要というところがネックになる場合があります。
pandasでの時系列データの扱い
pandasはPythonのサードパーティ製パッケージで、データ分析を行う上でよく使われているライブラリです。pandasには、データ分析を行う上で豊富な機能が備わっています。特に
pandasでは時系列のIndexを付与したDataFrameが作成できるので、ログデータを分析しやすくすることができます。時系列データの処理ができるので、時間を指定しての検索や、時間ごとの統計的な分析、可視化ができます。
AWS CloudWatch Logsにおけるログデータの扱いが大変
CloudWatch Logsのデータは一般のログデータと違い、JSON形式で扱われることがあります。さらに、テキスト形式で扱われる場合もあり、その時にはログ内の改行によってレコードが分割されることがあります。また、ダウンロードしたファイルは複数ファイルに分かれていて、ダウンロード時には圧縮されていることがあり、多種多様な形式になっています。
Pythonの標準ライブラリを使ってログデータの解析を行うことはできますが、この記事ではCloudWatch Logsを簡単に扱うためのツールを紹介します。
AWS SDK for pandasのインストールと環境の準備
AWS SDK for pandasは、AWSが開発し公開しているオープンソースのライブラリで、CloudWatch LogsやAthena、S3データなどのAWSのリソースを、pandasで効率的に扱うことができます。
項目 | 内容 |
---|---|
ライブラリ名 | AWS SDK for pandas |
開発元 | AWS |
ライセンス | Apache License 2. |
Python バージョン | 3. |
公式サイト | https:// |
PyPI | https:// |
GitHub | https:// |
執筆時点のバージョン | 3. |
このツールでは、以下に対応しています
Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL)
今回は、CloudWatch Logsの解析に着目して紹介します。
具体的には、CloudWatch Logsのログを読み込むawswrangler.
インストールと基本設定
インストール方法は以下の通りです。
pip install awswrangler
awswranglerをインストールすると、同時にpandasがインストールされます。また、AWS SDK for Pythonであるboto3も同時にインストールさます。
この記事で紹介するコードをJupyterLabで実行可能にし、可視化するためにグラフライブラリのMatplotlibもインストールしてください。JupyterLabはWebブラウザで対話型にプログラムを実行できる環境です。
pip install jupyterlab matplotlib
awswranglerでcloudwatch.
を使ってログを取得するには、AWSのクレデンシャル設定が必要です。
AWSのクレデンシャル設定の前に、AWS IAMにてCloudWatch Logsの読み取り権限を持つユーザを作ります。IAMユーザに幅広い権限を付けると不用意な操作ができるため、専用のユーザを作ることをお勧めします。
項目 | 内容 |
---|---|
ユーザ名 | gihyo-monthly-test |
マネジメントコンソール | ユーザーアクセスを提供しない |
ポリシー | CloudWatchLogsReadOnlyAccess |
アクセスキー | 作成する |
IAMユーザの作成方法の詳細は、IAM公式ドキュメント、または他の参考資料をご確認ください。
IAMユーザを作成する時に、
AWSプロファイルを手元のPCに設定する方法もありますが、今回はプロファイルを生成せずに環境変数でコードに渡す方法で説明します。AWSプロファイルの設定を行いたい場合は、AWS公式ドキュメント
環境変数への登録と確認
「アクセスキー」
$ export AWS_ACCESS_KEY=****************JUMP # 作成したアクセスキー $ export AWS_SECRET_KEY=**************************1C53n # 作成したアクセスキーののシークレット $ jupyter lab
環境変数を設定して、JupyterLabを立ち上げ、PythonのOSモジュールを使って環境変数の取得を行います。以下のように設定した
cloudwatch.read_logsの基本的な使い方
ここでは、cloudwatch.
を使ってAWS WAFのログを取得し、JSON形式のログをDataFrameの列に展開し、必要な項目を抽出します。
ログのグループ名、期間、クエリーを設定して取得
CloudFrontに設定したファイアウォール
WAFのログをCloudWatch Logsに保存するには、以下のようにAWSコンソールで設定をする必要があります。
サンプル用に、ロググループ名はaws-waf-logs-XXXXXXXXXXXXXX-cf
とします。なお、今回のログは、CloudFront関連のため、リージョン us-east-1に保存されています。
AWSコンソールでCloudWatch Logsを見ると以下のようになっています。
ここでは、PythonコードでCloudWatch Logsのデータを取得し、pandasのDataFrameに変換する方法を紹介します。
このコードでは、2023年12月4日18時の前後5分のログを取得しています。ログを取得した結果、1338件のログが記録されていることがわかりました。
ログの取得にfields @timestamp, @message | sort @timestamp asc
とクエリを設定しました。これは、timestamp
とmessage
のフィールドを取得し、timestamp
の昇順で取得する意味のクエリです。AWSコンソールで見た通り、CloudWatch Logsにはタイムスタンプ
CloudWatch Logsを検索するクエリ文法については、次の公式サイトを参照してください。
JSON形式のログを展開
AWS WAFのログは、message列がJSON形式で以下のように出力されています。
{
"timestamp": 1701680117244,
"formatVersion": 1,
"webaclId": "arn:aws:wafv2:us-east-1:227xxxxxx242:global/webacl/xxxxxxxxxxxxcf/baed1c94-xxxxxxxxxxxxxxxx-55de7",
"terminatingRuleId": "Default_Action",
"terminatingRuleType": "REGULAR",
"action": "ALLOW",
"terminatingRuleMatchDetails": [],
"httpSourceName": "CF",
"httpSourceId": "EOXXXXXXXXXXXXMW",
"ruleGroupList": [],
"rateBasedRuleList": [],
"nonTerminatingMatchingRules": [],
"requestHeadersInserted": null,
"responseCodeSent": null,
"httpRequest": {
"clientIp": "52.xxx.xxx.247",
"country": "JP",
"headers": [
{ "name": "Host", "value": "www.xxxxxxx.jp" },
{ "name": "Accept-Encoding", "value": "gzip, deflate" },
{ "name": "User-Agent", "value": "xxxxxxxxxxxxxx" },
{ "name": "x-forwarded-for", "value": "10.x.xx.210, 127.0.0.1" },
{ "name": "x-forwarded-host", "value": "10.x.xx.75" },
{ "name": "Accept", "value": "application/json" },
{ "name": "Connection", "value": "close" }
],
"uri": "/",
"args": "",
"httpVersion": "HTTP/1.1",
"httpMethod": "GET",
"requestId": "pWxlfUPbfcztylxxxxxxxxxxxxxxxxxxxxxxxxxxyd4ll_CYPDXA=="
},
"ja3Fingerprint": "398430069xxxxxxxxxxxxxxxxxxxxx58d77"
}
このままではpandasで解析しにくいので、pandasのjson_
を用いてJSONのキーごとにDataFrameの列に分割します。
新たなDataFrameは、25列になっていることがわかりました。
列名を出力して確認すると、以下のような情報が列に分割されていることがわかりました。
print(df_waf.columns)
Index(['timestamp', 'formatVersion', 'webaclId', 'terminatingRuleId',
'terminatingRuleType', 'action', 'terminatingRuleMatchDetails',
'httpSourceName', 'httpSourceId', 'ruleGroupList', 'rateBasedRuleList',
'nonTerminatingMatchingRules', 'requestHeadersInserted',
'responseCodeSent', 'ja3Fingerprint', 'httpRequest.clientIp',
'httpRequest.country', 'httpRequest.headers', 'httpRequest.uri',
'httpRequest.args', 'httpRequest.httpVersion', 'httpRequest.httpMethod',
'httpRequest.requestId', 'requestBodySize',
'requestBodySizeInspectedByWAF'],
dtype='object')
先頭の2行を出力します。
df_waf.head(2)
ここまでで、AWS WAF の CloudWatch ログを pandas の DataFrame に変換することができました。
message内の必要な情報を抽出
ここでは、DataFrameを加工して解析を行うための準備を行います。
最初に、時系列データを扱うために、timestamp
列を使ってインデックスにtimestamp
列に入っています。DataFrameのdt
アクセッサーからto_
メソッドを呼び出し、awareに変換します。さらに、dt
アクセッサーからtz_
を使ってJSTに変換しています。
df_waf.index = df_waf_raw.iloc[:, 0].dt.tz_localize("UTC").dt.tz_convert("Asia/Tokyo")
データの先頭2行を出力して、インデックスがタイムゾーン付きのDataFrameになっていることを確認します。
df_waf.head(2)
次に、分析に利用したい列を抽出して、新たなDataFrameを生成します。
今回は、WAFのログから以下を抽出します。
- action:WAFの動作
(ALLOW / BLOCK) - terminatingRuleId:WAFのルール
- httpRequest.
clientIp:接続元IPアドレス - httpRequest.
uri:リクエストURL
df = df_waf.loc[:, ["action", "terminatingRuleId", "httpRequest.clientIp", "httpRequest.uri"]]
boolインデックスで目的の件数を調査
ここでは、WAFの動作でBLOCK
された件数を調べたいと思います。
action列に、ALLOWまたはBLOCKが記録されています。BLOCKは何かしらのルールに従って、WAF
df_blocked = df.loc[df.loc[:, "action"] == "BLOCK", :]
print(df_blocked.shape) # (4, 4)
今回は、10分間に4件のBLOCKが発生したことがわかりました。pandasのboolインデックスの機能を使い、action
列がBLOCK
のものを抽出したdf_
を作りました。
適切にBLOCKされているのであれば問題ありませんが、本来アクセスできるはずなのに、何かしらの要因でBLOCKされたものを調べたい時には、このようにBLOCK
されているログのみを抽出します。抽出したログを参照して、適用されたWAFのルール、接続元IPアドレスやURLなどを確認することで、原因を特定していきます。
今回はBLOCK
された件数が4件と少なかったので、全データを確認することができました。件数が多い場合や傾向を知りたい場合は、次のセクションの
ログの集計と可視化、分析方法
ここでは、pandasを利用してログからBLOCK
された件数を集計したり、可視化する方法について解説します。
1分単位でBLOCKされた状況を確認
今回は10分間のログデータを使っているので、1分単位でALLOW/
df["round_min"] = df.index.round("min") # Indexの日時から1分に丸めた列を作る
print(df.groupby(["round_min", "action"]).size())
出力結果は以下のようになりました。
round_min action 2023-12-04 17:55:00+09:00 ALLOW 10 2023-12-04 17:56:00+09:00 ALLOW 277 2023-12-04 17:57:00+09:00 ALLOW 395 2023-12-04 17:58:00+09:00 ALLOW 140 2023-12-04 17:59:00+09:00 ALLOW 270 BLOCK 4 2023-12-04 18:00:00+09:00 ALLOW 106 2023-12-04 18:01:00+09:00 ALLOW 20 2023-12-04 18:02:00+09:00 ALLOW 61 2023-12-04 18:03:00+09:00 ALLOW 20 2023-12-04 18:04:00+09:00 ALLOW 25 2023-12-04 18:05:00+09:00 ALLOW 10 dtype: int64
17:59の前後に4件のBLOCKが発生していることがわかります。
pandasのplot機能
df.groupby(["round_min", "action"]).size().unstack().plot(kind="bar", stacked=True)
可視化することで、どのタイミングでBLOCKが発生しているのかが一目瞭然となりました。
BLOCKされたIPアドレスの確認
BLOCKされたIPアドレスを抽出し、それぞれの件数を確認します。
print(df_blocked.loc[:, "httpRequest.clientIp"].value_counts())
結果は以下の通りです
httpRequest.clientIp 52.xxx.xxx.247 2 1.xxx.xxx.213 1 240b:11:600:xxxx:xxxx:xxxx:xxxx:75e4 1 Name: count, dtype: int64
3ヵ所のIPアドレスからアクセスがあり、1つは2回アクセスがあり、他は1回のみのアクセスとなっていました。この結果から同じ場所から大量にアクセスされていることはなく、何かしらの間違いでこのURLにアクセスされたのではないかと予想できます。
また、1つのIPアドレスはIPv6となっていました。アクセス制御をIPv4で行っている場合、接続元のネットワークがIPv6をサポートしていて想定していない接続ルートになっている可能性があると考えられます。
BLOCKされた前後のログを確認する
ここでは、IPアドレス52.
からのアクセスでBLOCKされた、17:58:45前後のログを抽出します。
df_one_ip = df.loc[df.loc[:, "httpRequest.clientIp"] == "52.xxx.xxx.247", :]
df_one_ip.loc[
"2023-12-04 17:58:40+09:00":"2023-12-04 17:58:50+09:00",
["action", "httpRequest.clientIp", "httpRequest.uri"]
]
URLの一部をマスクしています。
前後のURLを見たところ、不正にアクセスしている様子は見受けられず、なにかしらの原因でlogin
に関係するURLへのアクセスがされた模様です。URLを見て正常かどうかを判断するには、サイトの特性やどのようなアクセスが正常なのかを知っている必要があります。
Lambdaから出力されるログを解析
ここからはAWS Lambdaの動作ログを確認します。定期実行されるLambdaのログをCloudWatch Logsで確認します。構成は以下の通りです。
AWS Webコンソールでログを確認すると、以下のように表示されます。WAFの時はJSON形式で出力されましたが、Lambdaはテキスト形式でログ出力されます。
Lambdaの起動時に出力されるログと、コード内で出力しているログが混ざって出力されます。
ここでは、メッセージregion_
を変更しています。
次のコードでCloudWatch Logsからデータを取得し、DataFrame化します。
AWS WAFと同様に、Indexにawareな日時を設定します。
df = df_lambda_raw.loc[:, ["message"]]
df.index = df_lambda_raw.iloc[:, 0].dt.tz_localize("UTC").dt.tz_convert("Asia/Tokyo")
先頭の5件を表示すると以下のようになります。
指定の文字列が含まれる件数を調べる
「error」
print(df.loc[:, "message"].str.contains("error", case=False).sum()) # 出力結果「0件」
print(df.loc[:, "message"].str.contains("warning", case=False).sum()) # 出力結果「2件」
今回の例だと、.str
アクセサを使って、contains()
メソッドで指定の文字列が含まれている行を抽出しています。その際に、case=False
とし、大文字・contains()
メソッドの戻り値は、以下のようにboolインデックス
となります。
print(df.loc[:, "message"].str.contains("warning", case=False).iloc[:3])
timestamp 2023-12-04 09:05:02.715000+09:00 False 2023-12-04 09:05:06.623000+09:00 True 2023-12-04 09:05:06.623000+09:00 True Name: message, dtype: boolean
今回は件数を知りたかったので、sum()
メソッドを使いTrue
の数を出力しました。
メッセージ内の文字列の表示
今回のデータは、warning
が2件含まれていました。この内容を確認していきます。
df.loc[df.loc[:, "message"].str.contains("warning", case=False), "message"]
timestamp 2023-12-04 09:05:06.623000+09:00 /var/task/lxxxxxxxn/__init__.py:34: UserWarnin... 2023-12-04 09:05:06.623000+09:00 warnings.warn( Name: message, dtype: string
2行のデータを、先ほどcontains()
メソッドで確認したboolインデックスを使い抽出しました。
ここでは、1件目のデータの文字列を確認します。
print(df.loc[df.loc[:, "message"].str.contains("warning", case=False), "message"].iloc[0])
/var/task/lxxxxxxxn/__init__.py:34: UserWarning: Importing xxxxxxxx from xxxxxxxx root module is no longer supported. Please use xxxxxx.xxx.xxxxxxxxxx instead.
内容はマスクしましたが、利用できなくなっているモジュールがあるので別の物を利用するように警告が出ていました。
まとめ
今月は、AWS SDK for pandas
pandasは時系列データを取り扱う機能が豊富に揃っています。AWSのツールでログを解析する方法もありますが、Pythonやpandasに慣れている方からするとさまざまなデータの取扱ができ、可視化や自動化など応用範囲が広がるのではないかと思っています。
みなさんもpandasでのログの分析に挑戦してみましょう。