424 KiB
Бизнес цели¶
- Прогнозирование цен на акции Tesla на основе действий инсайдеров: Одна из ключевых бизнес-целей состоит в создании модели для прогнозирования динамики акций Tesla, используя данные о транзакциях инсайдеров. Поскольку инсайдеры обладают глубоким знанием внутреннего состояния компании, их действия могут предсказывать изменения в стоимости акций. На основе анализа паттернов и частоты инсайдерских покупок и продаж можно разработать предсказательную модель, которая поможет инвесторам и аналитикам принимать более обоснованные решения.
- Анализ влияния транзакций инсайдеров на динамику цены акций Tesla для оценки краткосрочных и долгосрочных рисков: Цель – исследовать, как действия инсайдеров (особенно крупных акционеров и ключевых лиц) влияют на цену акций Tesla. Выявление корреляций между объёмом, типом и частотой инсайдерских сделок и изменениями цены акций позволит оценить риски и тенденции в динамике акций.
Цель технического проекта: Разработка модели машинного обучения для прогнозирования будущих продаж акций топ-менеджментом компании, а также анализ влияния транзакций инсайдеров на динамику цены акций Tesla для оценки краткосрочных и долгосрочных рисков.
from typing import Any
from math import ceil
import time
import pandas as pd
from pandas import DataFrame, Series
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error, r2_score, mean_absolute_error
from sklearn.ensemble import RandomForestRegressor
from imblearn.over_sampling import SMOTE
import featuretools as ft
from featuretools.entityset.entityset import EntitySet
import matplotlib.pyplot as plt
df: DataFrame = pd.read_csv("static/csv/TSLA.csv")
Конвертация данных:
# Преобразование типов данных
df['Insider Trading'] = df['Insider Trading'].astype('category')
df['Relationship'] = df['Relationship'].astype('category')
df['Transaction'] = df['Transaction'].astype('category')
df['Cost'] = pd.to_numeric(df['Cost'], errors='coerce')
df['Shares'] = pd.to_numeric(df['Shares'].str.replace(',', ''), errors='coerce')
df['Value ($)'] = pd.to_numeric(df['Value ($)'].str.replace(',', ''), errors='coerce')
df['Shares Total'] = pd.to_numeric(df['Shares Total'].str.replace(',', ''), errors='coerce')
print('Выборка данных:')
df.head(10)
Проблема пропущенных данных:
Проверка на отсутствие значений, представленная ниже, показала, что DataFrame не имеет пустых значений признаков. Нет необходимости использовать методы заполнения пропущенных данных.
# Проверка пропущенных данных
def check_null_columns(dataframe: DataFrame) -> None:
# Присутствуют ли пустые значения признаков
print('Присутствуют ли пустые значения признаков в колонке:')
print(dataframe.isnull().any(), '\n')
# Количество пустых значений признаков
print('Количество пустых значений признаков в колонке:')
print(dataframe.isnull().sum(), '\n')
# Процент пустых значений признаков
print('Процент пустых значений признаков в колонке:')
for column in dataframe.columns:
null_rate: float = dataframe[column].isnull().sum() / len(dataframe) * 100
if null_rate > 0:
print(f"{column} процент пустых значений: {null_rate:.2f}%")
print()
# Проверка пропущенных данных
check_null_columns(df)
Проблема зашумленности данных
Зашумленность – это наличие случайных ошибок или вариаций в данных, которые могут затруднить выявление истинных закономерностей. В свою очередь выбросы - это значения, которые значительно отличаются от остальных наблюдений в наборе данных Представленный ниже код помогает определить наличие выбросов в наборе данных и устранить их (при наличии), заменив значения ниже нижней границы (рассматриваемого минимума) на значения нижней границы, а значения выше верхней границы (рассматриваемого максимума) – на значения верхней границы.
# Проверка выбросов в DataFrame
def check_outliers(dataframe: DataFrame, columns: list[str]) -> None:
for column in columns:
if not pd.api.types.is_numeric_dtype(dataframe[column]): # Проверяем, является ли колонка числовой
continue
Q1: float = dataframe[column].quantile(0.25) # 1-й квартиль (25%)
Q3: float = dataframe[column].quantile(0.75) # 3-й квартиль (75%)
IQR: float = Q3 - Q1 # Вычисляем межквартильный размах
# Определяем границы для выбросов
lower_bound: float = Q1 - 1.5 * IQR # Нижняя граница
upper_bound: float = Q3 + 1.5 * IQR # Верхняя граница
# Подсчитываем количество выбросов
outliers: DataFrame = dataframe[(dataframe[column] < lower_bound) | (dataframe[column] > upper_bound)]
outlier_count: int = outliers.shape[0]
print(f"Колонка {column}:")
print(f"\tЕсть выбросы: {'Да' if outlier_count > 0 else 'Нет'}")
print(f"\tКоличество выбросов: {outlier_count}")
print(f"\tМинимальное значение: {dataframe[column].min()}")
print(f"\tМаксимальное значение: {dataframe[column].max()}")
print(f"\t1-й квартиль (Q1): {Q1}")
print(f"\t3-й квартиль (Q3): {Q3}\n")
# Визуализация выбросов
def visualize_outliers(dataframe: DataFrame, columns: list[str]) -> None:
# Диаграммы размахов
plt.figure(figsize=(15, 10))
rows: int = ceil(len(columns) / 3)
for index, column in enumerate(columns, 1):
plt.subplot(rows, 3, index)
plt.boxplot(dataframe[column], vert=True, patch_artist=True)
plt.title(f"Диаграмма размахов для \"{column}\"")
plt.xlabel(column)
# Отображение графиков
plt.tight_layout()
plt.show()
# Числовые столбцы DataFrame
numeric_columns: list[str] = [
'Cost',
'Shares',
'Value ($)',
'Shares Total'
]
# Проверка наличия выбросов в колонках
print('Проверка наличия выбросов в колонках:')
check_outliers(df, numeric_columns)
visualize_outliers(df, numeric_columns)
Устраняем выбросы и проводим проверку на их устранение
# Устранить выборсы в DataFrame
def remove_outliers(dataframe: DataFrame, columns: list[str]) -> DataFrame:
for column in columns:
if not pd.api.types.is_numeric_dtype(dataframe[column]): # Проверяем, является ли колонка числовой
continue
Q1: float = dataframe[column].quantile(0.25) # 1-й квартиль (25%)
Q3: float = dataframe[column].quantile(0.75) # 3-й квартиль (75%)
IQR: float = Q3 - Q1 # Вычисляем межквартильный размах
# Определяем границы для выбросов
lower_bound: float = Q1 - 1.5 * IQR # Нижняя граница
upper_bound: float = Q3 + 1.5 * IQR # Верхняя граница
# Устраняем выбросы:
# Заменяем значения ниже нижней границы на нижнюю границу
# А значения выше верхней границы – на верхнюю
dataframe[column] = dataframe[column].apply(lambda x: lower_bound if x < lower_bound else upper_bound if x > upper_bound else x)
return dataframe
# Устраняем выборсы
df: DataFrame = remove_outliers(df, numeric_columns)
# Проверка наличия выбросов в колонках
print('Проверка наличия выбросов в колонках после их устранения:')
check_outliers(df, numeric_columns)
visualize_outliers(df, numeric_columns)
Разбиение набора данных на выборки:
Обучающая выборка (60-80%). Обучение модели (подбор коэффициентов некоторой математической функции для аппроксимации). Контрольная выборка (10-20%). Выбор метода обучения, настройка гиперпараметров. Тестовая выборка (10-20% или 20-30%). Оценка качества модели перед передачей заказчику.
Данные должны быть сбалансированными, чтобы достичь этого воспользуемся методами аугментации данных. В данном случае воспользуемся методом oversampling.
# Функция для создания выборок
def split_stratified_into_train_val_test(
df_input,
stratify_colname="y",
frac_train=0.6,
frac_val=0.15,
frac_test=0.25,
random_state=None,
) -> tuple[Any, Any, Any]:
if frac_train + frac_val + frac_test != 1.0:
raise ValueError(
"fractions %f, %f, %f do not add up to 1.0"
% (frac_train, frac_val, frac_test)
)
if stratify_colname not in df_input.columns:
raise ValueError("%s is not a column in the dataframe" % (stratify_colname))
X: DataFrame = df_input
y: DataFrame = df_input[
[stratify_colname]
]
df_train, df_temp, y_train, y_temp = train_test_split(
X, y,
stratify=y,
test_size=(1.0 - frac_train),
random_state=random_state
)
relative_frac_test: float = frac_test / (frac_val + frac_test)
df_val, df_test, y_val, y_test = train_test_split(
df_temp,
y_temp,
stratify=y_temp,
test_size=relative_frac_test,
random_state=random_state,
)
assert len(df_input) == len(df_train) + len(df_val) + len(df_test)
return df_train, df_val, df_test
# Оценка сбалансированности
def check_balance(dataframe: DataFrame, dataframe_name: str, column: str) -> None:
counts: Series[int] = dataframe[column].value_counts()
print(dataframe_name + ": ", dataframe.shape)
print(f"Распределение выборки данных по классам в колонке \"{column}\":\n", counts)
total_count: int = len(dataframe)
for value in counts.index:
percentage: float = counts[value] / total_count * 100
print(f"Процент объектов класса \"{value}\": {percentage:.2f}%")
print()
# Определение необходимости аугментации данных
def need_augmentation(dataframe: DataFrame,
column: str,
first_value: Any, second_value: Any) -> bool:
counts: Series[int] = dataframe[column].value_counts()
ratio: float = counts[first_value] / counts[second_value]
return ratio > 1.5 or ratio < 0.67
# Визуализация сбалансированности классов
def visualize_balance(dataframe_train: DataFrame,
dataframe_val: DataFrame,
dataframe_test: DataFrame,
column: str) -> None:
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Обучающая выборка
counts_train: Series[int] = dataframe_train[column].value_counts()
axes[0].pie(counts_train, labels=counts_train.index, autopct='%1.1f%%', startangle=90)
axes[0].set_title(f"Распределение классов \"{column}\" в обучающей выборке")
# Контрольная выборка
counts_val: Series[int] = dataframe_val[column].value_counts()
axes[1].pie(counts_val, labels=counts_val.index, autopct='%1.1f%%', startangle=90)
axes[1].set_title(f"Распределение классов \"{column}\" в контрольной выборке")
# Тестовая выборка
counts_test: Series[int] = dataframe_test[column].value_counts()
axes[2].pie(counts_test, labels=counts_test.index, autopct='%1.1f%%', startangle=90)
axes[2].set_title(f"Распределение классов \"{column}\" в тренировочной выборке")
# Отображение графиков
plt.tight_layout()
plt.show()
# Унитарное кодирование категориальных признаков (one-hot encoding)
df_encoded: DataFrame = pd.get_dummies(df)
# Вывод распределения количества наблюдений по меткам (классам)
print('Распределение количества наблюдений по меткам (классам):')
print(df_encoded['Cost'].value_counts(), '\n')
# Статистическое описание целевого признака
print('Статистическое описание целевого признака:')
print(df_encoded['Cost'].describe().transpose(), '\n')
# Определим границы для каждой категории стоимости акций
bins: list[float] = [df_encoded['Cost'].min() - 1,
df_encoded['Cost'].quantile(0.25),
df_encoded['Cost'].quantile(0.75),
df_encoded['Cost'].max() + 1]
labels: list[str] = ['low', 'medium', 'high']
# Создаем новую колонку с категориями стоимости акций
df_encoded['Cost_category'] = pd.cut(df_encoded['Cost'], bins=bins, labels=labels)
# Вывод распределения количества наблюдений по меткам (классам)
print('Распределение количества наблюдений по меткам (классам):')
print(df_encoded['Cost_category'].value_counts(), '\n')
df_train, df_val, df_test = split_stratified_into_train_val_test(
df_encoded,
stratify_colname="Cost_category",
frac_train=0.60,
frac_val=0.20,
frac_test=0.20
)
# Проверка сбалансированности выборок
print('Проверка сбалансированности выборок:')
check_balance(df_train, 'Обучающая выборка', 'Cost_category')
check_balance(df_val, 'Контрольная выборка', 'Cost_category')
check_balance(df_test, 'Тестовая выборка', 'Cost_category')
# Проверка необходимости аугментации выборок
print('Проверка необходимости аугментации выборок:')
print(f"Для обучающей выборки аугментация данных {'не ' if not need_augmentation(df_train, 'Cost_category', 'low', 'medium') else ''}требуется")
print(f"Для контрольной выборки аугментация данных {'не ' if not need_augmentation(df_val, 'Cost_category', 'low', 'medium') else ''}требуется")
print(f"Для тестовой выборки аугментация данных {'не ' if not need_augmentation(df_test, 'Cost_category', 'low', 'medium') else ''}требуется")
# Визуализация сбалансированности классов
visualize_balance(df_train, df_val, df_test, 'Cost_category')
Необходимо применить аугментацию выборки с избытком (oversampling) – копирование наблюдений или генерация новых наблюдений на основе существующих с помощью алгоритмов SMOTE и ADASYN (нахождение k-ближайших соседей).
# Метод приращения с избытком (oversampling)
def oversample(df: DataFrame, column: str) -> DataFrame:
X: DataFrame = pd.get_dummies(df.drop(column, axis=1))
y: DataFrame = df[column] # type: ignore
smote = SMOTE()
X_resampled, y_resampled = smote.fit_resample(X, y) # type: ignore
df_resampled: DataFrame = pd.concat([X_resampled, y_resampled], axis=1)
return df_resampled
# Приращение данных (oversampling)
df_train_oversampled: DataFrame = oversample(df_train, 'Cost_category')
df_val_oversampled: DataFrame = oversample(df_val, 'Cost_category')
df_test_oversampled: DataFrame = oversample(df_test, 'Cost_category')
# Проверка сбалансированности выборок
print('Проверка сбалансированности выборок после применения метода oversampling:')
check_balance(df_train_oversampled, 'Обучающая выборка', 'Cost_category')
check_balance(df_val_oversampled, 'Контрольная выборка', 'Cost_category')
check_balance(df_test_oversampled, 'Тестовая выборка', 'Cost_category')
# Проверка необходимости аугментации выборок
print('Проверка необходимости аугментации выборок после применения метода oversampling:')
print(f"Для обучающей выборки аугментация данных {'не ' if not need_augmentation(df_train_oversampled, 'Cost_category', 'low', 'medium') else ''}требуется")
print(f"Для контрольной выборки аугментация данных {'не ' if not need_augmentation(df_val_oversampled, 'Cost_category', 'low', 'medium') else ''}требуется")
print(f"Для тестовой выборки аугментация данных {'не ' if not need_augmentation(df_test_oversampled, 'Cost_category', 'low', 'medium') else ''}требуется")
# Визуализация сбалансированности классов
visualize_balance(df_train_oversampled, df_val_oversampled, df_test_oversampled, 'Cost_category')
Конструирование признаков:
Конструирование признаков - определением признаков, которые войду в нашу обучающую модель
Будем использовать метод конструирования признаков "Унитарное кодирование категориальных признаков" или one-hot-encoding. Он необходим для преобразования категориальных переменных в числовой формат.
df_encoded.head(10)
Дискретизация числовых признаков – процесс преобразования непрерывных числовых значений в категориальные группы или интервалы (дискретные значения).
В данном случае преобразование числовой колонки "Cost" уже было выполнено ранее для стратифицированного разбиения исходных данных на выборки (обучающую, контрольную и тестовую). Для этого использовался метод квартильной группировки.
print('Обучающая выборка:')
df_train_oversampled[['Cost', 'Cost_category']].head(10)
«Ручной» синтез признаков – процесс создания новых признаков на основе существующих данных. Это может включать в себя комбинирование нескольких признаков, использование математических операций (например, сложение, вычитание), а также создание полиномиальных или логарифмических признаков.
df['Date'] = pd.to_datetime(df['Date']) # Преобразование в datetime
df['Year'] = df['Date'].dt.year # Год
df['Quarter'] = df['Date'].dt.quarter # Квартал
df['Month'] = df['Date'].dt.month # Месяц
df[['Date', 'Year', 'Quarter', 'Month']].head(10)
Ну и наконец, масштабирование признаков на основе нормировки и стандартизации – метод, который позволяет привести все числовые признаки к одинаковым или очень похожим диапазонам значений либо распределениям.
# scaler = MinMaxScaler()
scaler = StandardScaler()
# Применяем масштабирование к выбранным признакам
df[numeric_columns] = scaler.fit_transform(df[numeric_columns])
df[numeric_columns].head(10)
FeatureTools - библиотека для автоматизированного создания признаков из структурированных данных.
df: DataFrame = pd.read_csv("static/csv/TSLA.csv")
# Создание уникального идентификатора для каждой строки
df['Id'] = range(1, len(df) + 1)
# Создание EntitySet
es = ft.EntitySet(id="Id")
# Добавляем таблицу с индексом
es: EntitySet = es.add_dataframe(
dataframe_name="trades",
dataframe=df,
index="Id",
time_index="Date"
)
# Генерация признаков с помощью глубокого синтеза признаков
feature_matrix, feature_defs = ft.dfs(
entityset=es,
target_dataframe_name='trades',
max_depth=1
)
# Выводим первые 10 строк сгенерированного набора признаков
feature_matrix.head(10)
Оценка качества набора признаков:
Предсказательная способность: Способность набора признаков успешно прогнозировать целевую переменную. Это определяется через метрики, такие как RMSE, MAE, R², которые показывают, насколько хорошо модель использует признаки для достижения точных результатов.
Скорость вычисления: Время, необходимое для обработки данных и выполнения алгоритмов машинного обучения.
Надежность: Устойчивость и воспроизводимость результатов при изменении входных данных.
Корреляция: Степень взаимосвязи между признаками и целевой переменной, а также между самими признаками. Высокая корреляция с целевой переменной указывает на потенциальную предсказательную силу, тогда как высокая взаимосвязь между самими признаками может приводить к многоколлинеарности и снижению эффективности модели.
Цельность: Не является производным от других признаков.
# Разбить выборку на входные данные и целевой признак
def split_dataframe(dataframe: DataFrame, column: str) -> tuple[DataFrame, DataFrame]:
X_dataframe: DataFrame = dataframe.drop(columns=column, axis=1)
y_dataframe: DataFrame = dataframe[column]
return X_dataframe, y_dataframe
# Разбиение обучающей выборки на входные данные и целевой признак
df_train_oversampled: DataFrame = pd.get_dummies(df_train_oversampled)
X_df_train, y_df_train = split_dataframe(df_train_oversampled, "Cost")
# Разбиение контрольной выборки на входные данные и целевой признак
df_val_oversampled: DataFrame = pd.get_dummies(df_val_oversampled)
X_df_val, y_df_val = split_dataframe(df_val_oversampled, "Cost")
# Разбиение тестовой выборки на входные данные и целевой признак
df_test_oversampled: DataFrame = pd.get_dummies(df_test_oversampled)
X_df_test, y_df_test = split_dataframe(df_test_oversampled, "Cost")
# Модель линейной регрессии для обучения
model = LinearRegression()
# Начинаем отсчет времени
start_time: float = time.time()
model.fit(X_df_train, y_df_train)
# Время обучения модели
train_time: float = time.time() - start_time
# Предсказания и оценка модели
predictions = model.predict(X_df_val)
mse = root_mean_squared_error(y_df_val, predictions)
print(f'Время обучения модели: {train_time:.2f} секунд')
print(f'Среднеквадратичная ошибка: {mse:.2f}')
# Модель случайного леса для обучения
model = RandomForestRegressor()
# Обучение модели
model.fit(X_df_train, y_df_train)
# Предсказание и оценка
y_predictions = model.predict(X_df_test)
rmse = root_mean_squared_error(y_df_test, y_predictions)
r2 = r2_score(y_df_test, y_predictions)
mae = mean_absolute_error(y_df_test, y_predictions)
print(f"RMSE: {rmse}")
print(f"R²: {r2}")
print(f"MAE: {mae}\n")
# Кросс-валидация
scores = cross_val_score(model, X_df_train, y_df_train, cv=5, scoring='neg_mean_squared_error')
rmse_cv = (-scores.mean())**0.5
print(f"Кросс-валидация RMSE: {rmse_cv}\n")
# Анализ важности признаков
feature_importances = model.feature_importances_
feature_names = X_df_train.columns
# Проверка на переобучение
y_train_predictions = model.predict(X_df_train)
rmse_train = root_mean_squared_error(y_df_train, y_train_predictions)
r2_train = r2_score(y_df_train, y_train_predictions)
mae_train = mean_absolute_error(y_df_train, y_train_predictions)
print(f"Train RMSE: {rmse_train}")
print(f"Train R²: {r2_train}")
print(f"Train MAE: {mae_train}\n")
# Визуализация результатов
plt.figure(figsize=(10, 6))
plt.scatter(y_df_test, y_predictions, alpha=0.5)
plt.plot([y_df_test.min(), y_df_test.max()], [y_df_test.min(), y_df_test.max()], 'k--', lw=2)
plt.xlabel('Фактическая стоимость')
plt.ylabel('Прогнозируемая стоимость')
plt.title('Фактическая стоимость по сравнению с прогнозируемой')
Вывод:
- Оценка качества модели на тестовой выборке:
RMSE (Корень из среднеквадратичной ошибки) на тестовой выборке составил 89.71, что указывает на среднюю ошибку в прогнозах. R² (Коэффициент детерминации) равен 0.96, что означает, что модель объясняет 96% дисперсии данных. Это хороший показатель, указывающий на высокую объяснительную способность модели. MAE (Средняя абсолютная ошибка) составила 51.21, показывая среднее абсолютное отклонение предсказаний от фактических значений.
- Результаты кросс-валидации:
RMSE кросс-валидации равен 148.73, что заметно выше значения RMSE на тестовой выборке. Это может свидетельствовать о том, что модель может быть подвержена колебаниям в зависимости от данных и, возможно, склонна к некоторому переобучению.
- Проверка на переобучение:
Метрики на обучающей выборке (RMSE = 49.74, R² = 0.99, MAE = 22.62) значительно лучше, чем на тестовой, что указывает на высокую точность на обучающих данных.