594 KiB
Лабораторная 4¶
Датасет: Набор данных для анализа и прогнозирования сердечного приступа
# Notebook setup: load the heart-disease dataset and binarize the
# classification target.
import pandas as pd
from sklearn import set_config

# Make all sklearn transformers return pandas DataFrames instead of ndarrays.
set_config(transform_output="pandas")

# Forward slashes keep the path portable (pandas accepts them on Windows too);
# the original backslash path only worked on Windows.
df = pd.read_csv("csv/heart_2022_no_nans.csv")
print(df.columns)

# Encode the Yes/No target as 0/1 integers for the classifiers.
map_heart_disease_to_int = {'No': 0, 'Yes': 1}
TARGET_COLUMN_NAME_CLASSIFICATION = 'HadHeartAttack'
df[TARGET_COLUMN_NAME_CLASSIFICATION] = df[TARGET_COLUMN_NAME_CLASSIFICATION].map(map_heart_disease_to_int).astype('int32')
Классификация¶
Бизнес цель 1:¶
Предсказание сердечного приступа (HadHeartAttack) на основе других факторов.
Формируем выборки
from typing import Tuple
import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
def split_stratified_into_train_val_test(
    df_input,
    stratify_colname="y",
    frac_train=0.6,
    frac_val=0.15,
    frac_test=0.25,
    random_state=None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]:
    """Split a dataframe into stratified train/validation/test partitions.

    Args:
        df_input: Source dataframe; all columns are kept in the X outputs.
        stratify_colname: Column whose class proportions are preserved in
            every partition.
        frac_train, frac_val, frac_test: Partition fractions; must sum to 1.
            With frac_val <= 0 a plain train/test split is returned and the
            validation outputs are empty dataframes.
        random_state: Seed forwarded to sklearn's train_test_split.

    Returns:
        (df_train, df_val, df_test, y_train, y_val, y_test).

    Raises:
        ValueError: If the fractions do not sum to 1 or the stratify column
            is missing.
    """
    # Exact float equality (the original `... != 1.0`) rejects valid inputs
    # such as 0.6 + 0.15 + 0.25 because of binary rounding; use a tolerance.
    if abs(frac_train + frac_val + frac_test - 1.0) > 1e-9:
        raise ValueError(
            f"fractions {frac_train:f}, {frac_val:f}, {frac_test:f} do not add up to 1.0"
        )
    if stratify_colname not in df_input.columns:
        raise ValueError(f"{stratify_colname} is not a column in the dataframe")
    X = df_input  # Contains all columns.
    y = df_input[[stratify_colname]]  # Dataframe of just the stratify column.
    # Split original dataframe into train and temp dataframes.
    df_train, df_temp, y_train, y_temp = train_test_split(
        X, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_state
    )
    if frac_val <= 0:
        # Two-way split: everything that is not train becomes test.
        assert len(df_input) == len(df_train) + len(df_temp)
        return df_train, pd.DataFrame(), df_temp, y_train, pd.DataFrame(), y_temp
    # Split the temp dataframe into val and test dataframes.
    relative_frac_test = frac_test / (frac_val + frac_test)
    df_val, df_test, y_val, y_test = train_test_split(
        df_temp,
        y_temp,
        stratify=y_temp,
        test_size=relative_frac_test,
        random_state=random_state,
    )
    assert len(df_input) == len(df_train) + len(df_val) + len(df_test)
    return df_train, df_val, df_test, y_train, y_val, y_test
# 80/20 stratified train/test split; frac_val=0 means the validation outputs
# (X_val, y_val) come back as empty dataframes and are not used below.
X_train, X_val, X_test, y_train, y_val, y_test = split_stratified_into_train_val_test(
    df, stratify_colname=TARGET_COLUMN_NAME_CLASSIFICATION, frac_train=0.80, frac_val=0, frac_test=0.20, random_state=9
)
# display() is the notebook's rich-output helper.
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)

# Data quality overview: missing values per column and summary statistics.
null_values = df.isnull().sum()
print("Пропущенные значения по столбцам:")
print(null_values)

