import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import (train_test_split, GridSearchCV,
                                     learning_curve, ShuffleSplit)
from sklearn.model_selection import cross_val_predict as cvp
from sklearn import metrics
from sklearn.metrics import (mean_absolute_error, mean_squared_error,
                             confusion_matrix)
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier


def str_features_to_numeric(data):
    # Convert every string feature to a numeric one.
    # Identify the categorical features.
    numerics = ['int8', 'int16', 'int32', 'int64',
                'float16', 'float32', 'float64']
    categorical_columns = [col for col in data.columns
                           if str(data[col].dtype) not in numerics]

    # Label-encode the categorical features.
    for col in categorical_columns:
        le = LabelEncoder()
        data[col] = le.fit_transform(data[col].astype(str))
    return data


def fe_creation(df):
    # Feature engineering (FE): bin the continuous features and build
    # pairwise combinations of the categorical ones.
    df['age2'] = df['age'] // 10
    df['trestbps2'] = df['trestbps'] // 10
    df['chol2'] = df['chol'] // 60
    df['thalch2'] = df['thalch'] // 40
    df['oldpeak2'] = df['oldpeak'] // 0.4
    for i in ['sex', 'age2', 'fbs', 'restecg', 'exang']:
        for j in ['cp', 'trestbps2', 'chol2', 'thalch2', 'oldpeak2', 'slope']:
            df[i + "_" + j] = df[i].astype('str') + "_" + df[j].astype('str')
    return df


def acc_d(y_meas, y_pred):
    # Relative error between the predicted values y_pred and the measured
    # values y_meas: MAE normalized by the mean absolute measured value.
    return mean_absolute_error(y_meas, y_pred) * len(y_meas) / sum(abs(y_meas))


def acc_rmse(y_meas, y_pred):
    # Root-mean-square error between the predicted values y_pred and the
    # measured values y_meas.
    return mean_squared_error(y_meas, y_pred) ** 0.5
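
# Illustrative sanity check for the metric helpers above (an addition for
# clarity; the values are made up, not taken from the dataset): acc_d is the
# MAE normalized by the mean absolute measured value, acc_rmse is sqrt(MSE).
def _metrics_sanity_check():
    y_meas = pd.Series([1, 0, 1, 1])
    y_pred = np.array([1, 1, 1, 0])
    assert abs(acc_d(y_meas, y_pred) - 2 / 3) < 1e-9          # 0.5 * 4 / 3
    assert abs(acc_rmse(y_meas, y_pred) - 0.5 ** 0.5) < 1e-9  # sqrt(0.5)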

def plot_cm(train_target, train_target_pred,
            valid_target, valid_target_pred, title):
    # Plot row-normalized confusion matrices for the train and test data.
    def cm_calc(y_true, y_pred):
        cm = confusion_matrix(y_true, y_pred, labels=np.unique(y_true))
        cm_sum = np.sum(cm, axis=1, keepdims=True)
        cm_perc = cm / cm_sum.astype(float) * 100
        annot = np.empty_like(cm).astype(str)
        nrows, ncols = cm.shape
        for i in range(nrows):
            for j in range(ncols):
                c = cm[i, j]
                p = cm_perc[i, j]
                if i == j:
                    s = int(cm_sum[i])
                    annot[i, j] = '%.1f%%\n%d/%d' % (p, c, s)
                elif c == 0:
                    annot[i, j] = ''
                else:
                    annot[i, j] = '%.1f%%\n%d' % (p, c)
        cm = pd.DataFrame(cm, index=np.unique(y_true),
                          columns=np.unique(y_true))
        cm.index.name = 'Actual'
        cm.columns.name = 'Predicted'
        return cm, annot

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16, 6), sharex=True)

    # Training data
    ax = axes[0]
    ax.set_title("for training data")
    cm0, annot0 = cm_calc(train_target, train_target_pred)
    sns.heatmap(cm0, cmap="YlGnBu", annot=annot0, fmt='', ax=ax)

    # Test (validation) data
    ax = axes[1]
    ax.set_title("for test (validation) data")
    cm1, annot1 = cm_calc(valid_target, valid_target_pred)
    sns.heatmap(cm1, cmap="YlGnBu", annot=annot1, fmt='', ax=ax)

    fig.suptitle(f'CONFUSION MATRICES for {title}')
    plt.savefig(f'CONFUSION MATRICES for {title}.png')
    plt.show()


def acc_metrics_calc(num, acc_all, model, train, valid,
                     train_target, valid_target, title):
    # Model-selection stage: compute the model's accuracy by several metrics.
    # metrics_now and metrics_all are module-level settings defined in the
    # __main__ block below.
    ytrain = model.predict(train).astype(int)
    yvalid = model.predict(valid).astype(int)

    print('train_target = ', train_target[:5].values)
    print('ytrain = ', ytrain[:5])
    print('valid_target =', valid_target[:5].values)
    print('yvalid =', yvalid[:5])

    num_acc = 0
    for x in metrics_now:
        if x == 1:    # accuracy score
            acc_train = round(metrics.accuracy_score(train_target, ytrain), 2)
            acc_valid = round(metrics.accuracy_score(valid_target, yvalid), 2)
        elif x == 2:  # RMSE
            acc_train = round(acc_rmse(train_target, ytrain), 2)
            acc_valid = round(acc_rmse(valid_target, yvalid), 2)
        elif x == 3:  # relative error
            acc_train = round(acc_d(train_target, ytrain) * 100, 2)
            acc_valid = round(acc_d(valid_target, yvalid) * 100, 2)
        print('acc of', metrics_all[x], 'for train =', acc_train)
        print('acc of', metrics_all[x], 'for valid =', acc_valid)
        acc_all[num_acc].append(acc_train)      # train
        acc_all[num_acc + 1].append(acc_valid)  # valid
        num_acc += 2

    # Plot the confusion matrices.
    plot_cm(train_target, ytrain, valid_target, yvalid, title)
    return acc_all


def plot_feature_importance(feature_importances, feature_names, model_name):
    # Styled bar plot of the feature importances, sorted in descending order.
    colors = sns.color_palette('viridis', len(feature_importances))
    indices = feature_importances.argsort()[::-1]

    plt.figure(figsize=(12, 8))
    ax = sns.barplot(x=feature_importances[indices],
                     y=feature_names[indices], palette=colors)

    plt.xlabel('Feature importance', fontsize=14)
    plt.ylabel('Features', fontsize=14)
    plt.title(f'Feature importances of the {model_name} model', fontsize=16)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)

    # Color bar scaled to the importance range, with a caption.
    norm = plt.Normalize(feature_importances.min(), feature_importances.max())
    cbar = plt.colorbar(plt.cm.ScalarMappable(norm=norm, cmap='viridis'),
                        ax=ax)
    cbar.set_label('Importance level', rotation=270, labelpad=15, fontsize=12)

    # Grid for better readability.
    plt.grid(axis='x', linestyle='--', alpha=0.6)

    plt.savefig(f'feature_importances_{model_name}.png', bbox_inches='tight')
    plt.show()


def plot_learning_curve(estimator, title, X, y, cv=None, axes=None, ylim=None,
                        n_jobs=None, train_sizes=np.linspace(.1, 1.0, 5),
                        random_state=0):
    # Plot the learning curve and the fit-time scalability of an estimator.
    if axes is None:
        fig, axes = plt.subplots(2, 1, figsize=(20, 10))

    axes[0].set_title(title)
    if ylim is not None:
        axes[0].set_ylim(*ylim)
    axes[0].set_xlabel("Training examples")
    axes[0].set_ylabel("Score")

    if cv is None:
        cv = ShuffleSplit(n_splits=cv_n_split,
                          test_size=test_train_split_part,
                          random_state=random_state)

    train_sizes, train_scores, test_scores, fit_times, _ = learning_curve(
        estimator=estimator, X=X, y=y, cv=cv, n_jobs=n_jobs,
        train_sizes=train_sizes, return_times=True)
    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    fit_times_mean = np.mean(fit_times, axis=1)
    fit_times_std = np.std(fit_times, axis=1)

    # Learning curve
    axes[0].grid()
    axes[0].fill_between(train_sizes, train_scores_mean - train_scores_std,
                         train_scores_mean + train_scores_std,
                         alpha=0.1, color="r")
    axes[0].fill_between(train_sizes, test_scores_mean - test_scores_std,
                         test_scores_mean + test_scores_std,
                         alpha=0.1, color="g")
    axes[0].plot(train_sizes, train_scores_mean, 'o-', color="r",
                 label="Training score")
    axes[0].plot(train_sizes, test_scores_mean, 'o-', color="g",
                 label="Cross-validation score")
    axes[0].legend(loc="best")

    # n_samples vs fit_times
    axes[1].grid()
    axes[1].plot(train_sizes, fit_times_mean, 'o-')
    axes[1].fill_between(train_sizes, fit_times_mean - fit_times_std,
                         fit_times_mean + fit_times_std, alpha=0.1)
    axes[1].set_xlabel("Training examples")
    axes[1].set_ylabel("fit_times")
    axes[1].set_title("Scalability of the model")

    plt.savefig(f'{title}.png')
    plt.show()
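
# Sketch of an optional helper (an addition, not in the original script):
# cross_val_predict is imported above as cvp but otherwise unused here; it
# could supply out-of-fold predictions for a train-side confusion matrix,
# avoiding the optimistic bias of the in-sample predictions used in
# acc_metrics_calc.
def oof_confusion_matrix(model, X, y, cv):
    oof_pred = cvp(model, X, y, cv=cv).astype(int)
    return confusion_matrix(y, oof_pred, labels=np.unique(y))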

if __name__ == "__main__":
    # Load the data, transform and preprocess it, train the Decision Tree
    # Classifier and Random Forest Classifier models, then compute the
    # metrics and build the plots.
    cv_n_split = 5
    random_state = 42
    test_train_split_part = 0.25

    metrics_all = {1: 'acc', 2: 'rmse', 3: 're'}
    metrics_now = [1, 2, 3]

    data = pd.read_csv("../heart_disease_uci.csv")
    data['target'] = data['num']
    data = data.drop(columns=['id', 'dataset', 'ca', 'thal', 'num'])
    data = data[(data['chol'] <= 420)
                & (data['oldpeak'] >= 0)
                & (data['oldpeak'] <= 4)].reset_index(drop=True)
    data = data.dropna().reset_index(drop=True)
    print(data.info())

    data = str_features_to_numeric(data)
    # Reduce the target column to binary form.
    data = data[data['target'].isin([0, 1])]
    data = fe_creation(data)
    # Encode the combined string features created by fe_creation.
    data = str_features_to_numeric(data)

    dataset = data.copy()  # original data
    target_name = 'target'
    target = data.pop(target_name)

    # Feature standardization. The standard score of a sample x is
    # z = (x - mean) / (standard deviation).
    scaler = StandardScaler()
    data = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)

    train, valid, train_target, valid_target = train_test_split(
        data, target, test_size=test_train_split_part,
        random_state=random_state)

    # Accuracy lists for all models: len(metrics_now) * 2 rows
    # (train & valid datasets).
    acc_all = np.empty((len(metrics_now) * 2, 0)).tolist()
    acc_all_pred = np.empty((len(metrics_now), 0)).tolist()

    cv_train = ShuffleSplit(n_splits=cv_n_split,
                            test_size=test_train_split_part,
                            random_state=random_state)

    decision_tree = DecisionTreeClassifier()
    param_grid = {'min_samples_leaf': list(range(2, 12))}
    decision_tree_CV = GridSearchCV(decision_tree, param_grid=param_grid,
                                    cv=cv_train, verbose=False)
    decision_tree_CV.fit(train, train_target)
    print(decision_tree_CV.best_params_)

    acc_all = acc_metrics_calc(0, acc_all, decision_tree_CV, train, valid,
                               train_target, valid_target,
                               title="Decision Tree Classifier")
    plot_learning_curve(decision_tree_CV, "Decision Tree", train, train_target,
                        cv=cv_train)
    feature_importances_dt = decision_tree_CV.best_estimator_.feature_importances_
    plot_feature_importance(feature_importances_dt, data.columns,
                            "Decision Tree")
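
    # The header comment above names a Random Forest Classifier as the second
    # model, but the original section ends after the decision tree. The block
    # below is a sketch that repeats the same GridSearchCV pattern for the
    # imported RandomForestClassifier; the parameter-grid values are
    # assumptions, not taken from the original script.
    random_forest = RandomForestClassifier(random_state=random_state)
    param_grid_rf = {'n_estimators': [100, 200],
                     'min_samples_leaf': [2, 4, 8]}
    random_forest_CV = GridSearchCV(random_forest, param_grid=param_grid_rf,
                                    cv=cv_train, verbose=False)
    random_forest_CV.fit(train, train_target)
    print(random_forest_CV.best_params_)

    acc_all = acc_metrics_calc(1, acc_all, random_forest_CV, train, valid,
                               train_target, valid_target,
                               title="Random Forest Classifier")
    plot_learning_curve(random_forest_CV, "Random Forest", train, train_target,
                        cv=cv_train)
    feature_importances_rf = random_forest_CV.best_estimator_.feature_importances_
    plot_feature_importance(feature_importances_rf, data.columns,
                            "Random Forest")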