メインコンテンツへスキップ

11時間のプロセスを37分に短縮した方法

データエンジニアリングにおけるプロセス最適化

ビッグデータ 学習 リーンマインド データエンジニアリング ブログ

はじめに

この記事は、ウリセスが書いた 5時間のプロセスを5分に短縮した方法 に非常によく似ていますが、文脈が異なります。ここではNodeJSのコードを最適化するのではなく、データエンジニアリングにおけるPythonの適用について扱っており、今回は 11時間かかっていたプロセスを37分に短縮 しました。まずは少し背景を説明させてください。

問題点(冬が来る)

現在、私たちは Apache Airflow というツールを使用してプロセスの自動化を管理しています。これを使うと、crontabでプロセスの実行スケジュールを設定でき、自動的に実行され、Pythonコードで定義されたタスクを実行します。

私たちの多くのプロセスのうちの1つは、プロバイダーからのデータ抽出と、その後ドメインに適応させるための変換です。このプロセスは通常 約5時間 かかっており、いくつかの理由である程度許容できるものでした:

  • 通常は夜間に自動で実行され、翌朝には完了している
  • 大量のデータを扱うため、すべての操作に時間がかかる
  • リアルタイムで必要なわけではないので、他のチームにとっては重要ではない

以下のスクリーンショットは、プロセスが時々失敗することがあるものの、確認と修正で問題なく安定していることを示しています:

proceso11horas-1

そして冬が到来

何らかの理由で、キューにさらに数百万件のレコードが追加され、実行時間が大幅に増加しました。具体的には、5時間から11時間に増加しました:

proceso11horas-2

これは管理不可能で持続不可能な状態になっていました。ほぼ半日かかるプロセスが、24時間で2回も実行されるのです…計算してみてください。

なぜこのようなことが起きたのか?

データ処理において定番のライブラリは Pandas で、とてもよく機能します。しかし、大規模なデータセットに対してはパフォーマンスが悪くなります。そこで登場するのが Dask です — 大量のデータを効率的に処理するために設計されています。

Daskは大規模データセットでPandasを上回る性能を発揮しますが、正しく使わないと扱いが難しい場合があります。ベストプラクティスを扱った書籍も多数あり、例えば Data Science at Scale with Python and Dask があります。

そのため、私たちはDaskの使い方に何か問題があるのではないかと考え、コードを見直すことにしました。

謎の謎

まず、プロセスが何をしているのかを確認しました:

3つのデータセットを収集し、1つに統合して、データの出所に基づいた優先リストに従って処理します。これを provider_rank_list と呼びます(覚えておいてください、後で戻ってきます)。

3つのソースデータセットは単一のファイルではなく、Daskが処理しやすいように分割されています。ここで気づいたのは、その分割が非常に小さいことです — 各1〜2MBの1028パーティション。これはDaskにとって致命的で、すべての利点が逆効果になってしまいます。

私たちが実装した改善の1つは、1028パーティションをより大きなファイルにまとめることでした(ただし、Daskが効率的に動作できるように分割は維持)。

ここで今日の重要なアドバイス:

Daskを使う場合、パーティションはデータ量に合わせて設定してください。あまりにも小さなパーティションが多すぎたり、大きすぎるパーティションが少なすぎたりしないようにしましょう。公式ドキュメントでは 約100MBのパーティション を推奨しています。

考え方を示すコード例は以下の通りです:

input_dataset = dask.read_dask(path_to_file)
dataset_to_process = input_dataset.repartition(partition_size=self._partition_size) # Fix input to ensure it doesn’t break anything
...
Lots of code doing other things...
...
result_dataframe = processed_dataset.repartition(partition_size=self._partition_size) # Fix output size in case it grew during processing

なぜファイル数ではなくパーティションサイズを設定するのか?

良い質問です。Daskには2つのパーティション方法があります:npartitionspartition_size

  • npartitions はパーティションの数を正確に固定します。
  • partition_size は各パーティションの最大サイズを固定し、Dask DataFrameの処理を強制します。これは一見デメリットのように思えますが、リソースと時間を多く消費するからです。しかし私たちの場合、処理は避けられず、保存時にPandasに変換されるため(DaskはPandasの上に構築されています)。

TL;DR: パーティションの再設定は難しかったので、多くのドキュメントを読み、最適な方法を選びました。

そして…この話はまだ終わっていません :(

パフォーマンスは改善したのでしょうか?はい。完璧だったのでしょうか?いいえ。プロセスはまだ数時間かかり、依然として実用的ではありませんでした。私たちはさらに解決策を探し続ける必要がありました…

そして、次のコード行を発見しました:

dataframe.set_index(provider_priority_column)

set_index は何をするのか? -> 指定したパラメータに基づいてDataFrameをソートします。この場合、プロバイダーに基づいてデータをフィルタリングしたいので、いくつかの数値に基づいてソートしたいのです。例えば:

{"cool_provider": 0, "nice_provider": 1, "not_so_cool_provider": 2}

以前、この方法を実装したときは、クールで実現可能に見えました…しかし、全くスケーラブルではありませんでした。データ量が増えるにつれて、Daskのソート性能は悪化しました。

ここまで読んだあなたは、非常に複雑な解決策、まるで魔法のレシピのような方法を期待するかもしれません…残念ながら、この解決策はコードとは関係なく、問題へのアプローチの仕方にあります。

解決策

前述の通り、このプロセスは3つのデータセットを取得し、priority_rank_list に基づいてデータを処理するために1つに結合します。私たちは、優先リストに基づいて結合(ほとんど瞬時に行える操作で、ほとんどリソースを消費しません)することを考えました。こうすることで、結果をソートする必要がなくなり、すでにソートされた状態で取得できるため、作業量も大幅に減ります。とてもシンプルです。コードで示すのはあまり意味がありませんが、結果がすべてを物語っています:

proceso11horas-3

さらに興味があれば、このアプローチを前のプロセスにも拡張しました。改善された時間を示す表は以下の通りです:

改善前

プロセス1プロセス2プロセス3合計時間
プロバイダー A約20分約8時間50分約11時間20時間10分
プロバイダー B約20分約3時間30分約11時間14時間50分
プロバイダー C約10分約7時間約11時間11時間17分

改善後

プロセス1プロセス2プロセス3合計時間
プロバイダー A約20分約1時間約1〜2時間3時間20分
プロバイダー B約20分約45分約1〜2時間3時間5分
プロバイダー C約10分約7分約1〜2時間2時間17分

結局のところ、実行時間を節約できただけでなく、このプロセスを実行するKubernetesのPodやAWSマシンのリソースも節約できました。つまり、費用も削減されました。そしてよく言われるように、一枚の写真は千の言葉に勝ります。嵐が静まった様子を示す最後の写真はこちらです: proceso11horas-4