stat_summary = df.describe()
print("\nСтатистический обзор данных:")
print(stat_summary)
Формируем конвейер для классификации данных и проверка конвейера
# Preprocessing pipeline: impute + scale numeric features, impute + one-hot
# encode categorical features, then drop the excluded columns.
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# StandardScaler lives in sklearn.preprocessing; the original imported it
# from sklearn.discriminant_analysis, an accidental re-export.
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Columns excluded from modelling.
columns_to_drop = ['AgeCategory', 'Sex']

# Partition remaining columns by dtype: object -> categorical, else numeric.
num_columns = [
    column
    for column in df.columns
    if column not in columns_to_drop and df[column].dtype != "object"
]
cat_columns = [
    column
    for column in df.columns
    if column not in columns_to_drop and df[column].dtype == "object"
]

# Numeric branch: median imputation, then standardization.
num_imputer = SimpleImputer(strategy="median")
num_scaler = StandardScaler()
preprocessing_num = Pipeline(
    [
        ("imputer", num_imputer),
        ("scaler", num_scaler),
    ]
)

# Categorical branch: constant-fill imputation, then one-hot encoding.
# drop="first" avoids the dummy-variable trap; handle_unknown="ignore" keeps
# transform from failing on categories unseen during fit.
cat_imputer = SimpleImputer(strategy="constant", fill_value="unknown")
cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False, drop="first")
preprocessing_cat = Pipeline(
    [
        ("imputer", cat_imputer),
        ("encoder", cat_encoder),
    ]
)

features_preprocessing = ColumnTransformer(
    verbose_feature_names_out=False,
    transformers=[
        # Step names fixed from the original misspelling "prepocessing_*".
        ("preprocessing_num", preprocessing_num, num_columns),
        ("preprocessing_cat", preprocessing_cat, cat_columns),
    ],
    remainder="passthrough",
)

# Second stage drops the excluded columns, which pass through the first
# stage untouched via remainder="passthrough".
drop_columns = ColumnTransformer(
    verbose_feature_names_out=False,
    transformers=[
        ("drop_columns", "drop", columns_to_drop),
    ],
    remainder="passthrough",
)

pipeline_end = Pipeline(
    [
        ("features_preprocessing", features_preprocessing),
        ("drop_columns", drop_columns),
    ]
)

# Sanity-check the pipeline on the training partition. With
# set_config(transform_output="pandas") the result is already a DataFrame;
# the explicit wrapper is kept for clarity.
preprocessing_result = pipeline_end.fit_transform(X_train)
preprocessed_df = pd.DataFrame(
    preprocessing_result,
    columns=pipeline_end.get_feature_names_out(),
)
preprocessed_df
Формируем набор моделей
from sklearn import ensemble, linear_model, naive_bayes, neighbors, neural_network, tree

# Candidate classifiers, keyed by a short name; the training loop below adds
# the fitted pipeline and all quality metrics into each entry's dict.
class_models = {
    "logistic": {"model": linear_model.LogisticRegression()},
    # NOTE(review): despite the key, this is an L2-penalized logistic
    # regression with balanced class weights, not sklearn's RidgeClassifier.
    "ridge": {"model": linear_model.LogisticRegression(penalty="l2", class_weight="balanced")},
    "decision_tree": {
        "model": tree.DecisionTreeClassifier(max_depth=7, random_state=9)
    },
    "knn": {"model": neighbors.KNeighborsClassifier(n_neighbors=7)},
    "naive_bayes": {"model": naive_bayes.GaussianNB()},
    "gradient_boosting": {
        "model": ensemble.GradientBoostingClassifier(n_estimators=210)
    },
    "random_forest": {
        "model": ensemble.RandomForestClassifier(
            max_depth=11, class_weight="balanced", random_state=9
        )
    },
    "mlp": {
        "model": neural_network.MLPClassifier(
            hidden_layer_sizes=(7,),
            max_iter=500,
            early_stopping=True,
            random_state=9,
        )
    },
}
Обучаем модели и тестируем их
# Fit every classifier inside the shared preprocessing pipeline and collect
# train/test quality metrics into its class_models entry.
import numpy as np
from sklearn import metrics

