что-то накидала. мини-мини рефакторинг + прогнозирование бартренд + сохранение в бд

This commit is contained in:
dmitry 2024-11-05 02:44:26 +04:00
parent 737af8e01d
commit 68d98c50ac
3 changed files with 161 additions and 116 deletions

View File

@ -1,10 +1,13 @@
#from PyWeather.weather.stations.davis import VantagePro import gc
import logging import logging
import time
from datetime import datetime
import mariadb import mariadb
import serial.tools.list_ports import serial.tools.list_ports
import gc
import time from PyWeather.weather.stations.davis import VantagePro
from pprint import pprint from prediction import run_prediction_module
logging.basicConfig(filename="Stations.log", logging.basicConfig(filename="Stations.log",
format='%(asctime)s %(message)s', format='%(asctime)s %(message)s',
@ -12,16 +15,21 @@ logging.basicConfig(filename="Stations.log",
logger = logging.getLogger('davis_api') logger = logging.getLogger('davis_api')
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
logger.addHandler(console_handler)
def write_data(device, station, send=True): def write_data(device, station, send=True):
try: try:
# device.parse() # device.parse()
data = device.fields data = device.fields
print(data) logger.info(data)
if len(data) < 1: if len(data) < 1:
return return
else: else:
print(data) logger.info(data)
fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex',
'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm',
'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed',
@ -35,7 +43,7 @@ def write_data(device, station, send=True):
cursor.execute(sql, values) cursor.execute(sql, values)
conn.commit() conn.commit()
else: else:
print(data) logger.info(data)
del data del data
del fields del fields
@ -45,6 +53,38 @@ def write_data(device, station, send=True):
raise e raise e
def get_previous_values(cursor):
    """Return SunRise, SunSet and WindDir of the newest ``weather_data`` row.

    Args:
        cursor: An open DB-API cursor; it is used to run the query and to
            fetch a single row.

    Returns:
        A ``(sun_rise, sun_set, wind_dir)`` tuple taken from the row with the
        greatest ``DateStamp``, or ``(None, None, None)`` when the query
        returns no row.
    """
    cursor.execute("SELECT SunRise, SunSet, WindDir FROM weather_data ORDER BY DateStamp DESC LIMIT 1")
    latest_row = cursor.fetchone()
    if latest_row is not None:
        # Unpack explicitly so a row of unexpected width fails loudly here.
        sun_rise, sun_set, wind_dir = latest_row
        return sun_rise, sun_set, wind_dir
    return None, None, None
def save_prediction_to_db(predictions):
    """Build the INSERT for a predicted ``weather_data`` row and log the outcome.

    The row is stamped with the current local time. SunRise, SunSet and
    WindDir are copied from the newest stored row via ``get_previous_values``;
    every key of ``predictions`` becomes one more column.

    Args:
        predictions: Mapping of column name -> predicted value.

    Raises:
        Exception: Any error raised while preparing the statement is logged
            and re-raised.
    """
    try:
        stamp = datetime.now()
        # Relies on the module-level ``cursor`` opened by the script.
        sun_rise, sun_set, wind_dir = get_previous_values(cursor)

        fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir', *predictions.keys()]
        values = [stamp, sun_rise, sun_set, wind_dir, *predictions.values()]

        field_names = ', '.join(fields)
        placeholders = ', '.join('%s' for _ in fields)
        sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})"

        # NOTE(review): the write itself is still disabled, so nothing reaches
        # the database even though success is logged below. Column names are
        # interpolated into the statement, so they must come from trusted code.
        # cursor.execute(sql, values)
        # conn.commit()
        logger.info("Save prediction to db success!")
    except Exception as e:
        logger.error(str(e))
        raise e
try: try:
conn = mariadb.connect( conn = mariadb.connect(
user="wind", user="wind",
@ -59,25 +99,26 @@ except mariadb.Error as e:
raise e raise e
try: try:
# ports = serial.tools.list_ports.comports() ports = serial.tools.list_ports.comports()
# available_ports = {} available_ports = {}
#
# for port in ports:
# if port.serial_number == '0001':
# available_ports[port.name] = port.vid
#devices = [VantagePro(port) for port in available_ports.keys()] for port in ports:
devices = {} if port.serial_number == '0001':
available_ports[port.name] = port.vid
devices = [VantagePro(port) for port in available_ports.keys()]
while True: while True:
for i in range(1): for i in range(1):
if len(devices) != 0: if len(devices) != 0:
print(devices) logger.info(devices)
# write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True)
else: else:
print('bashmak') raise Exception("Can't connect to device")
time.sleep(1) time.sleep(1)
except Exception as e: except Exception as e:
logger.error('Device_error: ' + str(e)) logger.error('Device_error: ' + str(e))
raise e predictions = run_prediction_module()
logger.info(predictions)
if predictions is not None:
save_prediction_to_db(predictions)

View File

@ -1,97 +0,0 @@
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
# TODO
# 1. Add fields
#    - BarTrend: an enumeration plus a rule (compare with the value one step
#      back: if it rose after "steady" -> "rising", if it dropped after
#      "steady" -> "falling")
#    - DateStamp: taken from the last predicted row + the polling interval
#    - SunRise: taken from the last predicted row
#    - SunSet: taken from the last predicted row
#
#      12-10-2024 12:00
#      12-10-2024 12:01
#
# 2. Separate the database insert from the weather-station data parser
#
# 3. Write the result back to the database

# Database connection setup.
# NOTE(review): credentials are hard-coded in the URI.
DATABASE_URI = 'mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers'  # Replace with your connection string
engine = create_engine(DATABASE_URI)

# Load the history. NOTE(review): the window is anchored to a fixed timestamp,
# not to the current time.
query = """
SELECT CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut,
Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear,
TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min
FROM weather_data
WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 12 HOUR;
"""
df = pd.read_sql(query, engine)

# Parse the timestamp, use it as the index and order rows chronologically.
df['DateStamp'] = pd.to_datetime(df['DateStamp'])
df.set_index('DateStamp', inplace=True)
df.sort_index(inplace=True)

# Build lag features; collecting the shifted frames and concatenating once
# avoids DataFrame fragmentation.
lags = 3
shifted_dfs = [df]  # start with the original data
for lag in range(1, lags + 1):
    shifted_df = df.shift(lag).add_suffix(f'_t-{lag}')
    shifted_dfs.append(shifted_df)

df_with_lags = pd.concat(shifted_dfs, axis=1)

# Rows without a full set of lags contain NaN and cannot be used.
df_with_lags.dropna(inplace=True)
df_with_lags = df_with_lags.copy()

# Keep numeric columns only.
df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64'])

# Per-target model, its hold-out MSE and the exact feature columns it saw.
models = {}
mse_scores = {}
feature_columns = {}

# Train one model per target column.
for target_column in df.columns[:22]:  # at most 22 fields
    if target_column not in df_with_lags.columns:
        continue  # column was dropped as non-numeric

    X = df_with_lags.drop(columns=[target_column])
    y = df_with_lags[target_column]

    # shuffle=False keeps time order: the hold-out is the tail of the data.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

    model = RandomForestRegressor()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    mse_scores[target_column] = mse
    models[target_column] = model
    feature_columns[target_column] = list(X.columns)

    print(f"MSE для {target_column}: {mse}")

# Predict from the newest row. Every model was trained without its own target
# column, so the row has to be sliced per model; reusing the X left over from
# the last loop iteration would feed the other models misaligned features.
predictions = {}
last_data = df_with_lags.iloc[[-1]]

for target_column, model in models.items():
    predictions[target_column] = model.predict(last_data[feature_columns[target_column]])[0]
    print(f"Предсказание для {target_column}: {predictions[target_column]}")

print(predictions)

101
davisAPI/prediction.py Normal file
View File

@ -0,0 +1,101 @@
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sqlalchemy import create_engine
def _build_lagged_frame(df, lags):
    """Join ``df`` with copies of itself shifted by 1..``lags`` rows; drop rows containing NaN."""
    frames = [df]
    frames.extend(df.shift(lag).add_suffix(f'_t-{lag}') for lag in range(1, lags + 1))
    # One concat instead of column-by-column inserts keeps the frame unfragmented.
    return pd.concat(frames, axis=1).dropna().copy()


def _fit_forest(frame, target_column):
    """Fit a RandomForestRegressor that predicts ``target_column`` from all other columns.

    Returns ``(model, mse, feature_columns)``; ``mse`` is measured on the last
    20% of the rows and ``feature_columns`` is the column order the model saw.
    """
    feature_columns = [name for name in frame.columns if name != target_column]
    X = frame[feature_columns].values
    y = frame[target_column].values
    # shuffle=False keeps row order, so the hold-out is the tail of the frame.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    model = RandomForestRegressor()
    model.fit(X_train, y_train)
    mse = mean_squared_error(y_test, model.predict(X_test))
    return model, mse, feature_columns


def run_prediction_module():
    """Train one random-forest model per weather column and predict from the newest row.

    History is read from ``weather_data``, indexed by ``DateStamp`` and widened
    with three lagged copies of every column. ``BarTrend`` is label-encoded so
    it can be modelled like the numeric columns and is decoded back to its
    original label in the result.

    Returns:
        dict mapping column name -> predicted value (``'BarTrend'`` holds the
        decoded label), or ``None`` when fewer than two complete rows are
        available for training.

    NOTE(review): credentials are hard-coded in the connection URI, and the
    query window is anchored to a fixed timestamp rather than the current
    time. Each model also sees the same-row values of the other columns, so
    the output is an estimate for the newest row, not a strictly future
    step - confirm this is intended.
    """
    engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers')

    query = """
    SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut,
           Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear,
           TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min
    FROM weather_data
    WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR;
    """
    try:
        df = pd.read_sql(query, engine)
    finally:
        # The engine is created on every call; release its pooled connections.
        engine.dispose()

    df['DateStamp'] = pd.to_datetime(df['DateStamp'])
    df = df.set_index('DateStamp').sort_index()

    df_with_lags = _build_lagged_frame(df, lags=3)
    if len(df_with_lags) < 2:
        # train_test_split needs at least one training and one test row.
        print("Not enough complete rows to train on; skipping prediction")
        return None

    # Encode BarTrend as integers; int64 is forced so the dtype filter below
    # keeps the column on every platform.
    le = LabelEncoder()
    df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']).astype('int64')

    # Keep numeric columns only.
    df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64'])

    # Numeric source columns first, the encoded BarTrend last.
    targets = [name for name in df.columns if name in df_with_lags.columns]
    targets.append('BarTrend_encoded')

    models = {}
    for target_column in targets:
        model, mse, feature_columns = _fit_forest(df_with_lags, target_column)
        models[target_column] = (model, feature_columns)
        label = 'BarTrend' if target_column == 'BarTrend_encoded' else target_column
        quality = "хорошая" if mse < 1.0 else "плохая"
        print(f"MSE для {label}: {mse} ({quality})")

    # Every model was trained without its own target column, so the newest row
    # must be sliced with that model's feature list; one shared array would
    # hand most models misaligned inputs.
    latest = df_with_lags.iloc[[-1]]
    predictions = {}
    for target_column, (model, feature_columns) in models.items():
        value = model.predict(latest[feature_columns].values)[0]
        if target_column == 'BarTrend_encoded':
            # Round to the nearest class (truncation biases downwards) and
            # clamp to the classes the encoder knows.
            class_index = int(round(float(value)))
            class_index = min(max(class_index, 0), len(le.classes_) - 1)
            prediction = le.inverse_transform([class_index])[0]
            predictions['BarTrend'] = prediction
            print(f"Предсказание для BarTrend: {prediction}")
            continue
        # Plain Python float instead of a NumPy scalar for downstream consumers.
        prediction = float(value)
        predictions[target_column] = prediction
        print(f"Предсказание для {target_column}: {prediction}")

    return predictions  # column name -> predicted value