274 KiB
Лабораторная работа 4¶
Бизнес цели:
- Оптимизация ценовой стратегии: анализ факторов, влияющих на стоимость недвижимости, чтобы помочь продавцам устанавливать конкурентоспособные цены и увеличивать прибыль.
- Улучшение инвестиционных решений: предоставление аналитики для инвесторов, чтобы они могли определить наиболее выгодные районы и типы недвижимости для вложений.
Загрузка набора данных¶
import pandas as pd
from sklearn import set_config
set_config(transform_output="pandas")
random_state = 42
df = pd.read_csv("data/kc_house_data.csv", index_col="id")
df["date"] = pd.to_datetime(df["date"])
df["date_numeric"] = (df["date"] - pd.Timestamp("1970-01-01")).dt.days
df = df.drop(columns=["date"])
average_price = df['price'].mean()
print(f"Среднее значение поля 'цена': {average_price}")
average_price = df["price"].mean()
df['above_average_price'] = (df['price'] > average_price).astype(int)
df
Разделение набора данных на обучающую и тестовые выборки (80/20) для задачи классификации¶
from typing import Tuple
import pandas as pd
from pandas import DataFrame
from sklearn.model_selection import train_test_split
def split_stratified_into_train_val_test(
df_input,
stratify_colname="y",
frac_train=0.6,
frac_val=0.15,
frac_test=0.25,
random_state=None,
) -> Tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]:
if frac_train + frac_val + frac_test != 1.0:
raise ValueError(
"fractions %f, %f, %f do not add up to 1.0"
% (frac_train, frac_val, frac_test)
)
if stratify_colname not in df_input.columns:
raise ValueError("%s is not a column in the dataframe" % (stratify_colname))
X = df_input
y = df_input[
[stratify_colname]
]
df_train, df_temp, y_train, y_temp = train_test_split(
X, y, stratify=y, test_size=(1.0 - frac_train), random_state=random_state
)
if frac_val <= 0:
assert len(df_input) == len(df_train) + len(df_temp)
return df_train, pd.DataFrame(), df_temp, y_train, pd.DataFrame(), y_temp
relative_frac_test = frac_test / (frac_val + frac_test)
df_val, df_test, y_val, y_test = train_test_split(
df_temp,
y_temp,
stratify=y_temp,
test_size=relative_frac_test,
random_state=random_state,
)
assert len(df_input) == len(df_train) + len(df_val) + len(df_test)
return df_train, df_val, df_test, y_train, y_val, y_test
X_train, X_val, X_test, y_train, y_val, y_test = split_stratified_into_train_val_test(
df,
stratify_colname="above_average_price",
frac_train=0.80,
frac_val=0,
frac_test=0.20,
random_state=random_state,
)
display("X_train", X_train)
display("y_train", y_train)
display("X_test", X_test)
display("y_test", y_test)
Формирование конвейера для классификации данных¶
preprocessing_num - конвейер для обработки числовых данных: заполнение пропущенных значений и стандартизация
preprocessing_cat - конвейер для обработки категориальных данных: заполнение пропущенных данных и унитарное кодирование
features_preprocessing - трансформер для предобработки признаков
features_engineering - трансформер для конструирования признаков
drop_columns - трансформер для удаления колонок
pipeline_end - основной конвейер предобработки данных и конструирования признаков
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
# Список числовых колонок
num_columns = [
"price",
"bedrooms",
"bathrooms",
"sqft_living",
"sqft_lot",
"floors",
"waterfront",
"view",
"condition",
"grade",
"sqft_above",
"sqft_basement",
"yr_built",
"yr_renovated",
"zipcode",
"lat",
"long",
"sqft_living15",
"sqft_lot15",
"date_numeric"
]
columns_to_drop = ["date"]
# Конвейер для числовых данных
num_imputer = SimpleImputer(strategy="median")
num_scaler = StandardScaler()
preprocessing_num = Pipeline(
[
("imputer", num_imputer),
("scaler", num_scaler),
]
)
# Конвейер для удаления колонок
drop_columns = ColumnTransformer(
transformers=[
("drop_columns", "drop", columns_to_drop),
],
remainder="passthrough",
)
# Предобработка только для числовых данных
features_preprocessing = ColumnTransformer(
transformers=[
("preprocessing_num", preprocessing_num, num_columns),
],
remainder="passthrough",
)
# Итоговый конвейер
pipeline_end = Pipeline(
[
("features_preprocessing", features_preprocessing),
]
)
Демонстрация работы конвейера для предобработки данных при классификации¶
preprocessing_result = pipeline_end.fit_transform(X_train)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
preprocessed_df
Формирование набора моделей для классификации¶
logistic -- логистическая регрессия
ridge -- гребневая регрессия
decision_tree -- дерево решений
knn -- k-ближайших соседей
naive_bayes -- наивный Байесовский классификатор
gradient_boosting -- метод градиентного бустинга (набор деревьев решений)
random_forest -- метод случайного леса (набор деревьев решений)
mlp -- многослойный персептрон (нейронная сеть)
from sklearn import ensemble, linear_model, naive_bayes, neighbors, neural_network, tree
# Сами классификационные модели
class_models = {
# от 0 до 1, принадлежит ли объект к классу
"logistic": {"model": linear_model.LogisticRegression()},
# Логическая, но с регуляризацией (модель не так точно запоминает данные)
"ridge": {
"model": linear_model.LogisticRegression(penalty="l2", class_weight="balanced")
},
# Деления данных на условия с помощью построения дерева
"decision_tree": {
"model": tree.DecisionTreeClassifier(max_depth=7, random_state=random_state)
},
# Определяет ближайших объектов и находит и класс
"knn": {"model": neighbors.KNeighborsClassifier(n_neighbors=7)},
# Вероятности для классификации
"naive_bayes": {"model": naive_bayes.GaussianNB()},
# Постепенно улучшает предсказания с помощью слабых моделей
"gradient_boosting": {
"model": ensemble.GradientBoostingClassifier(n_estimators=210)
},
"random_forest": {
"model": ensemble.RandomForestClassifier(
max_depth=11, class_weight="balanced", random_state=random_state
)
},
"mlp": {
"model": neural_network.MLPClassifier(
hidden_layer_sizes=(7,),
max_iter=500,
early_stopping=True,
random_state=random_state,
)
},
}
Обучение моделей на обучающем наборе данных и оценка на тестовом¶
import numpy as np
from sklearn import metrics
for model_name in class_models.keys():
print(f"Model: {model_name}")
model = class_models[model_name]["model"]
model_pipeline = Pipeline([("pipeline", pipeline_end), ("model", model)])
model_pipeline = model_pipeline.fit(X_train, y_train.values.ravel())
y_train_predict = model_pipeline.predict(X_train)
y_test_probs = model_pipeline.predict_proba(X_test)[:, 1]
y_test_predict = np.where(y_test_probs > 0.5, 1, 0)
class_models[model_name]["pipeline"] = model_pipeline
class_models[model_name]["probs"] = y_test_probs
class_models[model_name]["preds"] = y_test_predict
class_models[model_name]["Precision_train"] = metrics.precision_score(
y_train, y_train_predict
)
class_models[model_name]["Precision_test"] = metrics.precision_score(
y_test, y_test_predict
)
class_models[model_name]["Recall_train"] = metrics.recall_score(
y_train, y_train_predict
)
class_models[model_name]["Recall_test"] = metrics.recall_score(
y_test, y_test_predict
)
class_models[model_name]["Accuracy_train"] = metrics.accuracy_score(
y_train, y_train_predict
)
class_models[model_name]["Accuracy_test"] = metrics.accuracy_score(
y_test, y_test_predict
)
class_models[model_name]["ROC_AUC_test"] = metrics.roc_auc_score(
y_test, y_test_probs
)
class_models[model_name]["F1_train"] = metrics.f1_score(y_train, y_train_predict)
class_models[model_name]["F1_test"] = metrics.f1_score(y_test, y_test_predict)
class_models[model_name]["MCC_test"] = metrics.matthews_corrcoef(
y_test, y_test_predict
)
class_models[model_name]["Cohen_kappa_test"] = metrics.cohen_kappa_score(
y_test, y_test_predict
)
class_models[model_name]["Confusion_matrix"] = metrics.confusion_matrix(
y_test, y_test_predict
)
Сводная таблица оценок качества для использованных моделей классификации¶
Матрица неточностей
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
_, ax = plt.subplots(int(len(class_models) / 2), 2, figsize=(12, 10), sharex=False, sharey=False)
for index, key in enumerate(class_models.keys()):
c_matrix = class_models[key]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Less", "More"]
).plot(ax=ax.flat[index])
disp.ax_.set_title(key)
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.1)
plt.show()
Точность, полнота, верность (аккуратность), F-мера
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
]
class_metrics.sort_values(
by="Accuracy_test", ascending=False
).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
ROC-кривая, каппа Коэна, коэффициент корреляции Мэтьюса
class_metrics = pd.DataFrame.from_dict(class_models, "index")[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
]
class_metrics.sort_values(by="ROC_AUC_test", ascending=False).style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
best_model = str(class_metrics.sort_values(by="MCC_test", ascending=False).iloc[0].name)
display(best_model)
Вывод данных с ошибкой предсказания для оценки¶
preprocessing_result = pipeline_end.transform(X_test)
preprocessed_df = pd.DataFrame(
preprocessing_result,
columns=pipeline_end.get_feature_names_out(),
)
y_pred = class_models[best_model]["preds"]
# Cравнение реальных значений (y_test["above_average_price"]) с предсказанными значениями (y_pred)
# на тестовых данных
error_index = y_test[y_test["above_average_price"] != y_pred].index.tolist()
display(f"Error items count: {len(error_index)}")
error_predicted = pd.Series(y_pred, index=y_test.index).loc[error_index]
error_df = X_test.loc[error_index].copy()
error_df.insert(loc=1, column="Predicted", value=error_predicted)
error_df.sort_index()
Пример использования обученной модели (конвейера) для предсказания¶
model = class_models[best_model]["pipeline"]
example_id = 626059335
test = pd.DataFrame(X_test.loc[example_id, :]).T
test_preprocessed = pd.DataFrame(preprocessed_df.loc[example_id, :]).T
display(test)
display(test_preprocessed)
result_proba = model.predict_proba(test)[0]
result = model.predict(test)[0]
real = int(y_test.loc[example_id].values[0])
display(f"predicted: {result} (proba: {result_proba})")
display(f"real: {real}")
Подбор гиперпараметров методом поиска по сетке¶
from sklearn.model_selection import GridSearchCV
optimized_model_type = "random_forest"
random_forest_model = class_models[optimized_model_type]["pipeline"]
param_grid = {
"model__n_estimators": [10, 20, 30, 40, 50, 100, 150, 200, 250, 500],
"model__max_features": ["sqrt", "log2", 2],
"model__max_depth": [2, 3, 4, 5, 6, 7, 8, 9 ,10],
"model__criterion": ["gini", "entropy", "log_loss"],
}
gs_optomizer = GridSearchCV(
estimator=random_forest_model, param_grid=param_grid, n_jobs=-1
)
gs_optomizer.fit(X_train, y_train.values.ravel())
gs_optomizer.best_params_
Обучение модели с новыми гиперпараметрами
optimized_model = ensemble.RandomForestClassifier(
random_state=random_state,
criterion="gini",
max_depth=7,
max_features="sqrt",
n_estimators=30,
)
result = {}
result["pipeline"] = Pipeline([("pipeline", pipeline_end), ("model", optimized_model)]).fit(X_train, y_train.values.ravel())
result["train_preds"] = result["pipeline"].predict(X_train)
result["probs"] = result["pipeline"].predict_proba(X_test)[:, 1]
result["preds"] = np.where(result["probs"] > 0.5, 1, 0)
result["Precision_train"] = metrics.precision_score(y_train, result["train_preds"])
result["Precision_test"] = metrics.precision_score(y_test, result["preds"])
result["Recall_train"] = metrics.recall_score(y_train, result["train_preds"])
result["Recall_test"] = metrics.recall_score(y_test, result["preds"])
result["Accuracy_train"] = metrics.accuracy_score(y_train, result["train_preds"])
result["Accuracy_test"] = metrics.accuracy_score(y_test, result["preds"])
result["ROC_AUC_test"] = metrics.roc_auc_score(y_test, result["probs"])
result["F1_train"] = metrics.f1_score(y_train, result["train_preds"])
result["F1_test"] = metrics.f1_score(y_test, result["preds"])
result["MCC_test"] = metrics.matthews_corrcoef(y_test, result["preds"])
result["Cohen_kappa_test"] = metrics.cohen_kappa_score(y_test, result["preds"])
result["Confusion_matrix"] = metrics.confusion_matrix(y_test, result["preds"])
Формирование данных для оценки старой и новой версии модели
optimized_metrics = pd.DataFrame(columns=list(result.keys()))
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=class_models[optimized_model_type]
)
optimized_metrics.loc[len(optimized_metrics)] = pd.Series(
data=result
)
optimized_metrics.insert(loc=0, column="Name", value=["Old", "New"])
optimized_metrics = optimized_metrics.set_index("Name")
Оценка параметров старой и новой модели
optimized_metrics[
[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
"Accuracy_train",
"Accuracy_test",
"F1_train",
"F1_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=["Accuracy_train", "Accuracy_test", "F1_train", "F1_test"],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Precision_train",
"Precision_test",
"Recall_train",
"Recall_test",
],
)
optimized_metrics[
[
"Accuracy_test",
"F1_test",
"ROC_AUC_test",
"Cohen_kappa_test",
"MCC_test",
]
].style.background_gradient(
cmap="plasma",
low=0.3,
high=1,
subset=[
"ROC_AUC_test",
"MCC_test",
"Cohen_kappa_test",
],
).background_gradient(
cmap="viridis",
low=1,
high=0.3,
subset=[
"Accuracy_test",
"F1_test",
],
)
_, ax = plt.subplots(1, 2, figsize=(10, 4), sharex=False, sharey=False
)
for index in range(0, len(optimized_metrics)):
c_matrix = optimized_metrics.iloc[index]["Confusion_matrix"]
disp = ConfusionMatrixDisplay(
confusion_matrix=c_matrix, display_labels=["Less", "More"]
).plot(ax=ax.flat[index])
plt.subplots_adjust(top=1, bottom=0, hspace=0.4, wspace=0.3)
plt.show()