for model_name, model_info in class_models.items():
    print(f"Model: {model_name}")
    model = model_info["model"]
    # Chain preprocessing and the estimator so fit/predict always see
    # identically transformed features.
    model_pipeline = Pipeline([("pipeline", pipeline_end), ("model", model)])
    model_pipeline = model_pipeline.fit(X_train, y_train.values.ravel())

    y_train_predict = model_pipeline.predict(X_train)
    # Positive-class probability with a 0.5 threshold, so ROC-AUC and the
    # hard-label metrics share one source.
    y_test_probs = model_pipeline.predict_proba(X_test)[:, 1]
    y_test_predict = np.where(y_test_probs > 0.5, 1, 0)

    model_info["pipeline"] = model_pipeline
    model_info["probs"] = y_test_probs
    model_info["preds"] = y_test_predict

    model_info["Precision_train"] = metrics.precision_score(y_train, y_train_predict)
    model_info["Precision_test"] = metrics.precision_score(y_test, y_test_predict)
    model_info["Recall_train"] = metrics.recall_score(y_train, y_train_predict)
    model_info["Recall_test"] = metrics.recall_score(y_test, y_test_predict)
    model_info["Accuracy_train"] = metrics.accuracy_score(y_train, y_train_predict)
    model_info["Accuracy_test"] = metrics.accuracy_score(y_test, y_test_predict)
    model_info["ROC_AUC_test"] = metrics.roc_auc_score(y_test, y_test_probs)
    # The original used average=None here, storing per-class arrays that are
    # incomparable with the scalar F1 computed for the tuned model later;
    # use the default binary average for a scalar score.
    model_info["F1_train"] = metrics.f1_score(y_train, y_train_predict)
    model_info["F1_test"] = metrics.f1_score(y_test, y_test_predict)
    model_info["MCC_test"] = metrics.matthews_corrcoef(y_test, y_test_predict)
    model_info["Cohen_kappa_test"] = metrics.cohen_kappa_score(y_test, y_test_predict)
    model_info["Confusion_matrix"] = metrics.confusion_matrix(y_test, y_test_predict)
Матрица неточностей
# Plot one confusion matrix per classifier in a two-column grid.
from matplotlib import pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay

# len(class_models) is 8 here, so the grid is exactly 4 rows x 2 columns;
# an odd model count would leave the layout one axis short.
_, ax = plt.subplots(int(len(class_models) / 2), 2, figsize=(12, 10), sharex=False, sharey=False)
for index, key in enumerate(class_models.keys()):
    c_matrix = class_models[key]["Confusion_matrix"]
    disp = ConfusionMatrixDisplay(
        confusion_matrix=c_matrix, display_labels=[f"No {TARGET_COLUMN_NAME_CLASSIFICATION}", TARGET_COLUMN_NAME_CLASSIFICATION]
    ).plot(ax=ax.flat[index])
    disp.ax_.set_title(key)
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.1)
plt.show()
Точность, полнота, верность (аккуратность), F-мера
# Summary table: precision / recall / accuracy / F1 on train and test.
summary_columns = [
    "Precision_train",
    "Precision_test",
    "Recall_train",
    "Recall_test",
    "Accuracy_train",
    "Accuracy_test",
    "F1_train",
    "F1_test",
]
class_metrics = pd.DataFrame.from_dict(class_models, "index")[summary_columns]
# Shown best-first by test accuracy (the notebook displays this expression).
class_metrics.sort_values(by="Accuracy_test", ascending=False)
ROC-кривая, каппа Коэна, коэффициент корреляции Мэтьюса
# Ranking table: threshold-free and agreement metrics on the test split.
ranking_columns = [
    "Accuracy_test",
    "F1_test",
    "ROC_AUC_test",
    "Cohen_kappa_test",
    "MCC_test",
]
class_metrics = pd.DataFrame.from_dict(class_models, "index")[ranking_columns]
class_metrics.sort_values(by="ROC_AUC_test", ascending=False)
Лучшая модель
# The best classifier is the top row after sorting by Matthews correlation.
best_model = str(class_metrics.sort_values(by="MCC_test", ascending=False).index[0])
display(best_model)
Находим ошибки
# Inspect the test rows the best model got wrong.
# Transform the test features with the already-fitted pipeline (transform
# only -- no refit, so the test split never influences the preprocessing).
preprocessing_result = pipeline_end.transform(X_test)
preprocessed_df = pd.DataFrame(
    preprocessing_result,
    columns=pipeline_end.get_feature_names_out(),
)
# Predictions were stored by the training loop; compare with y_test to
# collect the misclassified row labels.
y_new_pred = class_models[best_model]["preds"]
error_index = y_test[y_test[TARGET_COLUMN_NAME_CLASSIFICATION] != y_new_pred].index.tolist()
display(f"Error items count: {len(error_index)}")
# preds is a bare ndarray; wrapping it in a Series indexed like y_test lets
# .loc select by dataframe index rather than by position.
error_predicted = pd.Series(y_new_pred, index=y_test.index).loc[error_index]
error_df = X_test.loc[error_index].copy()
error_df.insert(loc=1, column="Predicted", value=error_predicted)
error_df.sort_index()
Пример использования модели (конвейера) для предсказания
# Run the best pipeline on one concrete test example.
model = class_models[best_model]["pipeline"]
# NOTE(review): hard-coded row label -- verify it exists in X_test if the
# split parameters or random_state change.
example_id = 187130
# .loc on a single label yields a Series; transpose back to a 1-row frame.
test = pd.DataFrame(X_test.loc[example_id, :]).T
test_preprocessed = pd.DataFrame(preprocessed_df.loc[example_id, :]).T
display(test)
display(test_preprocessed)
result_proba = model.predict_proba(test)[0]
result = model.predict(test)[0]
real = int(y_test.loc[example_id].values[0])
display(f"predicted: {result} (proba: {result_proba})")
display(f"real: {real}")
Создаем гиперпараметры методом поиска по сетке
# Grid search over random-forest hyperparameters, run on the whole pipeline
# so preprocessing is refitted inside each CV fold (no leakage).
from sklearn.model_selection import GridSearchCV

