diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index 6cb4849..c727b2a 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,10 +1,13 @@ -#from PyWeather.weather.stations.davis import VantagePro +import gc import logging +import time +from datetime import datetime + import mariadb import serial.tools.list_ports -import gc -import time -from pprint import pprint + +from PyWeather.weather.stations.davis import VantagePro +from prediction import run_prediction_module logging.basicConfig(filename="Stations.log", format='%(asctime)s %(message)s', @@ -12,16 +15,21 @@ logging.basicConfig(filename="Stations.log", logger = logging.getLogger('davis_api') logger.setLevel(logging.DEBUG) +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +logger.addHandler(console_handler) + def write_data(device, station, send=True): try: # device.parse() data = device.fields - print(data) + logger.info(data) if len(data) < 1: return else: - print(data) + logger.info(data) fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', @@ -35,7 +43,7 @@ def write_data(device, station, send=True): cursor.execute(sql, values) conn.commit() else: - print(data) + logger.info(data) del data del fields @@ -45,6 +53,38 @@ def write_data(device, station, send=True): raise e +def get_previous_values(cursor): + cursor.execute("SELECT SunRise, SunSet, WindDir FROM weather_data ORDER BY DateStamp DESC LIMIT 1") + result = cursor.fetchone() + + if result is None: + return None, None, None + + sun_rise, sun_set, wind_dir = result + return sun_rise, sun_set, wind_dir + + +def save_prediction_to_db(predictions): + try: + current_timestamp = datetime.now() + + sun_rise, sun_set, wind_dir = get_previous_values(cursor) + + fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) + placeholders = ', '.join(['%s'] * len(fields)) + field_names = ', '.join(fields) + + values = [current_timestamp, sun_rise, sun_set, wind_dir] + list(predictions.values()) + + sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" + # cursor.execute(sql, values) + # conn.commit() + logger.info("Save prediction to db success!") + except Exception as e: + logger.error(str(e)) + raise e + + try: conn = mariadb.connect( user="wind", @@ -59,25 +99,26 @@ except mariadb.Error as e: raise e try: - # ports = serial.tools.list_ports.comports() - # available_ports = {} - # - # for port in ports: - # if port.serial_number == '0001': - # available_ports[port.name] = port.vid + ports = serial.tools.list_ports.comports() + available_ports = {} - #devices = [VantagePro(port) for port in available_ports.keys()] - devices = {} + for port in ports: + if port.serial_number == '0001': + available_ports[port.name] = port.vid + + devices = [VantagePro(port) for port in available_ports.keys()] while True: for i in range(1): if len(devices) != 0: - print(devices) + logger.info(devices) # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) else: - print('bashmak') + raise Exception("Can't connect to device") time.sleep(1) except Exception as e: logger.error('Device_error: ' + str(e)) - raise e - + predictions = run_prediction_module() + logger.info(predictions) + if predictions is not None: + save_prediction_to_db(predictions) diff --git a/davisAPI/hype.py b/davisAPI/hype.py deleted file mode 100644 index 908bade..0000000 --- a/davisAPI/hype.py +++ /dev/null @@ -1,97 +0,0 @@ -import pandas as pd -import numpy as np -from sqlalchemy import create_engine -from sklearn.model_selection import train_test_split -from sklearn.ensemble import RandomForestRegressor -from sklearn.metrics import mean_squared_error - -# TODO -# 1.Добавить поля -# -barTrend - перечисление и правило какое-то (смотрим через один если повысилось после steady то rising если уменьшилось после steady то falling) -# -DateStamp - берется с последнего предсказанного + время опроса -# -SunRise - берется с последнего предсказанного -# -SunSet - берется с последнего предсказанного -# -# 12-10-2024 12:00 -# 12-10-2024 12:01 -# -# 2.Отделить вставку данных в бд от парсера данных с метеостанции -# -# 3.Возвращение результата в бд - -# Настройка подключения к базе данных - - -DATABASE_URI = 'mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers' # Замените на вашу строку подключения -engine = create_engine(DATABASE_URI) - -# Запрос данных из базы данных -query = """ -SELECT CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, - Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, - TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min -FROM weather_data -WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 12 HOUR; -""" -df = pd.read_sql(query, engine) - -# Преобразование даты и установка индекса -df['DateStamp'] = pd.to_datetime(df['DateStamp']) -df.set_index('DateStamp', inplace=True) -df.sort_index(inplace=True) - -# Создание временных признаков с использованием pd.concat для минимизации фрагментации -lags = 3 -shifted_dfs = [df] # Начинаем с оригинальных данных - -# Создаем лаговые признаки -for lag in range(1, lags + 1): - shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') - shifted_dfs.append(shifted_df) - -# Объединяем все данные вместе, избегая фрагментации -df_with_lags = pd.concat(shifted_dfs, axis=1) - -# Удаляем строки с NaN значениями -df_with_lags.dropna(inplace=True) -df_with_lags = df_with_lags.copy() # Устраняем фрагментацию - -# Исключаем все нечисловые столбцы -df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) - -# Словарь для хранения моделей и оценок -models = {} -mse_scores = {} - -# Обучение модели для каждого целевого поля -for target_column in df.columns[:22]: # Ограничиваем до 22 полей - if target_column not in df_with_lags.columns: - continue # Пропускаем, если колонка отсутствует в данных с лагами - - X = df_with_lags.drop(columns=[target_column]) - y = df_with_lags[target_column] - - # Разделение на обучающую и тестовую выборки - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) - - # Обучение модели - model = RandomForestRegressor() - model.fit(X_train, y_train) - - # Оценка модели - y_pred = model.predict(X_test) - mse = mean_squared_error(y_test, y_pred) - mse_scores[target_column] = mse - models[target_column] = model - - print(f"MSE для {target_column}: {mse}") - -# Предсказание для следующего шага по всем моделям -predictions = {} -last_data = X.iloc[-1].values.reshape(1, -1) # Последняя строка данных для прогноза - -for target_column, model in models.items(): - predictions[target_column] = model.predict(last_data)[0] - print(f"Предсказание для {target_column}: {predictions[target_column]}") -print(predictions) - diff --git a/davisAPI/prediction.py b/davisAPI/prediction.py new file mode 100644 index 0000000..7d8bb21 --- /dev/null +++ b/davisAPI/prediction.py @@ -0,0 +1,101 @@ +import pandas as pd +from sklearn.ensemble import RandomForestRegressor +from sklearn.metrics import mean_squared_error +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder +from sqlalchemy import create_engine + + +def run_prediction_module(): + engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers') + + query = """ + SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, + Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, + TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min + FROM weather_data + WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR; + """ + df = pd.read_sql(query, engine) + + df['DateStamp'] = pd.to_datetime(df['DateStamp']) + df.set_index('DateStamp', inplace=True) + df.sort_index(inplace=True) + + lags = 3 + shifted_dfs = [df] + + for lag in range(1, lags + 1): + shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') + shifted_dfs.append(shifted_df) + + df_with_lags = pd.concat(shifted_dfs, axis=1) + + df_with_lags.dropna(inplace=True) + df_with_lags = df_with_lags.copy() + + # Преобразуем BarTrend в числовой формат + le = LabelEncoder() + df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']) + + # Выбор только числовых данных + df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) + + # Словари для хранения моделей и MSE + models = {} + mse_scores = {} + + # Обучение моделей для каждого целевого столбца + for target_column in df.columns: + if target_column not in df_with_lags.columns: + continue + + X = df_with_lags.drop(columns=[target_column]).values + y = df_with_lags[target_column].values + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) + + model = RandomForestRegressor() + model.fit(X_train, y_train) + + y_pred = model.predict(X_test) + mse = mean_squared_error(y_test, y_pred) + mse_scores[target_column] = mse + models[target_column] = model + + quality = "хорошая" if mse < 1.0 else "плохая" + print(f"MSE для {target_column}: {mse} ({quality})") + + # Обучаем модель для BarTrend_encoded отдельно + X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values + y_bartrend = df_with_lags['BarTrend_encoded'].values + + X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend, + test_size=0.2, + shuffle=False) + + model_bartrend = RandomForestRegressor() + model_bartrend.fit(X_train_bartrend, y_train_bartrend) + + y_pred_bartrend = model_bartrend.predict(X_test_bartrend) + mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) + models['BarTrend_encoded'] = model_bartrend + mse_scores['BarTrend_encoded'] = mse_bartrend + + quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" + print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") + + last_data = X[-1].reshape(1, -1) + + predictions = {} + for target_column, model in models.items(): + prediction = model.predict(last_data)[0] + if target_column == 'BarTrend_encoded': + prediction = le.inverse_transform([int(prediction)])[0] + predictions['BarTrend'] = prediction + print(f"Предсказание для BarTrend: {prediction}") + break + predictions[target_column] = prediction + print(f"Предсказание для {target_column}: {prediction}") + + return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов