844 KiB
Лабораторная 4
Информация о диабете индейцев Пима
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn import set_config
set_config(transform_output="pandas")
df = pd.read_csv(".//scv//diabetes.csv")
print(df.columns)
df
Формирование выборок
from typing import Tuple
import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
def split_stratified_into_train_val_test(
df_input,
stratify_colname="y",
frac_train=0.6,
frac_val=0.15,
frac_test=0.25,
random_state=None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]:
if frac_train + frac_val + frac_test != 1.0:
raise ValueError(
"fractions %f, %f, %f do not add up to 1.0"
% (frac_train, frac_val, frac_test)
)
if stratify_colname not in df_input.columns:
raise ValueError("%s is not a column in the dataframe" % (stratify_colname))
X = df_input # Contains all columns.
y = df_input[
[stratify_colname]
] # Dataframe of just the column on which to stratify.
# Split original dataframe into train and temp dataframes.
df_train, df_temp, y_train, y_temp = train_test_split(
X, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_state
)
if frac_val <= 0:
assert len(df_input) == len(df_train) + len(df_temp)
return df_train, pd.DataFrame(), df_temp, y_train, pd.DataFrame(), y_temp
# Split the temp dataframe into val and test dataframes.
relative_frac_test = frac_test / (frac_val + frac_test)
df_val, df_test, y_val, y_test = train_test_split(
df_temp,
y_temp,
stratify=y_temp,
test_size=relative_frac_test,
random_state=random_state,
)
assert len(df_input) == len(df_train) + len(df_val) + len(df_test)
return df_train, df_val, df_test, y_train, y_val, y_test
X_train, X_val, X_test, y_train, y_val, y_test = split_stratified_into_train_val_test(
df, stratify_colname="Outcome", frac_train=0.80, frac_val=0, frac_test=0.20, random_state=9
)
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)
null_values = df.isnull().sum()
print("Пропущенные значения по столбцам:")
print(null_values)
stat_summary = df.describe()
print("\nСтатистический обзор данных:")
print(stat_summary)
Q1 = df["DiabetesPedigreeFunction"].quantile(0.25)
Q3 = df["DiabetesPedigreeFunction"].quantile(0.75)
IQR = Q3 - Q1
threshold = 1.5 * IQR
lower_bound = Q1 - threshold
upper_bound = Q3 + threshold
outliers = (df["DiabetesPedigreeFunction"] < lower_bound) | (df["DiabetesPedigreeFunction"] > upper_bound)
# Вывод выбросов
print("Выбросы в датасете:")
print(df[outliers])
# Заменяем выбросы на медианные значения
median_score = df["DiabetesPedigreeFunction"].median()
df.loc[outliers, "DiabetesPedigreeFunction"] = median_score
# Визуализация данных после обработки
plt.figure(figsize=(10, 6))
plt.scatter(df['DiabetesPedigreeFunction'], df['Age'])
plt.xlabel('Функция родословной диабета')
plt.ylabel('Возраст')
plt.title('Диаграмма рассеивания после чистки')
plt.show()
Классификация данных
from sklearn.compose import ColumnTransformer
from sklearn.discriminant_analysis import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
columns_to_drop = ["Pregnancies", "SkinThickness", "BloodPressure", "Outcome", "DiabetesPedigreeFunction"]
num_columns = [
column
for column in df.columns
if column not in columns_to_drop and df[column].dtype != "object"
]
cat_columns = [
column
for column in df.columns
if column not in columns_to_drop and df[column].dtype == "object"
]
num_imputer = SimpleImputer(strategy="median")
num_scaler = StandardScaler()
preprocessing_num = Pipeline(
[
("imputer", num_imputer),
("scaler", num_scaler),
]
)
cat_imputer = SimpleImputer(strategy="constant", fill_value="unknown")
cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False, drop="first")
preprocessing_cat = Pipeline(
[
("imputer", cat_imputer),
("encoder", cat_encoder),
]
)
features_preprocessing = ColumnTransformer(
verbose_feature_names_out=False,
transformers=[
("prepocessing_num", preprocessing_num, num_columns),
("prepocessing_cat", preprocessing_cat, cat_columns),
],
remainder="passthrough"
)
drop_columns = ColumnTransformer(
verbose_feature_names_out=False,
transformers=[
("drop_columns", "drop", columns_to_drop),
],
remainder="passthrough",
)
pipeline_end = Pipeline(
[
("features_preprocessing", features_preprocessing),
("drop_columns", drop_columns),
]
)
Проверка работы конвеера
preprocessing_result = pipeline_end.fit_transform(X_train)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
preprocessed_df
Формирование набора моделей для классификации
from sklearn import ensemble, linear_model, naive_bayes, neighbors, neural_network, tree
class_models = {
"logistic": {"model": linear_model.LogisticRegression()},
# "ridge": {"model": linear_model.RidgeClassifierCV(cv=5, class_weight="balanced")},
"ridge": {"model": linear_model.LogisticRegression(penalty="l2", class_weight="balanced")},
"decision_tree": {
"model": tree.DecisionTreeClassifier(max_depth=7, random_state=9)
},
"knn": {"model": neighbors.KNeighborsClassifier(n_neighbors=7)},
"naive_bayes": {"model": naive_bayes.GaussianNB()},
"gradient_boosting": {
"model": ensemble.GradientBoostingClassifier(n_estimators=210)
},
"random_forest": {
"model": ensemble.RandomForestClassifier(
max_depth=11, class_weight="balanced", random_state=9
)
},
"mlp": {
"model": neural_network.MLPClassifier(
hidden_layer_sizes=(7,),
max_iter=500,
early_stopping=True,
random_state=9,
)
},
}
Обучение моделей на обучающем наборе данных и оценка на тестовом
from sklearn import metrics
for model_name in class_models.keys():
print(f"Model: {model_name}")
model = class_models[model_name]["model"]
model_pipeline = Pipeline([("pipeline", pipeline_end), ("model", model)])
model_pipeline = model_pipeline.fit(X_train, y_train.values.ravel())
y_train_predict = model_pipeline.predict(X_train)
y_test_probs = model_pipeline.predict_proba(X_test)[:, 1]
y_test_predict = np.where(y_test_probs > 0.5, 1, 0)
class_models[model_name]["pipeline"] = model_pipeline
class_models[model_name]["probs"] = y_test_probs
class_models[model_name]["preds"] = y_test_predict
class_models[model_name]["Precision_train"] = metrics.precision_score(
y_train, y_train_predict
)
class_models[model_name]["Precision_test"] = metrics.precision_score(
y_test, y_test_predict
)
class_models[model_name]["Recall_train"] = metrics.recall_score(
y_train, y_train_predict
)
class_models[model_name]["Recall_test"] = metrics.recall_score(
y_test, y_test_predict
)
class_models[model_name]["Accuracy_train"] = metrics.accuracy_score(
y_train, y_train_predict
)
class_models[model_name]["Accuracy_test"] = metrics.accuracy_score(
y_test, y_test_predict
)
class_models[model_name]["ROC_AUC_test"] = metrics.roc_auc_score(
y_test, y_test_probs
)
class_models[model_name]["F1_train"] = metrics.f1_score(y_train, y_train_predict)
class_models[model_name]["F1_test"] = metrics.f1_score(y_test, y_test_predict)
class_models[model_name]["MCC_test"] = metrics.matthews_corrcoef(
y_test, y_test_predict
)
class_models[model_name]["Cohen_kappa_test"] = metrics.cohen_kappa_score(
y_test, y_test_predict
)
class_models[model_name]["Confusion_matrix"] = metrics.confusion_matrix(
y_test, y_test_predict
)
Сводная таблица оценок качества для использованных моделей классификации
Матрица неточностей
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
_, ax = plt.subplots(int(len(class_models) / 2), 2, figsize=(12, 10), sharex=False, sharey=False)
for index, key in enumerate(class_models.keys()):
c_matrix = class_models[key]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Healthy", "Sick"]
).plot(ax=ax.flat[index])
disp.ax_.set_title(key)
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.1)
plt.show()
Точность, полнота, верность (аккуратность), F-мера
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
]
class_metrics.sort_values(
by="Accuracy_test", ascending=False
).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
ROC-кривая, каппа Коэна, коэффициент корреляции Мэтьюса
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
]
class_metrics.sort_values(by="ROC_AUC_test", ascending=False).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
best_model = str(class_metrics.sort_values(by="MCC_test", ascending=False).iloc[0].name)
display(best_model)
Вывод данных с ошибкой предсказания для оценки
preprocessing_result = pipeline_end.transform(X_test)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
y_pred = class_models[best_model]["preds"]
error_index = y_test[y_test["Outcome"] != y_pred].index.tolist()
display(f"Error items count: {len(error_index)}")
error_predicted = pd.Series(y_pred, index=y_test.index).loc[error_index]
error_df = X_test.loc[error_index].copy()
error_df.insert(loc=1, column="Predicted", value=error_predicted)
error_df.sort_index()
Пример использования обученной модели (конвейера) для предсказания
model = class_models[best_model]["pipeline"]
example_id = 450
test = pd.DataFrame(X_test.loc[example_id, :]).T
test_preprocessed = pd.DataFrame(preprocessed_df.loc[example_id, :]).T
display(test)
display(test_preprocessed)
result_proba = model.predict_proba(test)[0]
result = model.predict(test)[0]
real = int(y_test.loc[example_id].values[0])
display(f"predicted: {result} (proba: {result_proba})")
display(f"real: {real}")
Подбор гиперпараметров методом поиска по сетке
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
import numpy as np
from sklearn import metrics
import pandas as pd
# Определяем числовые признаки
numeric_features = X_train.select_dtypes(include=['float64', 'int64']).columns.tolist()
# Установка random_state
random_state = 9
# Определение трансформера
pipeline_end = ColumnTransformer([
('numeric', StandardScaler(), numeric_features),
# Добавьте другие трансформеры, если требуется
])
# Объявление модели
optimized_model = RandomForestClassifier(
random_state=random_state,
criterion="gini",
max_depth=5,
max_features="sqrt",
n_estimators=10,
)
# Создание пайплайна с корректными шагами
result = {}
# Обучение модели
result["pipeline"] = Pipeline([
("pipeline", pipeline_end),
("model", optimized_model)
]).fit(X_train, y_train.values.ravel())
# Прогнозирование и расчет метрик
result["train_preds"] = result["pipeline"].predict(X_train)
result["probs"] = result["pipeline"].predict_proba(X_test)[:, 1]
result["preds"] = np.where(result["probs"] > 0.5, 1, 0)
# Метрики для оценки модели
result["Precision_train"] = metrics.precision_score(y_train, result["train_preds"])
result["Precision_test"] = metrics.precision_score(y_test, result["preds"])
result["Recall_train"] = metrics.recall_score(y_train, result["train_preds"])
result["Recall_test"] = metrics.recall_score(y_test, result["preds"])
result["Accuracy_train"] = metrics.accuracy_score(y_train, result["train_preds"])
result["Accuracy_test"] = metrics.accuracy_score(y_test, result["preds"])
result["ROC_AUC_test"] = metrics.roc_auc_score(y_test, result["probs"])
result["F1_train"] = metrics.f1_score(y_train, result["train_preds"])
result["F1_test"] = metrics.f1_score(y_test, result["preds"])
result["MCC_test"] = metrics.matthews_corrcoef(y_test, result["preds"])
result["Cohen_kappa_test"] = metrics.cohen_kappa_score(y_test, result["preds"])
result["Confusion_matrix"] = metrics.confusion_matrix(y_test, result["preds"])
Формирование данных для оценки старой и новой версии модели
optimized_model_type = "random_forest"
optimized_metrics = pd.DataFrame(columns=list(result.keys()))
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=class_models[optimized_model_type]
)
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=result
)
optimized_metrics.insert(loc=0, column="Name", value=["Old", "New"])
optimized_metrics = optimized_metrics.set_index("Name")
Оценка параметров старой и новой модели
optimized_metrics[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
optimized_metrics[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
_, ax = plt.subplots(1, 2, figsize=(10, 4), sharex=False, sharey=False
)
for index in range(0, len(optimized_metrics)):
c_matrix = optimized_metrics.iloc[index]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Healthy", "Sick"]
).plot(ax=ax.flat[index])
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.3)
plt.show()
В желтом квадрате мы видим значение 79, что обозначает количество правильно классифицированных объектов, отнесенных к классу "Sick". Это свидетельствует о том, что модель успешно идентифицирует объекты этого класса, минимизируя количество ложных положительных срабатываний.
В зеленом квадрате значение 42 указывает на количество правильно классифицированных объектов, отнесенных к классу "Healthy". Это также является показателем хорошей точности модели в определении объектов данного класса.
Определение достижимого уровня качества модели для второй задачи
Подготовка данных
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn import set_config
random_state = 9
set_config(transform_output="pandas")
df = pd.read_csv(".//scv//diabetes.csv")
print(df.describe())
Формирование выборок
from typing import Tuple
import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
def split_stratified_into_train_val_test(
df_input: DataFrame,
stratify_colname: str = "y",
frac_train: float = 0.6,
frac_val: float = 0.15,
frac_test: float = 0.25,
random_state: int = None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]:
if not (0 < frac_train < 1) or not (0 <= frac_val <= 1) or not (0 <= frac_test <= 1):
raise ValueError("Fractions must be between 0 and 1 and the sum must equal 1.")
if not (frac_train + frac_val + frac_test == 1.0):
raise ValueError("fractions %f, %f, %f do not add up to 1.0" %
(frac_train, frac_val, frac_test))
if stratify_colname not in df_input.columns:
raise ValueError(f"{stratify_colname} is not a column in the DataFrame.")
X = df_input
y = df_input[[stratify_colname]]
df_train, df_temp, y_train, y_temp = train_test_split(
X, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_state
)
if frac_val == 0:
return df_train, pd.DataFrame(), df_temp, y_train, pd.DataFrame(), y_temp
relative_frac_test = frac_test / (frac_val + frac_test)
df_val, df_test, y_val, y_test = train_test_split(
df_temp,
y_temp,
stratify=y_temp,
test_size=relative_frac_test,
random_state=random_state,
)
assert len(df_input) == len(df_train) + len(df_val) + len(df_test)
return df_train, df_val, df_test, y_train, y_val, y_test
X_train, X_val, X_test, y_train, y_val, y_test = split_stratified_into_train_val_test(
df, stratify_colname="Outcome", frac_train=0.80, frac_val=0.0, frac_test=0.20, random_state=random_state
)
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)
Формирование конвейера для классификации данных
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.discriminant_analysis import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
class DiabetFeatures(BaseEstimator, TransformerMixin):
def __init__(self):
pass
def fit(self, X, y=None):
return self
columns_to_drop = ["Pregnancies", "SkinThickness", "Insulin", "BMI", "Outcome"]
num_columns = ["Glucose", "Age", "BloodPressure", "DiabetesPedigreeFunction"]
cat_columns = []
num_imputer = SimpleImputer(strategy="median")
num_scaler = StandardScaler()
preprocessing_num = Pipeline(
[
("imputer", num_imputer),
("scaler", num_scaler),
]
)
cat_imputer = SimpleImputer(strategy="constant", fill_value="unknown")
cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False, drop="first")
preprocessing_cat = Pipeline(
[
("imputer", cat_imputer),
("encoder", cat_encoder),
]
)
features_preprocessing = ColumnTransformer(
verbose_feature_names_out=False,
transformers=[
("prepocessing_num", preprocessing_num, num_columns),
("prepocessing_cat", preprocessing_cat, cat_columns),
],
remainder="passthrough"
)
drop_columns = ColumnTransformer(
verbose_feature_names_out=False,
transformers=[
("drop_columns", "drop", columns_to_drop),
],
remainder="passthrough",
)
features_postprocessing = ColumnTransformer(
verbose_feature_names_out=False,
transformers=[
("prepocessing_cat", preprocessing_cat, ["Cabin_type"]),
],
remainder="passthrough",
)
pipeline_end = Pipeline(
[
("features_preprocessing", features_preprocessing),
("drop_columns", drop_columns),
]
)
Демонстрация работы конвейера
preprocessing_result = pipeline_end.fit_transform(X_train)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
preprocessed_df
Формирование набора моделей для классификации
from sklearn import ensemble, linear_model, naive_bayes, neighbors, neural_network, tree
class_models = {
"logistic": {"model": linear_model.LogisticRegression()},
"ridge": {"model": linear_model.RidgeClassifierCV(cv=5, class_weight="balanced")},
"ridge": {"model": linear_model.LogisticRegression(penalty="l2", class_weight="balanced")},
"decision_tree": {
"model": tree.DecisionTreeClassifier(max_depth=7, random_state=random_state)
},
"knn": {"model": neighbors.KNeighborsClassifier(n_neighbors=7)},
"naive_bayes": {"model": naive_bayes.GaussianNB()},
"gradient_boosting": {
"model": ensemble.GradientBoostingClassifier(n_estimators=210)
},
"random_forest": {
"model": ensemble.RandomForestClassifier(
max_depth=11, class_weight="balanced", random_state=random_state
)
},
"mlp": {
"model": neural_network.MLPClassifier(
hidden_layer_sizes=(7,),
max_iter=500,
early_stopping=True,
random_state=random_state,
)
},
}
Обучение моделей на обучающем наборе данных и оценка на тестовом¶
import numpy as np
from sklearn import metrics
for model_name in class_models.keys():
print(f"Model: {model_name}")
model = class_models[model_name]["model"]
model_pipeline = Pipeline([("pipeline", pipeline_end), ("model", model)])
model_pipeline = model_pipeline.fit(X_train, y_train.values.ravel())
y_train_predict = model_pipeline.predict(X_train)
y_test_probs = model_pipeline.predict_proba(X_test)[:, 1]
y_test_predict = np.where(y_test_probs > 0.5, 1, 0)
class_models[model_name]["pipeline"] = model_pipeline
class_models[model_name]["probs"] = y_test_probs
class_models[model_name]["preds"] = y_test_predict
class_models[model_name]["Precision_train"] = metrics.precision_score(
y_train, y_train_predict
)
class_models[model_name]["Precision_test"] = metrics.precision_score(
y_test, y_test_predict
)
class_models[model_name]["Recall_train"] = metrics.recall_score(
y_train, y_train_predict
)
class_models[model_name]["Recall_test"] = metrics.recall_score(
y_test, y_test_predict
)
class_models[model_name]["Accuracy_train"] = metrics.accuracy_score(
y_train, y_train_predict
)
class_models[model_name]["Accuracy_test"] = metrics.accuracy_score(
y_test, y_test_predict
)
class_models[model_name]["ROC_AUC_test"] = metrics.roc_auc_score(
y_test, y_test_probs
)
class_models[model_name]["F1_train"] = metrics.f1_score(y_train, y_train_predict)
class_models[model_name]["F1_test"] = metrics.f1_score(y_test, y_test_predict)
class_models[model_name]["MCC_test"] = metrics.matthews_corrcoef(
y_test, y_test_predict
)
class_models[model_name]["Cohen_kappa_test"] = metrics.cohen_kappa_score(
y_test, y_test_predict
)
class_models[model_name]["Confusion_matrix"] = metrics.confusion_matrix(
y_test, y_test_predict
)
Сводная таблица оценок качества для использованных моделей классификации¶
Матрица неточностей
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
_, ax = plt.subplots(int(len(class_models) / 2), 2, figsize=(12, 10), sharex=False, sharey=False)
for index, key in enumerate(class_models.keys()):
c_matrix = class_models[key]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Healthy", "Sick"]
).plot(ax=ax.flat[index])
disp.ax_.set_title(key)
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.1)
plt.show()
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
]
class_metrics.sort_values(
by="Accuracy_test", ascending=False
).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
Почти все модели, включая логистическую регрессию, ридж-регрессию, KNN, наивный байесовский классификатор, многослойную перцептронную сеть, случайный лес, дерево решений и градиентный бустинг, демонстрируют 100% точность (1.000000) на обучающей выборке. Это указывает на то, что модели смогли подстроиться под обучающие данные, что может указывать на возможное переобучение.
ROC-кривая, каппа Коэна, коэффициент корреляции Мэтьюса
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
]
class_metrics.sort_values(by="ROC_AUC_test", ascending=False).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
best_model = str(class_metrics.sort_values(by="MCC_test", ascending=False).iloc[0].name)
display(best_model)
Вывод данных с ошибкой предсказания для оценки
preprocessing_result = pipeline_end.transform(X_test)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
y_pred = class_models[best_model]["preds"]
error_index = y_test[y_test["Outcome"] != y_pred].index.tolist()
display(f"Error items count: {len(error_index)}")
error_predicted = pd.Series(y_pred, index=y_test.index).loc[error_index]
error_df = X_test.loc[error_index].copy()
error_df.insert(loc=1, column="Predicted", value=error_predicted)
error_df.sort_index()
Пример использования обученной модели (конвейера) для предсказания
model = class_models[best_model]["pipeline"]
example_id = 555
test = pd.DataFrame(X_test.loc[example_id, :]).T
test_preprocessed = pd.DataFrame(preprocessed_df.loc[example_id, :]).T
display(test)
display(test_preprocessed)
result_proba = model.predict_proba(test)[0]
result = model.predict(test)[0]
real = int(y_test.loc[example_id].values[0])
display(f"predicted: {result} (proba: {result_proba})")
display(f"real: {real}")
Подбор гиперпараметров методом поиска по сетке
from sklearn.model_selection import GridSearchCV
optimized_model_type = "random_forest"
random_forest_model = class_models[optimized_model_type]["pipeline"]
param_grid = {
"model__n_estimators": [10, 50, 100],
"model__max_features": ["sqrt", "log2"],
"model__max_depth": [5, 7, 10],
"model__criterion": ["gini", "entropy"],
}
gs_optomizer = GridSearchCV(
estimator=random_forest_model, param_grid=param_grid, n_jobs=-1
)
gs_optomizer.fit(X_train, y_train.values.ravel())
gs_optomizer.best_params_
Обучение модели с новыми гиперпараметрами
optimized_model = ensemble.RandomForestClassifier(
random_state=random_state,
criterion="gini",
max_depth=5,
max_features="log2",
n_estimators=10,
)
result = {}
result["pipeline"] = Pipeline([("pipeline", pipeline_end), ("model", optimized_model)]).fit(X_train, y_train.values.ravel())
result["train_preds"] = result["pipeline"].predict(X_train)
result["probs"] = result["pipeline"].predict_proba(X_test)[:, 1]
result["preds"] = np.where(result["probs"] > 0.5, 1, 0)
result["Precision_train"] = metrics.precision_score(y_train, result["train_preds"])
result["Precision_test"] = metrics.precision_score(y_test, result["preds"])
result["Recall_train"] = metrics.recall_score(y_train, result["train_preds"])
result["Recall_test"] = metrics.recall_score(y_test, result["preds"])
result["Accuracy_train"] = metrics.accuracy_score(y_train, result["train_preds"])
result["Accuracy_test"] = metrics.accuracy_score(y_test, result["preds"])
result["ROC_AUC_test"] = metrics.roc_auc_score(y_test, result["probs"])
result["F1_train"] = metrics.f1_score(y_train, result["train_preds"])
result["F1_test"] = metrics.f1_score(y_test, result["preds"])
result["MCC_test"] = metrics.matthews_corrcoef(y_test, result["preds"])
result["Cohen_kappa_test"] = metrics.cohen_kappa_score(y_test, result["preds"])
result["Confusion_matrix"] = metrics.confusion_matrix(y_test, result["preds"])
Формирование данных для оценки старой и новой версии модели
optimized_metrics = pd.DataFrame(columns=list(result.keys()))
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=class_models[optimized_model_type]
)
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=result
)
optimized_metrics.insert(loc=0, column="Name", value=["Old", "New"])
optimized_metrics = optimized_metrics.set_index("Name")
Оценка параметров старой и новой модели
optimized_metrics[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
optimized_metrics[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
_, ax = plt.subplots(1, 2, figsize=(10, 4), sharex=False, sharey=False
)
for index in range(0, len(optimized_metrics)):
c_matrix = optimized_metrics.iloc[index]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Healthy", "Sick"]
).plot(ax=ax.flat[index])
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.3)
plt.show()
Регрессионная модель
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn import set_config
random_state=9
set_config(transform_output="pandas")
df = pd.read_csv(".//scv//diabetes.csv")
print(df.columns)
df
Разделение набора данных на обучающую и тестовые выборки
from typing import Tuple
import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
def split_into_train_test(
df_input: DataFrame,
target_colname: str = "Outcome",
frac_train: float = 0.8,
random_state: int = None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame]:
if not (0 < frac_train < 1):
raise ValueError("Fraction must be between 0 and 1.")
# Проверка наличия целевого признака
if target_colname not in df_input.columns:
raise ValueError(f"{target_colname} is not a column in the DataFrame.")
# Разделяем данные на признаки и целевую переменную
X = df_input.drop(columns=[target_colname]) # Признаки
y = df_input[[target_colname]] # Целевая переменная
# Разделяем данные на обучающую и тестовую выборки
X_train, X_test, y_train, y_test = train_test_split(
X, y,
test_size=(1.0 - frac_train),
random_state=random_state
)
return X_train, X_test, y_train, y_test
# Применение функции для разделения данных
X_train, X_test, y_train, y_test = split_into_train_test(
df,
target_colname="Outcome",
frac_train=0.8,
random_state=42 # Убедитесь, что вы задали нужное значение random_state
)
# Для отображения результатов
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)
Определение перечня алгоритмов решения задачи аппроксимации (регрессии)
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn import linear_model, tree, neighbors, ensemble, neural_network
random_state = 9
models = {
"linear": {"model": linear_model.LinearRegression(n_jobs=-1)},
"linear_poly": {
"model": make_pipeline(
PolynomialFeatures(degree=2),
linear_model.LinearRegression(fit_intercept=False, n_jobs=-1),
)
},
"linear_interact": {
"model": make_pipeline(
PolynomialFeatures(interaction_only=True),
linear_model.LinearRegression(fit_intercept=False, n_jobs=-1),
)
},
"ridge": {"model": linear_model.RidgeCV()},
"decision_tree": {
"model": tree.DecisionTreeRegressor(max_depth=7, random_state=random_state)
},
"knn": {"model": neighbors.KNeighborsRegressor(n_neighbors=7, n_jobs=-1)},
"random_forest": {
"model": ensemble.RandomForestRegressor(
max_depth=7, random_state=random_state, n_jobs=-1
)
},
"mlp": {
"model": neural_network.MLPRegressor(
activation="tanh",
hidden_layer_sizes=(3,),
max_iter=500,
early_stopping=True,
random_state=random_state,
)
},
}
import math
from pandas import DataFrame
from sklearn import metrics
for model_name in models.keys():
print(f"Model: {model_name}")
fitted_model = models[model_name]["model"].fit(
X_train.values, y_train.values.ravel()
)
y_train_pred = fitted_model.predict(X_train.values)
y_test_pred = fitted_model.predict(X_test.values)
models[model_name]["fitted"] = fitted_model
models[model_name]["train_preds"] = y_train_pred
models[model_name]["preds"] = y_test_pred
models[model_name]["RMSE_train"] = math.sqrt(
metrics.mean_squared_error(y_train, y_train_pred)
)
models[model_name]["RMSE_test"] = math.sqrt(
metrics.mean_squared_error(y_test, y_test_pred)
)
models[model_name]["RMAE_test"] = math.sqrt(
metrics.mean_absolute_error(y_test, y_test_pred)
)
models[model_name]["R2_test"] = metrics.r2_score(y_test, y_test_pred)
Вывод результатов оценки
reg_metrics = pd.DataFrame.from_dict(models, "index")[
["RMSE_train", "RMSE_test", "RMAE_test", "R2_test"]
]
reg_metrics.sort_values(by="RMSE_test").style.background_gradient(
cmap="viridis", low=1, high=0.3, subset=["RMSE_train", "RMSE_test"]
).background_gradient(cmap="plasma", low=0.3, high=1, subset=["RMAE_test", "R2_test"])
Вывод реального и "спрогнозированного" результата для обучающей и тестовой выборок
Получение лучшей модели
best_model = str(reg_metrics.sort_values(by="RMSE_test").iloc[0].name)
display(best_model)
Подбор гиперпараметров методом поиска по сетке
import pandas as pd
import numpy as np
from sklearn import metrics
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestRegressor # Используем регрессор
from sklearn.preprocessing import StandardScaler
df.dropna(inplace=True)
# Предикторы и целевая переменная
X = df[["Glucose", "Age", "BloodPressure", "DiabetesPedigreeFunction"]]
y = df['Outcome'] # Целевая переменная для регрессии
model = RandomForestRegressor()
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 10, 20, 30],
'min_samples_split': [2, 5, 10]
}
# 3. Подбор гиперпараметров с помощью Grid Search
grid_search = GridSearchCV(estimator=model, param_grid=param_grid,
scoring='neg_mean_squared_error', cv=5, n_jobs=-1, verbose=2)
# Обучение модели на тренировочных данных
grid_search.fit(X_train, y_train)
# 4. Результаты подбора гиперпараметров
print("Лучшие параметры:", grid_search.best_params_)
print("Лучший результат (MSE):", -grid_search.best_score_)
Обучение модели с новыми гиперпараметрами и сравнение новых и старых данных
import pandas as pd
import numpy as np
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, GridSearchCV
import matplotlib.pyplot as plt
old_param_grid = {
'n_estimators': [50, 100, 200], # Количество деревьев
'max_depth': [None, 10, 20, 30], # Максимальная глубина дерева
'min_samples_split': [2, 5, 10] # Минимальное количество образцов для разбиения узла
}
old_grid_search = GridSearchCV(estimator=RandomForestRegressor(),
param_grid=old_param_grid,
scoring='neg_mean_squared_error', cv=5, n_jobs=-1, verbose=2)
old_grid_search.fit(X_train, y_train)
old_best_params = old_grid_search.best_params_
old_best_mse = -old_grid_search.best_score_ # Меняем знак, так как берем отрицательное значение MSE
new_param_grid = {
'n_estimators': [200],
'max_depth': [20],
'min_samples_split': [10]
}
new_grid_search = GridSearchCV(estimator=RandomForestRegressor(),
param_grid=new_param_grid,
scoring='neg_mean_squared_error', cv=2)
new_grid_search.fit(X_train, y_train)
new_best_params = new_grid_search.best_params_
new_best_mse = -new_grid_search.best_score_ # Меняем знак, так как берем отрицательное значение MSE
model_best = RandomForestRegressor(**new_best_params)
model_best.fit(X_train, y_train)
model_oldbest = RandomForestRegressor(**old_best_params)
model_oldbest.fit(X_train, y_train)
y_pred = model_best.predict(X_test)
y_oldpred = model_oldbest.predict(X_test)
mse = metrics.mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print("Старые параметры:", old_best_params)
print("Лучший результат (MSE) на старых параметрах:", old_best_mse)
print("\nНовые параметры:", new_best_params)
print("Лучший результат (MSE) на новых параметрах:", new_best_mse)
print("Среднеквадратическая ошибка (MSE) на тестовых данных:", mse)
print("Корень среднеквадратичной ошибки (RMSE) на тестовых данных:", rmse)
Визуализация
plt.figure(figsize=(10, 5))
plt.plot(y_test.values, label='Истинные значения', color='blue', linewidth=2)
plt.plot(y_oldpred, label='Предсказанные значения(после)', color='red', linestyle='--', linewidth=2)
plt.plot(y_pred, label='Предсказанные значения(до)', color='green', linestyle='-', linewidth=2)
plt.title('Сравнение предсказанных и истинных значений')
plt.xlabel('Подбор параметров')
plt.ylabel('Значения')
plt.grid()
plt.legend( loc ='lower right')
plt.show()