optimized_model_type = 'random_forest'
random_state = 9
random_forest_model = class_models[optimized_model_type]["pipeline"]

# "model__" prefixes address the estimator step inside the pipeline.
param_grid = {
    "model__n_estimators": [10, 50, 100],
    "model__max_features": ["sqrt", "log2"],
    "model__max_depth": [5, 7, 10],
    "model__criterion": ["gini", "entropy"],
}
# NOTE(review): "optomizer" is a typo; the name is kept unchanged in case
# later cells reference it.
gs_optomizer = GridSearchCV(
    estimator=random_forest_model, param_grid=param_grid, n_jobs=-1
)
# ravel() flattens the single-column target frame into the 1-D array sklearn
# expects.
gs_optomizer.fit(X_train, y_train.values.ravel())
gs_optomizer.best_params_
Обучение модели с новыми гиперпараметрами
# Refit a random forest with tuned hyperparameters and collect the same
# metric set as the baseline loop.
# NOTE(review): random_state=42 differs from the 9 used elsewhere, and the
# hyperparameters are transcribed by hand rather than read from
# gs_optomizer.best_params_ -- confirm they match the search output.
optimized_model = ensemble.RandomForestClassifier(
    random_state=42,
    criterion="gini",
    max_depth=5,
    max_features="sqrt",
    n_estimators=50,
)

