Kedro + MLFlowでMLOpsを導入してみた
1. 背景
データフローパイプラインとモデルの実験管理を行えるようなMLOpsパイプラインを作成したいと思い、調べているとKedro+MLflowでできることがわかりました。実際にKedroとMLFlowを使ってパイプラインを作って実験管理をしてみたので共有します。
2. Kedro
Kedroとは?
Kedroは、Pythonでのデータパイプライン開発を容易にするために設計されたオープンソースのフレームワークです。Kedroを使用することで、データサイエンティストやエンジニアは、高品質のデータパイプラインを迅速に構築できます。
Kedroの特徴
- データセットの抽象化:Kedroは、さまざまな種類のデータソース(CSV、JSON、データベースなど)に対応するために、データセットと呼ばれる抽象化レイヤーを提供しています。これにより、データセットを簡単に交換し、再利用することができます。
- パイプラインの可視化:Kedroは、パイプラインの可視化機能を提供しています。これにより、データの流れを理解しやすくなり、パイプラインのデバッグや最適化が容易になります。
- テストの自動化:Kedroは、pytestと統合されており、自動化されたユニットテストと統合テストを実行することができます。これにより、コンポーネントやパイプラインが正常に動作することを確認できます。
- モジュール性:Kedroは、コンポーネントを再利用可能な形で開発することを容易にします。これにより、コンポーネントの再利用や、パイプラインのカスタマイズが簡単になります。
Kedro公式ページ:
3. MLFlow
MLflowとは?
MLflowは、機械学習の開発ライフサイクルにおいて発生する問題を解決するために作られたプラットフォームです。機械学習の開発ライフサイクルは、データの前処理や特徴量の選択、モデルのトレーニング、モデルのデプロイなど、多岐にわたります。このような開発ライフサイクルにおいては、モデルの開発や実行に関する様々な情報を追跡し、管理する必要があります。MLflowは、このような情報を効率的に管理するためのプラットフォームであり、以下のような機能を提供します。
MLflowの機能
- トラッキング:モデルのトレーニングにおける実行結果やメトリック、パラメータ、入出力をトラッキングするための機能です。
- プロジェクト:モデルのコードや依存ライブラリを管理するための機能です。
- モデルのバージョニング:モデルのバージョン管理を行うための機能です。
- モデルのデプロイ:モデルのデプロイに関する機能です。
MLflowの使い方
MLflowを使用するためには、以下の手順を実行します。
- MLflowをインストールする。
- MLflowトラッキングサーバーを起動する。
- MLflowのAPIを使用して、トラッキング情報を保存する。
- 保存したトラッキング情報を、MLflow UIを使用して閲覧する。
MLFlow公式ページ:
4. ライブラリ
kedroを使用する際は、gitをインストールをしておく必要がある。
以下が使用したライブラリのバージョンです。
python == 3.8.16
numpy == 1.23.5
pandas == 1.5.3
scikit-learn == 1.2.1
kedro == 0.18.5
mlflow == 2.1.1
5. Kedro + MLFlowでパイプライン作成、実験管理をおこなってみる
やりたいこと
ボストンデータセットを使用し、PLS(部分最小二乗法)を使って前処理〜学習までのデータフローパイプラインを作成する。パイプラインのなかでPLSモデルのハイパーパラメータを振って学習させ、その結果をMLFLowに保存して管理する。
データ準備
あらかじめ、以下のスクリプトを実行しデータセットを準備しておく。
import pandas as pd
california_housing = fetch_california_housing()
train_x = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
train_y = pd.DataFrame(california_housing.target, columns=california_housing.target_names)
raw_df = pd.concat([train_x, train_y], axis=1)
raw_df.to_csv('boston_house_prices.csv')
データフローパイプラインの作成
続いて、以下のコマンドを実行し、kedroパイプラインの雛形を作成します。名前をきかれるのでboston-plsとします。
kedro new
実行すると、以下のディレクトリが作成されます。
./boston-pls/
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ └── local
├── data
│ ├── 01_raw
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
├── logs
│ ├── errors.log
│ └── info.log
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── boston_pls
├── requirements.txt
├── setup.py
└── tests
続いて、あらかじめ出力しておいた今回使用するcsvファイルを./data/01_rawに格納し、./conf/base/catalog.ymlに以下のスクリプトを追加することでcsvファイルの読み込みがパイプラインに組み込まれます。
boston:
type: pandas.CSVDataSet
filepath: data/01_raw/boston_house_prices.csv
これにより、bostonという名前のデータセットが定義され、PandasのCSVDataSetを使用して、data/01_raw/boston_house_prices.csvファイルを読み込むことができます。
次に以下のコマンドを実行し、前処理用のパイプラインを定義するdata_processingファイルが作成されます。
kedro pipeline create data_processing
すると、.boston-pls/src/boston_pls/に以下のファイルが作成されます。
.boston-pls/src/boston_pls/
├── pipelines
│ ├── __init__.py
│ ├── data_processing
│ │ ├── README.md
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ ├── nodes.py
│ │ └── pipeline.py
nodes.pyにデータ前処理用の関数を以下のように定義します。
import pandas as pd
from sklearn.model_selection import train_test_split
def parse_data(raw_df):
data = raw_df.drop(['MedHouseVal'], axis=1)
label = raw_df['MedHouseVal']
return data, label
def split_dataset(data, label):
data_train, data_eval, label_train, label_eval = train_test_split(data, label)
data_fit, data_val, label_fit, label_val = train_test_split(data_train, label_train)
return data_fit, label_fit, data_val, label_val, data_eval, label_eval
続いて、pipeline.pyに以下のスクリプトを書きます。これによりnodes.pyで定義した関数を繋いでパイプラインが構築できる。
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import parse_data, split_dataset
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=parse_data,
inputs="boston",
outputs=["data", "label"],
name="parse_data",
),
node(
func=split_dataset,
inputs=["data", "label"],
outputs=["data_fit", "label_fit","data_val", "label_val", "data_eval", "label_eval"],
name="split_dataset",
),
]
)
続いて、以下のコマンドを実行し、モデル学習用のパイプラインを定義するdata_scienceファイルが作成されます。
kedro pipeline create data_science
すると、.boston-pls/src/boston_pls/に以下のファイルが作成されます。
.boston-pls/src/boston_pls/
├── pipelines
│ ├── __init__.py
│ ├── data_science
│ │ ├── README.md
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ ├── nodes.py
│ │ └── pipeline.py
nodes.pyに学習用の関数を以下のように定義します。
import mlflow
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import shutil
import pickle
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import r2_score
import seaborn as sns
def data_prepare(data_fit,label_fit, data_val, label_val, data_eval, label_eval):
data_fit_scaled = (data_fit - data_fit.min()) / (data_fit.max() - data_fit.min())
data_val_scaled = (data_val - data_fit.min()) / (data_fit.max() - data_fit.min())
data_eval_scaled = (data_eval - data_fit.min()) / (data_fit.max() - data_fit.min())
label_fit_scaled = (label_fit - label_fit.mean()) / label_fit.std()
label_val_scaled = (label_val - label_fit.mean()) / label_fit.std()
label_eval_scaled = (label_eval - label_fit.mean()) / label_fit.std()
return data_fit_scaled, data_val_scaled, data_eval_scaled, label_fit_scaled, label_val_scaled, label_eval_scaled
def train_model(data_fit_scaled, label_fit, label_fit_scaled, data_val_scaled, label_val, label_val_scaled):
experiment_id = mlflow.create_experiment('boston-pls')
max_pls_component_number = data_fit_scaled.shape[1]
pls_components = np.arange(1, min(np.linalg.matrix_rank(data_fit_scaled) + 1, max_pls_component_number + 1), 1)
with mlflow.start_run(experiment_id=experiment_id):
r2_train_all = list()
r2_val_all = list()
for pls_component in pls_components:
with mlflow.start_run(nested=True, experiment_id=experiment_id):
pls_model = PLSRegression(n_components=pls_component)
pls_model.fit(data_fit_scaled, label_fit_scaled)
train_pred_scaled = pls_model.predict(data_fit_scaled)
val_pred_scaled = pls_model.predict(data_val_scaled)
train_pred = train_pred_scaled * label_fit.std(ddof=1) + label_fit.mean()
val_pred = val_pred_scaled * label_fit.std(ddof=1) + label_fit.mean()
r2_train = r2_score(label_fit, train_pred)
r2_val = r2_score(label_val, val_pred)
r2_train_all.append(r2_train)
r2_val_all.append(r2_val)
""" MLFlow管理"""
# ハイパーパラメータ, 評価指標, 学習済みモデルをMLflowへ保存
mlflow.log_param("n_components", pls_component)
mlflow.log_metric("R2_train", r2_train)
mlflow.log_metric("R2_val", r2_val)
mlflow.sklearn.log_model(pls_model, "model")
optimal_pls_component_number = np.where(r2_val_all == np.max(r2_val_all))
optimal_pls_component_number = optimal_pls_component_number[0][0] + 1
best_model = PLSRegression(n_components=optimal_pls_component_number)
best_model.fit(data_fit_scaled, label_fit_scaled)
return experiment_id, r2_train_all, r2_val_all, best_model
def dump_study(experiment_id, r2_train_all, r2_val_all, best_model, data_fit, label_fit, data_val, label_val):
sns.set()
max_pls_component_number = data_fit.shape[1]
pls_components = np.arange(1, min(np.linalg.matrix_rank(data_fit) + 1, max_pls_component_number + 1), 1)
with mlflow.start_run(experiment_id = experiment_id):
os.makedirs("results", exist_ok=True)
plt.figure(figsize=(12,6))
plt.title('Boston-PLS')
plt.plot(pls_components, r2_train_all, label='train')
plt.plot(pls_components, r2_val_all,label='val')
plt.xlabel('N_components')
plt.ylabel('R2')
plt.legend()
plt.savefig(os.path.join("results", "r2_score.png"))
plt.show()
data_fit.to_csv(os.path.join("results", "train_feature.csv"))
label_fit.to_csv(os.path.join("results", "train_label.csv"))
data_val.to_csv(os.path.join("results", "val_feature.csv"))
label_val.to_csv(os.path.join("results", "val_label.csv"))
#pls-vip法による変数重要度の可視化
plt.figure(figsize=(12,6))
sel_var , vips = _pls_vip(best_model.x_weights_, best_model.y_loadings_.T, best_model.x_scores_ )
vips_df = pd.DataFrame(vips, index=data_fit.columns, columns=['vips']).sort_values('vips', ascending=False)
plt.barh(vips_df.index, vips_df['vips'])
plt.savefig(os.path.join("results", "vips.png"))
with open(os.path.join("results", "model.picke"), "wb") as f:
pickle.dump(best_model, f, protocol=2)
mlflow.log_artifacts("results", artifact_path="results")
shutil.rmtree("results")
def eval_model(experiment_id, best_model,label_fit, data_eval_scaled, label_eval):
with mlflow.start_run(experiment_id=experiment_id):
eval_pred_scaled = best_model.predict(data_eval_scaled)
eval_pred = eval_pred_scaled * label_fit.std(ddof=1) + label_fit.mean()
r2_eval = r2_score(label_eval, eval_pred)
mlflow.log_metric("R2_test_best_model", r2_eval)
#PLS特徴量重要度の可視化
def _pls_vip(W, D, T):
M, R = W.shape
weight = np.zeros([R])
vips = np.zeros([M])
ssy = np.diag(T.T @ T @ D @ D.T)
total_ssy = np.sum(ssy)
for m in range(M):
for r in range(R):
weight[r] = np.array([(W[m, r]/ np.linalg.norm(W[:, r]))**2])
vips[m] = np.sqrt(M*(ssy @ weight) / total_ssy)
sel_var = np.arange(M)
sel_var = sel_var[vips>=1]
return sel_var, vips
回帰モデルとしてPLSを採用し、パイプラインの中にPLSのハイパーパラメータである圧縮次元サイズを変えてモデルを作成し、圧縮次元サイズとモデル予測精度の関係をMLFlowに保存する機能を組み込む。
最後にpipeline.pyに以下のスクリプトを追加してパイプラインを完成させます。
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import *
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=data_prepare,
inputs=["data_fit","label_fit", "data_val", "label_val", "data_eval", "label_eval"],
outputs=["data_fit_scaled", "data_val_scaled", "data_eval_scaled", "label_fit_scaled", "label_val_scaled", "label_eval_scaled"],
name="data_prepare",
),
node(
func=train_model,
inputs=["data_fit_scaled", "label_fit", "label_fit_scaled", "data_val_scaled", "label_val", "label_val_scaled"],
outputs=["exp_id", "r2_train", "r2_val", "best_model"],
name="train_model",
),
node(
func=dump_study,
inputs=["exp_id", "r2_train", "r2_val", "best_model", "data_fit", "label_fit", "data_val", "label_val"],
outputs=None,
name="dump_study",
),
node(
func=eval_model,
inputs=["exp_id", "best_model","label_fit", "data_eval_scaled", "label_eval"],
outputs=None,
name="eval_model",
)
]
)
以上で準備は完了です。./boston-pls/src/boston-pls/で以下のコマンドを実行すると設定したパイプラインで学習が行われます。
パイプラインの実行
kedro run
特にエラーがなければ以下のログがコマンドライン上に表示され、完了します。
[02/28/23 23:31:43] INFO Kedro project boston-pls session.py:355
[02/28/23 23:31:45] INFO Loading data from 'boston' (CSVDataSet)... data_catalog.py:343
INFO Running node: parse_data: parse_data([boston]) -> [data,label] node.py:329
INFO Saving data to 'data' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'data' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_dataset: split_dataset([data,label]) -> node.py:329
[data_fit,label_fit,data_val,label_val,data_eval,label_eval]
INFO Saving data to 'data_fit' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_fit' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'data_val' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_val' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'data_eval' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_eval' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'data_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_val' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_val' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_eval' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_eval' (MemoryDataSet)... data_catalog.py:343
INFO Running node: data_prepare: node.py:329
data_prepare([data_fit,label_fit,data_val,label_val,data_eval,label_eval]) ->
[data_fit_scaled,data_val_scaled,data_eval_scaled,label_fit_scaled,label_val_scale
d,label_eval_scaled]
INFO Saving data to 'data_fit_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'data_val_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'data_eval_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_fit_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_val_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'label_eval_scaled' (MemoryDataSet)... data_catalog.py:382
INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'data_fit_scaled' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_fit_scaled' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_val_scaled' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_val' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_val_scaled' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model: node.py:329
train_model([data_fit_scaled,label_fit,label_fit_scaled,data_val_scaled,label_val,
label_val_scaled]) -> [exp_id,r2_train,r2_val,best_model]
[02/28/23 23:31:55] INFO Saving data to 'exp_id' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'r2_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'r2_val' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'best_model' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'exp_id' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'r2_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'r2_val' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'best_model' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_val' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_val' (MemoryDataSet)... data_catalog.py:343
INFO Running node: dump_study: node.py:329
dump_study([exp_id,r2_train,r2_val,best_model,data_fit,label_fit,data_val,label_va
l]) -> None
[02/28/23 23:31:56] INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'exp_id' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'best_model' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_fit' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'data_eval_scaled' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'label_eval' (MemoryDataSet)... data_catalog.py:343
INFO Running node: eval_model: node.py:329
eval_model([exp_id,best_model,label_fit,data_eval_scaled,label_eval]) -> None
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:93
INFO Loading data from 'label_eval_scaled' (MemoryDataSet)... data_catalog.py:343
また、./src/bosto-pls/にmlrunsというフォルダが作成されます。ここに.src/boston-pls/pipelines/data_processing/node.py中で定義したmlflow実験管理データ(モデルやr2スコア)が格納されます。
./boston-pls/mlruns/
├── 0
│ └── meta.yaml
└── 532802700296094184
├── 2af6eb9ad10b44b19f6b337f9542c66b
├── 30a58760b0cd4268b9ec2477f4f4cf22
├── 30f65f75498b4bdfb7a8d4f867e3c31b
├── 5c9cb141070e422eb899dfa36e6e0363
├── 6af57d0dfca4419989650f705f5d1f5d
├── 70d0e5a34596418ca1817924b4f49704
├── 748f57d1a2f24b1b9ca96ccbb6db7fde
├── cf9db618de3f471d9c376326bfd1e2c1
├── e98200c1a5df46889e2924628d938c89
├── ea47e333b8e9432ca892d32f6c5fb9e0
├── f67bad881cb841b4bb14dc7a170e6205
├── fc5c7e7f81d9460ea9da594a16ae659d
└── meta.yaml
デフォルトでは、実験IDは0に設定されており、指定しなければ実験IDが0のディレクトリの下にRUNIDが追加され、そのRUNIDに対応するモデルとそのモデルの設定ハイパーパラメータ、予測精度が保存されていく。
今回は、デフォルトの実験IDではなく、新しく実験ID(=bosto-pls)を定義しているため、それに紐づく532802700296094184というディレクトが作成され、その下にそれぞれのモデルのデータが保存されている。
./boston-pls/mlruns/532802700296094184/2af6eb9ad10b44b19f6b337f9542c66b/
├── artifacts
│ └── model
│ ├── MLmodel
│ ├── conda.yaml
│ ├── model.pkl
│ ├── python_env.yaml
│ └── requirements.txt
├── meta.yaml
├── metrics
│ ├── R2_train
│ └── R2_val
├── params
│ └── n_components
└── tags
├── mlflow.log-model.history
├── mlflow.parentRunId
├── mlflow.runName
├── mlflow.source.name
├── mlflow.source.type
└── mlflow.user
モデルはartifacts/modelに保存される。またモデルの精度、ハイパーパラメータはそれぞれmetrics/、params/に格納されていることが確認できる。
実験データを閲覧する
mlflowではこのmlrunsファイル内に保存されている実験結果をブラウザ上で確認できる。
./boston-pls/で以下のコマンドを実行します。
mlflow ui (--port 5000)
ポートを指定する場合は--port ポート番号を追加します。
すると以下のブラウザが立ち上がります。
初期起動時は左上のExperimentsのチェックがDefaultに入っているので、boston-plsに変更します。
plsのハイパーパラメータを変えて実験をおこなった結果がRunName:gregarious-hare-668以下に保存されているので展開し、さらにColumnsを開き、全てにチェックを入れます。
すると、各モデルに対する学習データ、テストデータr2スコアが表示されます。今回の場合、plsの圧縮サイズ(=n_componerts)は5が最もバリデーションデータに対する精度が良いことが確認できます。
さらに、Run Name内のリンクをクリックすると、その試行で設定したハイパーパラメータ(Paramtersタブ)、予測精度(Metricsタブ)、学習済みモデル等(Artifactsタブ)が格納されたページに遷移します。
Artifactsタブを展開すると、以下のように格納されている学習済モデルファイルを呼び出すpythonスクリプトが表示されます。
このスクリプトを実行することで、学習済みモデルを使用して回帰をすることができます。
6. 最後に
今回、kedro + MLFlowを使って機械学習モデル実験管理パイプラインを作ってみました。
MLOpsパイプライン(Kedro + MLFlow)作成の参考になれば幸いです。