result = {}
result["pipeline"] = Pipeline([("pipeline", pipeline_end), ("model", optimized_model)]).fit(X_train, y_train.values.ravel())
result["train_preds"] = result["pipeline"].predict(X_train)
# Positive-class probabilities thresholded at 0.5 for hard predictions.
result["probs"] = result["pipeline"].predict_proba(X_test)[:, 1]
result["preds"] = np.where(result["probs"] > 0.5, 1, 0)
result["Precision_train"] = metrics.precision_score(y_train, result["train_preds"])
result["Precision_test"] = metrics.precision_score(y_test, result["preds"])
result["Recall_train"] = metrics.recall_score(y_train, result["train_preds"])
result["Recall_test"] = metrics.recall_score(y_test, result["preds"])
result["Accuracy_train"] = metrics.accuracy_score(y_train, result["train_preds"])
result["Accuracy_test"] = metrics.accuracy_score(y_test, result["preds"])
result["ROC_AUC_test"] = metrics.roc_auc_score(y_test, result["probs"])
result["F1_train"] = metrics.f1_score(y_train, result["train_preds"])
result["F1_test"] = metrics.f1_score(y_test, result["preds"])
result["MCC_test"] = metrics.matthews_corrcoef(y_test, result["preds"])
result["Cohen_kappa_test"] = metrics.cohen_kappa_score(y_test, result["preds"])
result["Confusion_matrix"] = metrics.confusion_matrix(y_test, result["preds"])
Формирование данных для оценки старой и новой версии модели и сама оценка данных
# Build a two-row Old/New comparison table: the baseline random-forest entry
# versus the tuned result. Row assignment via pd.Series aligns dict keys to
# the frame's columns -- presumably the extra keys in the class_models entry
# (model, pipeline, probs, ...) are dropped by that alignment; verify.
optimized_metrics = pd.DataFrame(columns=list(result.keys()))
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
    data=class_models[optimized_model_type]
)
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
    data=result
)
optimized_metrics.insert(loc=0, column="Name", value=["Old", "New"])
optimized_metrics = optimized_metrics.set_index("Name")
# Train/test quality metrics (the notebook displays the expression).
optimized_metrics[
    [
        "Precision_train",
        "Precision_test",
        "Recall_train",
        "Recall_test",
        "Accuracy_train",
        "Accuracy_test",
        "F1_train",
        "F1_test",
    ]
]
# Threshold-free and agreement metrics.
optimized_metrics[
    [
        "Accuracy_test",
        "F1_test",
        "ROC_AUC_test",
        "Cohen_kappa_test",
        "MCC_test",
    ]
]
# Confusion matrices of the old (left) and tuned (right) model side by side.
_, ax = plt.subplots(1, 2, figsize=(10, 4), sharex=False, sharey=False)
for index, (_, row) in enumerate(optimized_metrics.iterrows()):
    ConfusionMatrixDisplay(
        confusion_matrix=row["Confusion_matrix"],
        display_labels=[f"No {TARGET_COLUMN_NAME_CLASSIFICATION}", TARGET_COLUMN_NAME_CLASSIFICATION],
    ).plot(ax=ax.flat[index])
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.3)
plt.show()
Модель хорошо классифицировала объекты, которые относятся к "No HadHeartAttack" и "HadHeartAttack"
Регрессия¶
Бизнес цель 2:¶
Предсказание среднего количества часов сна в день (SleepHours) на основе других факторов.
Формируем выборки
# Reload the raw dataset for the regression task (the classification section
# mutated the target column in place). Forward slashes keep the path
# portable; the original backslash path only worked on Windows.
df = pd.read_csv("csv/heart_2022_no_nans.csv")
TARGET_COLUMN_NAME_REGRESSION = "SleepHours"
def split_into_train_test(
    df_input: DataFrame,
    target_colname: str,
    frac_train: float = 0.8,
    random_state: int | None = None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame]:
    """Split a dataframe into feature/target train and test partitions.

    Args:
        df_input: Source dataframe.
        target_colname: Regression target column; removed from the X outputs.
        frac_train: Fraction of rows for training, strictly between 0 and 1.
        random_state: Seed forwarded to sklearn's train_test_split.
            (Annotation fixed: the original declared ``int`` while
            defaulting to ``None``.)

    Returns:
        (X_train, X_test, y_train, y_test); y_* are single-column frames.

    Raises:
        ValueError: If frac_train is out of range or target_colname is
            missing from df_input.
    """
    if not (0 < frac_train < 1):
        raise ValueError("Fraction must be between 0 and 1.")
    if target_colname not in df_input.columns:
        raise ValueError(f"{target_colname} is not a column in the DataFrame.")
    X = df_input.drop(columns=[target_colname])
    y = df_input[[target_colname]]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=(1.0 - frac_train),
        random_state=random_state,
    )
    return X_train, X_test, y_train, y_test
# 80/20 random (non-stratified) split for the regression target.
X_train, X_test, y_train, y_test = split_into_train_test(
    df,
    target_colname=TARGET_COLUMN_NAME_REGRESSION,
    frac_train=0.8,
    random_state=42
)
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)
def get_filtered_columns(df: DataFrame, no_numeric=False, no_text=False) -> list[str]:
    """Return the dataframe's column names filtered by dtype.

    Args:
        df: Source dataframe.
        no_numeric: If True, exclude numeric-dtype columns.
        no_text: If True, exclude non-numeric (text/categorical) columns.

    Returns:
        Names of the columns that pass both filters, in original order.
    """
    def _keep(column: str) -> bool:
        # A column is classified once; the two flags filter opposite halves.
        is_numeric = pd.api.types.is_numeric_dtype(df[column])
        if no_numeric and is_numeric:
            return False
        if no_text and not is_numeric:
            return False
        return True

    # Comprehension replaces the original accumulate-into-list loop.
    return [column for column in df.columns if _keep(column)]
Выполним one-hot encoding, чтобы избавиться от категориальных признаков
# One-hot encode categorical features so every regressor sees numeric input.
cat_features = get_filtered_columns(df, no_numeric=True)
X_train = pd.get_dummies(X_train, columns=cat_features, drop_first=True)
X_test = pd.get_dummies(X_test, columns=cat_features, drop_first=True)
# get_dummies was applied to train and test independently, so a category
# present in only one split would desync the column sets; re-align the test
# columns to the train layout, filling dummies unseen in test with 0.
X_test = X_test.reindex(columns=X_train.columns, fill_value=0)
X_test
X_train
Определение перечня алгоритмов решения задачи регрессии
import math
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures

# Candidate regressors, keyed by short name; the loop below adds fitted
# models and error metrics into each entry.
models = {
    "linear": {"model": linear_model.LinearRegression(n_jobs=-1)},
    # Degree-2 polynomial expansion; PolynomialFeatures emits its own bias
    # column, hence fit_intercept=False on the regression step.
    "linear_poly": {
        "model": make_pipeline(
            PolynomialFeatures(degree=2),
            linear_model.LinearRegression(fit_intercept=False, n_jobs=-1),
        )
    },
    # Interaction terms only (no squared features).
    "linear_interact": {
        "model": make_pipeline(
            PolynomialFeatures(interaction_only=True),
            linear_model.LinearRegression(fit_intercept=False, n_jobs=-1),
        )
    },
    "ridge": {"model": linear_model.RidgeCV()},
    "decision_tree": {
        "model": tree.DecisionTreeRegressor(max_depth=7, random_state=random_state)
    },
    "knn": {"model": neighbors.KNeighborsRegressor(n_neighbors=7, n_jobs=-1)},
    "random_forest": {
        "model": ensemble.RandomForestRegressor(
            max_depth=7, random_state=random_state, n_jobs=-1
        )
    },
    "mlp": {
        "model": neural_network.MLPRegressor(
            activation="tanh",
            hidden_layer_sizes=(3),
            max_iter=500,
            early_stopping=True,
            random_state=random_state,
        )
    },
}
# Fit each regressor on the encoded features and record train/test errors.
for model_name in models.keys():
    print(f"Model: {model_name}")
    # .values strips column names so sklearn sees plain arrays; ravel()
    # flattens the single-column target frame.
    fitted_model = models[model_name]["model"].fit(
        X_train.values, y_train.values.ravel()
    )
    y_train_pred = fitted_model.predict(X_train.values)
    y_test_pred = fitted_model.predict(X_test.values)
    models[model_name]["fitted"] = fitted_model
    models[model_name]["train_preds"] = y_train_pred
    models[model_name]["preds"] = y_test_pred
    models[model_name]["RMSE_train"] = math.sqrt(
        metrics.mean_squared_error(y_train, y_train_pred)
    )
    models[model_name]["RMSE_test"] = math.sqrt(
        metrics.mean_squared_error(y_test, y_test_pred)
    )
    # NOTE(review): "RMAE" here is the square root of the MAE, an unusual
    # metric -- confirm this is intended rather than plain MAE.
    models[model_name]["RMAE_test"] = math.sqrt(
        metrics.mean_absolute_error(y_test, y_test_pred)
    )
    models[model_name]["R2_test"] = metrics.r2_score(y_test, y_test_pred)
Выводим результаты оценки
# Error table for all regressors, styled as the cell output.
reg_columns = ["RMSE_train", "RMSE_test", "RMAE_test", "R2_test"]
reg_metrics = pd.DataFrame.from_dict(models, "index")[reg_columns]
# Two gradients: one over the RMSE columns, another over RMAE/R2.
styled = reg_metrics.sort_values(by="RMSE_test").style
styled = styled.background_gradient(
    cmap="viridis", low=1, high=0.3, subset=["RMSE_train", "RMSE_test"]
)
styled.background_gradient(cmap="plasma", low=0.3, high=1, subset=["RMAE_test", "R2_test"])
Выводим лучшую модель
# Best regressor = first row after sorting ascending by test RMSE.
best_model = str(reg_metrics.sort_values(by="RMSE_test").index[0])
display(best_model)
Подбираем гиперпараметры методом поиска по сетке
# Hyperparameter search for the random-forest regressor.
from sklearn.ensemble import RandomForestRegressor

# NOTE(review): X and y below are never used (the search fits on the
# already-encoded X_train); kept only in case later cells reference them.
X = df[get_filtered_columns(df, no_numeric=True)]
y = df[TARGET_COLUMN_NAME_REGRESSION]

model = RandomForestRegressor()
param_grid = {
    'n_estimators': [50, 100],
    'max_depth': [10, 20],
    'min_samples_split': [5, 10]
}
grid_search = GridSearchCV(estimator=model, param_grid=param_grid,
                           scoring='neg_mean_squared_error', cv=3, n_jobs=-1, verbose=2)
# ravel() turns the single-column y DataFrame into the 1-D array sklearn
# expects, silencing the DataConversionWarning the original produced.
grid_search.fit(X_train, y_train.values.ravel())
print("Лучшие параметры:", grid_search.best_params_)
print("Лучший результат (MSE):", -grid_search.best_score_)
Обучаем модель с новыми гиперпараметрами и сравниваем новых данных со старыми
# Compare the full grid search ("old") against a narrowed one ("new").
# Old data: reuse the search fitted above. The original called .fit() again
# on the very same GridSearchCV object, repeating the whole cross-validated
# search for no benefit; the fitted attributes are already available.
old_param_grid = param_grid
old_grid_search = grid_search
old_best_params = old_grid_search.best_params_
old_best_mse = -old_grid_search.best_score_

# New data: a single-point grid.
new_param_grid = {
    'n_estimators': [100],
    'max_depth': [10],
    'min_samples_split': [5]
}
new_grid_search = GridSearchCV(estimator=RandomForestRegressor(),
                               param_grid=new_param_grid,
                               scoring='neg_mean_squared_error', cv=2)
new_grid_search.fit(X_train, y_train.values.ravel())
new_best_params = new_grid_search.best_params_
new_best_mse = -new_grid_search.best_score_

# Refit standalone forests with each parameter set and score them on the
# held-out test split; ravel() avoids sklearn's DataConversionWarning.
new_best_model = RandomForestRegressor(**new_best_params)
new_best_model.fit(X_train, y_train.values.ravel())
old_best_model = RandomForestRegressor(**old_best_params)
old_best_model.fit(X_train, y_train.values.ravel())

y_new_pred = new_best_model.predict(X_test)
y_old_pred = old_best_model.predict(X_test)

mse = metrics.mean_squared_error(y_test, y_new_pred)
rmse = np.sqrt(mse)

print("Старые параметры:", old_best_params)
print("Лучший результат (MSE) на старых параметрах:", old_best_mse)
print("\nНовые параметры:", new_best_params)
print("Лучший результат (MSE) на новых параметрах:", new_best_mse)
print("Среднеквадратическая ошибка (MSE) на тестовых данных:", mse)
print("Корень среднеквадратичной ошибки (RMSE) на тестовых данных:", rmse)
Визуализация данных
# Scatter the true test targets against both models' predictions.
plt.figure(figsize=(16, 8))
positions = range(len(y_test))
plt.scatter(positions, y_test, label="Истинные значения", color="black", alpha=0.5)
plt.scatter(positions, y_new_pred, label="Предсказанные (новые параметры)", color="blue", alpha=0.5)
plt.scatter(positions, y_old_pred, label="Предсказанные (старые параметры)", color="red", alpha=0.5)
plt.xlabel("Выборка")
plt.ylabel("Значения")
plt.legend()
plt.title("Сравнение предсказанных и истинных значений")
plt.show()