From 737af8e01d6908089bc28ec635ff315974baea66 Mon Sep 17 00:00:00 2001 From: "an.lazarev" Date: Tue, 5 Nov 2024 00:12:15 +0400 Subject: [PATCH 1/5] =?UTF-8?q?=D0=BA=D0=B0=D0=BA=D0=BE=D0=B9-=D1=82=D0=BE?= =?UTF-8?q?=20=D0=BF=D1=80=D0=B5=D0=B4=D0=B8=D0=BA=D1=82=20=D1=85=D0=B0?= =?UTF-8?q?=D0=B9=D0=BF=D0=BE=D0=B2=D1=8B=D0=B9,=20=D0=B4=D0=B0=D1=88?= =?UTF-8?q?=D0=B0=20=D0=B2=D0=BE=D1=80=D0=BA=D0=B0=D0=B5=D1=82=20=D0=BD?= =?UTF-8?q?=D0=B0=D0=B4=20=D0=BD=D0=B8=D0=BC=F0=9F=98=8E=F0=9F=98=8E?= =?UTF-8?q?=F0=9F=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- davisAPI/davisAPI.py | 36 ++++++++-------- davisAPI/hype.py | 97 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 16 deletions(-) create mode 100644 davisAPI/hype.py diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index f0e6601..6cb4849 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,4 +1,4 @@ -from PyWeather.weather.stations.davis import VantagePro +#from PyWeather.weather.stations.davis import VantagePro import logging import mariadb import serial.tools.list_ports @@ -6,7 +6,6 @@ import gc import time from pprint import pprint - logging.basicConfig(filename="Stations.log", format='%(asctime)s %(message)s', filemode='a') @@ -16,7 +15,7 @@ logger.setLevel(logging.DEBUG) def write_data(device, station, send=True): try: - #device.parse() + # device.parse() data = device.fields print(data) if len(data) < 1: @@ -36,7 +35,7 @@ def write_data(device, station, send=True): cursor.execute(sql, values) conn.commit() else: - pprint(data) + print(data) del data del fields @@ -45,6 +44,7 @@ def write_data(device, station, send=True): logger.error(str(e)) raise e + try: conn = mariadb.connect( user="wind", @@ -59,21 +59,25 @@ except mariadb.Error as e: raise e try: - ports = serial.tools.list_ports.comports() - available_ports = {} + # ports = serial.tools.list_ports.comports() + # available_ports = {} + # + # for port in ports: + # if port.serial_number == '0001': + # available_ports[port.name] = port.vid - for port in ports: - if port.serial_number == '0001': - available_ports[port.name] = port.vid + #devices = [VantagePro(port) for port in available_ports.keys()] + devices = {} - devices = [VantagePro(port) for port in available_ports.keys()] - print(available_ports) while True: - for i in range(len(devices)): - print(devices[i].fields) - #write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) - time.sleep(1) + for i in range(1): + if len(devices) != 0: + print(devices) + # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) + else: + print('bashmak') + time.sleep(1) except Exception as e: logger.error('Device_error: ' + str(e)) raise e - + diff --git a/davisAPI/hype.py b/davisAPI/hype.py new file mode 100644 index 0000000..908bade --- /dev/null +++ b/davisAPI/hype.py @@ -0,0 +1,97 @@ +import pandas as pd +import numpy as np +from sqlalchemy import create_engine +from sklearn.model_selection import train_test_split +from sklearn.ensemble import RandomForestRegressor +from sklearn.metrics import mean_squared_error + +# TODO +# 1.Добавить поля +# -barTrend - перечисление и правило какое-то (смотрим через один если повысилось после steady то rising если уменьшилось после steady то falling) +# -DateStamp - берется с последнего предсказанного + время опроса +# -SunRise - берется с последнего предсказанного +# -SunSet - берется с последнего предсказанного +# +# 12-10-2024 12:00 +# 12-10-2024 12:01 +# +# 2.Отделить вставку данных в бд от парсера данных с метеостанции +# +# 3.Возвращение результата в бд + +# Настройка подключения к базе данных + + +DATABASE_URI = 'mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers' # Замените на вашу строку подключения +engine = create_engine(DATABASE_URI) + +# Запрос данных из базы данных +query = """ +SELECT CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, + Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, + TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min +FROM weather_data +WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 12 HOUR; +""" +df = pd.read_sql(query, engine) + +# Преобразование даты и установка индекса +df['DateStamp'] = pd.to_datetime(df['DateStamp']) +df.set_index('DateStamp', inplace=True) +df.sort_index(inplace=True) + +# Создание временных признаков с использованием pd.concat для минимизации фрагментации +lags = 3 +shifted_dfs = [df] # Начинаем с оригинальных данных + +# Создаем лаговые признаки +for lag in range(1, lags + 1): + shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') + shifted_dfs.append(shifted_df) + +# Объединяем все данные вместе, избегая фрагментации +df_with_lags = pd.concat(shifted_dfs, axis=1) + +# Удаляем строки с NaN значениями +df_with_lags.dropna(inplace=True) +df_with_lags = df_with_lags.copy() # Устраняем фрагментацию + +# Исключаем все нечисловые столбцы +df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) + +# Словарь для хранения моделей и оценок +models = {} +mse_scores = {} + +# Обучение модели для каждого целевого поля +for target_column in df.columns[:22]: # Ограничиваем до 22 полей + if target_column not in df_with_lags.columns: + continue # Пропускаем, если колонка отсутствует в данных с лагами + + X = df_with_lags.drop(columns=[target_column]) + y = df_with_lags[target_column] + + # Разделение на обучающую и тестовую выборки + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) + + # Обучение модели + model = RandomForestRegressor() + model.fit(X_train, y_train) + + # Оценка модели + y_pred = model.predict(X_test) + mse = mean_squared_error(y_test, y_pred) + mse_scores[target_column] = mse + models[target_column] = model + + print(f"MSE для {target_column}: {mse}") + +# Предсказание для следующего шага по всем моделям +predictions = {} +last_data = X.iloc[-1].values.reshape(1, -1) # Последняя строка данных для прогноза + +for target_column, model in models.items(): + predictions[target_column] = model.predict(last_data)[0] + print(f"Предсказание для {target_column}: {predictions[target_column]}") +print(predictions) + -- 2.25.1 From cf0ff194d26225c49299b6ee54bfe4bef342fd8f Mon Sep 17 00:00:00 2001 From: darya Date: Tue, 5 Nov 2024 03:05:17 +0400 Subject: [PATCH 2/5] =?UTF-8?q?=D1=87=D1=82=D0=BE-=D1=82=D0=BE=20=D0=BD?= =?UTF-8?q?=D0=B0=D0=BA=D0=B8=D0=B4=D0=B0=D0=BB=D0=B0.=20=D0=BC=D0=B8?= =?UTF-8?q?=D0=BD=D0=B8-=D0=BC=D0=B8=D0=BD=D0=B8=20=D1=80=D0=B5=D1=84?= =?UTF-8?q?=D0=B0=D0=BA=D1=82=D0=BE=D1=80=D0=B8=D0=BD=D0=B3=20+=20=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D0=B3=D0=BD=D0=BE=D0=B7=D0=B8=D1=80=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD=D0=B8=D0=B5=20=D0=B1=D0=B0=D1=80=D1=82=D1=80=D0=B5?= =?UTF-8?q?=D0=BD=D0=B4=20+=20=D1=81=D0=BE=D1=85=D1=80=D0=B0=D0=BD=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=B2=20=D0=B1=D0=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- davisAPI/davisAPI.py | 79 ++++++++++++++++++++++++-------- davisAPI/hype.py | 97 --------------------------------------- davisAPI/prediction.py | 101 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 116 deletions(-) delete mode 100644 davisAPI/hype.py create mode 100644 davisAPI/prediction.py diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index 6cb4849..c727b2a 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,10 +1,13 @@ -#from PyWeather.weather.stations.davis import VantagePro +import gc import logging +import time +from datetime import datetime + import mariadb import serial.tools.list_ports -import gc -import time -from pprint import pprint + +from PyWeather.weather.stations.davis import VantagePro +from prediction import run_prediction_module logging.basicConfig(filename="Stations.log", format='%(asctime)s %(message)s', @@ -12,16 +15,21 @@ logging.basicConfig(filename="Stations.log", logger = logging.getLogger('davis_api') logger.setLevel(logging.DEBUG) +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +logger.addHandler(console_handler) + def write_data(device, station, send=True): try: # device.parse() data = device.fields - print(data) + logger.info(data) if len(data) < 1: return else: - print(data) + logger.info(data) fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', @@ -35,7 +43,7 @@ def write_data(device, station, send=True): cursor.execute(sql, values) conn.commit() else: - print(data) + logger.info(data) del data del fields @@ -45,6 +53,38 @@ def write_data(device, station, send=True): raise e +def get_previous_values(cursor): + cursor.execute("SELECT SunRise, SunSet, WindDir FROM weather_data ORDER BY DateStamp DESC LIMIT 1") + result = cursor.fetchone() + + if result is None: + return None, None, None + + sun_rise, sun_set, wind_dir = result + return sun_rise, sun_set, wind_dir + + +def save_prediction_to_db(predictions): + try: + current_timestamp = datetime.now() + + sun_rise, sun_set, wind_dir = get_previous_values(cursor) + + fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) + placeholders = ', '.join(['%s'] * len(fields)) + field_names = ', '.join(fields) + + values = [current_timestamp, sun_rise, sun_set, wind_dir] + list(predictions.values()) + + sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" + # cursor.execute(sql, values) + # conn.commit() + logger.info("Save prediction to db success!") + except Exception as e: + logger.error(str(e)) + raise e + + try: conn = mariadb.connect( user="wind", @@ -59,25 +99,26 @@ except mariadb.Error as e: raise e try: - # ports = serial.tools.list_ports.comports() - # available_ports = {} - # - # for port in ports: - # if port.serial_number == '0001': - # available_ports[port.name] = port.vid + ports = serial.tools.list_ports.comports() + available_ports = {} - #devices = [VantagePro(port) for port in available_ports.keys()] - devices = {} + for port in ports: + if port.serial_number == '0001': + available_ports[port.name] = port.vid + + devices = [VantagePro(port) for port in available_ports.keys()] while True: for i in range(1): if len(devices) != 0: - print(devices) + logger.info(devices) # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) else: - print('bashmak') + raise Exception("Can't connect to device") time.sleep(1) except Exception as e: logger.error('Device_error: ' + str(e)) - raise e - + predictions = run_prediction_module() + logger.info(predictions) + if predictions is not None: + save_prediction_to_db(predictions) diff --git a/davisAPI/hype.py b/davisAPI/hype.py deleted file mode 100644 index 908bade..0000000 --- a/davisAPI/hype.py +++ /dev/null @@ -1,97 +0,0 @@ -import pandas as pd -import numpy as np -from sqlalchemy import create_engine -from sklearn.model_selection import train_test_split -from sklearn.ensemble import RandomForestRegressor -from sklearn.metrics import mean_squared_error - -# TODO -# 1.Добавить поля -# -barTrend - перечисление и правило какое-то (смотрим через один если повысилось после steady то rising если уменьшилось после steady то falling) -# -DateStamp - берется с последнего предсказанного + время опроса -# -SunRise - берется с последнего предсказанного -# -SunSet - берется с последнего предсказанного -# -# 12-10-2024 12:00 -# 12-10-2024 12:01 -# -# 2.Отделить вставку данных в бд от парсера данных с метеостанции -# -# 3.Возвращение результата в бд - -# Настройка подключения к базе данных - - -DATABASE_URI = 'mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers' # Замените на вашу строку подключения -engine = create_engine(DATABASE_URI) - -# Запрос данных из базы данных -query = """ -SELECT CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, - Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, - TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min -FROM weather_data -WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 12 HOUR; -""" -df = pd.read_sql(query, engine) - -# Преобразование даты и установка индекса -df['DateStamp'] = pd.to_datetime(df['DateStamp']) -df.set_index('DateStamp', inplace=True) -df.sort_index(inplace=True) - -# Создание временных признаков с использованием pd.concat для минимизации фрагментации -lags = 3 -shifted_dfs = [df] # Начинаем с оригинальных данных - -# Создаем лаговые признаки -for lag in range(1, lags + 1): - shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') - shifted_dfs.append(shifted_df) - -# Объединяем все данные вместе, избегая фрагментации -df_with_lags = pd.concat(shifted_dfs, axis=1) - -# Удаляем строки с NaN значениями -df_with_lags.dropna(inplace=True) -df_with_lags = df_with_lags.copy() # Устраняем фрагментацию - -# Исключаем все нечисловые столбцы -df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) - -# Словарь для хранения моделей и оценок -models = {} -mse_scores = {} - -# Обучение модели для каждого целевого поля -for target_column in df.columns[:22]: # Ограничиваем до 22 полей - if target_column not in df_with_lags.columns: - continue # Пропускаем, если колонка отсутствует в данных с лагами - - X = df_with_lags.drop(columns=[target_column]) - y = df_with_lags[target_column] - - # Разделение на обучающую и тестовую выборки - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) - - # Обучение модели - model = RandomForestRegressor() - model.fit(X_train, y_train) - - # Оценка модели - y_pred = model.predict(X_test) - mse = mean_squared_error(y_test, y_pred) - mse_scores[target_column] = mse - models[target_column] = model - - print(f"MSE для {target_column}: {mse}") - -# Предсказание для следующего шага по всем моделям -predictions = {} -last_data = X.iloc[-1].values.reshape(1, -1) # Последняя строка данных для прогноза - -for target_column, model in models.items(): - predictions[target_column] = model.predict(last_data)[0] - print(f"Предсказание для {target_column}: {predictions[target_column]}") -print(predictions) - diff --git a/davisAPI/prediction.py b/davisAPI/prediction.py new file mode 100644 index 0000000..7d8bb21 --- /dev/null +++ b/davisAPI/prediction.py @@ -0,0 +1,101 @@ +import pandas as pd +from sklearn.ensemble import RandomForestRegressor +from sklearn.metrics import mean_squared_error +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder +from sqlalchemy import create_engine + + +def run_prediction_module(): + engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers') + + query = """ + SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, + Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, + TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min + FROM weather_data + WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR; + """ + df = pd.read_sql(query, engine) + + df['DateStamp'] = pd.to_datetime(df['DateStamp']) + df.set_index('DateStamp', inplace=True) + df.sort_index(inplace=True) + + lags = 3 + shifted_dfs = [df] + + for lag in range(1, lags + 1): + shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') + shifted_dfs.append(shifted_df) + + df_with_lags = pd.concat(shifted_dfs, axis=1) + + df_with_lags.dropna(inplace=True) + df_with_lags = df_with_lags.copy() + + # Преобразуем BarTrend в числовой формат + le = LabelEncoder() + df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']) + + # Выбор только числовых данных + df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) + + # Словари для хранения моделей и MSE + models = {} + mse_scores = {} + + # Обучение моделей для каждого целевого столбца + for target_column in df.columns: + if target_column not in df_with_lags.columns: + continue + + X = df_with_lags.drop(columns=[target_column]).values + y = df_with_lags[target_column].values + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) + + model = RandomForestRegressor() + model.fit(X_train, y_train) + + y_pred = model.predict(X_test) + mse = mean_squared_error(y_test, y_pred) + mse_scores[target_column] = mse + models[target_column] = model + + quality = "хорошая" if mse < 1.0 else "плохая" + print(f"MSE для {target_column}: {mse} ({quality})") + + # Обучаем модель для BarTrend_encoded отдельно + X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values + y_bartrend = df_with_lags['BarTrend_encoded'].values + + X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend, + test_size=0.2, + shuffle=False) + + model_bartrend = RandomForestRegressor() + model_bartrend.fit(X_train_bartrend, y_train_bartrend) + + y_pred_bartrend = model_bartrend.predict(X_test_bartrend) + mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) + models['BarTrend_encoded'] = model_bartrend + mse_scores['BarTrend_encoded'] = mse_bartrend + + quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" + print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") + + last_data = X[-1].reshape(1, -1) + + predictions = {} + for target_column, model in models.items(): + prediction = model.predict(last_data)[0] + if target_column == 'BarTrend_encoded': + prediction = le.inverse_transform([int(prediction)])[0] + predictions['BarTrend'] = prediction + print(f"Предсказание для BarTrend: {prediction}") + break + predictions[target_column] = prediction + print(f"Предсказание для {target_column}: {prediction}") + + return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов -- 2.25.1 From 7022fdb50bddf54dc0f5e7f403472383eb9c12b9 Mon Sep 17 00:00:00 2001 From: "an.lazarev" Date: Tue, 5 Nov 2024 21:57:32 +0400 Subject: [PATCH 3/5] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB?= =?UTF-8?q?=D0=B8=20=D0=BA=D0=BE=D0=BC=D0=BC=D0=B5=D0=BD=D1=82=D1=8B=20?= =?UTF-8?q?=D0=BA=20=D1=81=D1=82=D1=80=D0=BE=D0=BA=D0=B0=D0=BC,=20=D0=B2?= =?UTF-8?q?=D0=BE=D0=B7=D0=BC=D0=BE=D0=B6=D0=BD=D0=BE=20=D0=BF=D1=80=D0=B8?= =?UTF-8?q?=D0=B4=D0=B5=D1=82=D1=81=D1=8F=20=D0=BF=D0=B5=D1=80=D0=B5=D0=BF?= =?UTF-8?q?=D0=B8=D1=81=D0=B0=D1=82=D1=8C=20=D0=B1=D0=B4=F0=9F=A4=A9?= =?UTF-8?q?=F0=9F=A4=A9=F0=9F=A4=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- davisAPI/davisAPI.py | 66 +++++++++++++++-------------- davisAPI/prediction.py | 94 +++++++++++++++++++++--------------------- 2 files changed, 82 insertions(+), 78 deletions(-) diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index c727b2a..a5742b1 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,12 +1,12 @@ import gc import logging import time -from datetime import datetime - +from datetime import datetime, timedelta +from pprint import pprint import mariadb import serial.tools.list_ports -from PyWeather.weather.stations.davis import VantagePro +#from PyWeather.weather.stations.davis import VantagePro from prediction import run_prediction_module logging.basicConfig(filename="Stations.log", @@ -54,28 +54,27 @@ def write_data(device, station, send=True): def get_previous_values(cursor): - cursor.execute("SELECT SunRise, SunSet, WindDir FROM weather_data ORDER BY DateStamp DESC LIMIT 1") + cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") result = cursor.fetchone() if result is None: - return None, None, None + return None, None, None, None - sun_rise, sun_set, wind_dir = result - return sun_rise, sun_set, wind_dir + sun_rise, sun_set, wind_dir, datestamp = result + return sun_rise, sun_set, wind_dir, datestamp def save_prediction_to_db(predictions): try: - current_timestamp = datetime.now() - sun_rise, sun_set, wind_dir = get_previous_values(cursor) + sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) placeholders = ', '.join(['%s'] * len(fields)) field_names = ', '.join(fields) - values = [current_timestamp, sun_rise, sun_set, wind_dir] + list(predictions.values()) - + values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) + pprint(dict(zip(fields, values))) sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" # cursor.execute(sql, values) # conn.commit() @@ -97,28 +96,31 @@ try: except mariadb.Error as e: logger.error('DB_ERR: ' + str(e)) raise e +while True: + try: + ports = serial.tools.list_ports.comports() + available_ports = {} -try: - ports = serial.tools.list_ports.comports() - available_ports = {} + for port in ports: + if port.serial_number == '0001': + available_ports[port.name] = port.vid - for port in ports: - if port.serial_number == '0001': - available_ports[port.name] = port.vid + devices = [VantagePro(port) for port in available_ports.keys()] + while True: + for i in range(1): + if len(devices) != 0: + logger.info(devices) + # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) + else: + raise Exception('Can`t connect to device') + time.sleep(60) + except Exception as e: + logger.error('Device_error' + str(e)) + predictions = run_prediction_module() + #logger.info(predictions) + if predictions is not None: + save_prediction_to_db(predictions) + time.sleep(60) - devices = [VantagePro(port) for port in available_ports.keys()] - while True: - for i in range(1): - if len(devices) != 0: - logger.info(devices) - # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) - else: - raise Exception("Can't connect to device") - time.sleep(1) -except Exception as e: - logger.error('Device_error: ' + str(e)) - predictions = run_prediction_module() - logger.info(predictions) - if predictions is not None: - save_prediction_to_db(predictions) +#todo переписать под influx, для линухи приколы сделать \ No newline at end of file diff --git a/davisAPI/prediction.py b/davisAPI/prediction.py index 7d8bb21..9c7fa85 100644 --- a/davisAPI/prediction.py +++ b/davisAPI/prediction.py @@ -16,86 +16,88 @@ def run_prediction_module(): FROM weather_data WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR; """ - df = pd.read_sql(query, engine) + df = pd.read_sql(query, engine) # Загружаем данные из SQL-запроса в DataFrame - df['DateStamp'] = pd.to_datetime(df['DateStamp']) - df.set_index('DateStamp', inplace=True) - df.sort_index(inplace=True) + df['DateStamp'] = pd.to_datetime(df['DateStamp']) # Преобразуем столбец 'DateStamp' в формат datetime + df.set_index('DateStamp', inplace=True) # Устанавливаем 'DateStamp' как индекс + df.sort_index(inplace=True) # Сортируем DataFrame по индексу (по дате) - lags = 3 - shifted_dfs = [df] + lags = 3 # Задаем количество временных сдвигов (лагов) + shifted_dfs = [df] # Создаем список для хранения исходного DataFrame и его лаговых версий for lag in range(1, lags + 1): - shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') - shifted_dfs.append(shifted_df) + shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') # Создаем сдвинутый на lag строк DataFrame + shifted_dfs.append(shifted_df) # Добавляем его в список - df_with_lags = pd.concat(shifted_dfs, axis=1) + df_with_lags = pd.concat(shifted_dfs, axis=1) # Объединяем исходный DataFrame и все лаги по столбцам - df_with_lags.dropna(inplace=True) - df_with_lags = df_with_lags.copy() + df_with_lags.dropna(inplace=True) # Удаляем строки с пропущенными значениями + df_with_lags = df_with_lags.copy() # Создаем копию DataFrame (для предотвращения предупреждений) - # Преобразуем BarTrend в числовой формат + # Преобразуем столбец 'BarTrend' в числовой формат, используя кодировщик категорий le = LabelEncoder() df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']) - # Выбор только числовых данных + # Оставляем в DataFrame только числовые столбцы df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) - # Словари для хранения моделей и MSE + # Создаем словари для хранения моделей и значений MSE models = {} mse_scores = {} - # Обучение моделей для каждого целевого столбца + # Обучаем модели для каждого целевого столбца for target_column in df.columns: - if target_column not in df_with_lags.columns: + if target_column not in df_with_lags.columns: # Пропускаем, если столбец отсутствует в df_with_lags continue - X = df_with_lags.drop(columns=[target_column]).values - y = df_with_lags[target_column].values + X = df_with_lags.drop(columns=[target_column]).values # Признаки - все столбцы, кроме целевого + y = df_with_lags[target_column].values # Целевой столбец + # Разделяем данные на обучающую и тестовую выборки без перемешивания (временной ряд) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) - model = RandomForestRegressor() - model.fit(X_train, y_train) + model = RandomForestRegressor() # Инициализируем модель случайного леса + model.fit(X_train, y_train) # Обучаем модель - y_pred = model.predict(X_test) - mse = mean_squared_error(y_test, y_pred) - mse_scores[target_column] = mse - models[target_column] = model + y_pred = model.predict(X_test) # Делаем предсказания на тестовой выборке + mse = mean_squared_error(y_test, y_pred) # Вычисляем среднеквадратичную ошибку + mse_scores[target_column] = mse # Сохраняем MSE для целевого столбца + models[target_column] = model # Сохраняем модель для целевого столбца - quality = "хорошая" if mse < 1.0 else "плохая" - print(f"MSE для {target_column}: {mse} ({quality})") + quality = "хорошая" if mse < 1.0 else "плохая" # Определяем качество модели + print(f"MSE для {target_column}: {mse} ({quality})") # Выводим MSE и качество - # Обучаем модель для BarTrend_encoded отдельно - X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values - y_bartrend = df_with_lags['BarTrend_encoded'].values + # Обучаем модель для столбца 'BarTrend_encoded' отдельно + X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values # Признаки + y_bartrend = df_with_lags['BarTrend_encoded'].values # Целевой столбец 'BarTrend_encoded' + # Разделяем данные на обучающую и тестовую выборки без перемешивания X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend, test_size=0.2, shuffle=False) - model_bartrend = RandomForestRegressor() - model_bartrend.fit(X_train_bartrend, y_train_bartrend) + model_bartrend = RandomForestRegressor() # Инициализируем модель случайного леса + model_bartrend.fit(X_train_bartrend, y_train_bartrend) # Обучаем модель - y_pred_bartrend = model_bartrend.predict(X_test_bartrend) - mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) - models['BarTrend_encoded'] = model_bartrend - mse_scores['BarTrend_encoded'] = mse_bartrend + y_pred_bartrend = model_bartrend.predict(X_test_bartrend) # Предсказания на тестовой выборке для 'BarTrend_encoded' + mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) # Вычисляем MSE + models['BarTrend_encoded'] = model_bartrend # Сохраняем модель для 'BarTrend_encoded' + mse_scores['BarTrend_encoded'] = mse_bartrend # Сохраняем MSE для 'BarTrend_encoded' - quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" - print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") + quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" # Определяем качество модели для 'BarTrend_encoded' + print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") # Выводим MSE и качество - last_data = X[-1].reshape(1, -1) + last_data = X[-1].reshape(1, -1) # Берем последнюю строку данных и преобразуем в формат для предсказания - predictions = {} + predictions = {} # Создаем словарь для хранения предсказаний for target_column, model in models.items(): - prediction = model.predict(last_data)[0] + prediction = model.predict(last_data)[0] # Делаем предсказание для последней строки данных if target_column == 'BarTrend_encoded': - prediction = le.inverse_transform([int(prediction)])[0] - predictions['BarTrend'] = prediction - print(f"Предсказание для BarTrend: {prediction}") - break - predictions[target_column] = prediction - print(f"Предсказание для {target_column}: {prediction}") + prediction = le.inverse_transform([int(prediction)])[0] # Декодируем категориальное значение + predictions['BarTrend'] = prediction # Сохраняем предсказание для 'BarTrend' + #print(f"Предсказание для BarTrend: {prediction}") # Выводим предсказание + continue # Продолжаем цикл после предсказания для 'BarTrend_encoded' + predictions[target_column] = prediction # Сохраняем предсказание для остальных столбцов + #print(f"Предсказание для {target_column}: {prediction}") # Выводим предсказание для столбца return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов -- 2.25.1 From 3ffc4467bbd1834e86659434160e7593d1d99a15 Mon Sep 17 00:00:00 2001 From: Pavel_Sorokin Date: Tue, 26 Nov 2024 23:17:03 +0400 Subject: [PATCH 4/5] metPy function with example Aurora --- davisAPI/davisAPI.py | 298 ++++++++++++++++++++++++++----------------- 1 file changed, 181 insertions(+), 117 deletions(-) diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index a5742b1..1703d8e 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,126 +1,190 @@ -import gc -import logging -import time -from datetime import datetime, timedelta -from pprint import pprint -import mariadb -import serial.tools.list_ports - -#from PyWeather.weather.stations.davis import VantagePro -from prediction import run_prediction_module - -logging.basicConfig(filename="Stations.log", - format='%(asctime)s %(message)s', - filemode='a') -logger = logging.getLogger('davis_api') -logger.setLevel(logging.DEBUG) - -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.DEBUG) -console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) -logger.addHandler(console_handler) +# import gc +# import logging +# import time +# from datetime import datetime, timedelta +# from pprint import pprint +# import mariadb +# import serial.tools.list_ports +# +# #from PyWeather.weather.stations.davis import VantagePro +# from prediction import run_prediction_module +# +# logging.basicConfig(filename="Stations.log", +# format='%(asctime)s %(message)s', +# filemode='a') +# logger = logging.getLogger('davis_api') +# logger.setLevel(logging.DEBUG) +# +# console_handler = logging.StreamHandler() +# console_handler.setLevel(logging.DEBUG) +# console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +# logger.addHandler(console_handler) +# +# +# def write_data(device, station, send=True): +# try: +# # device.parse() +# data = device.fields +# logger.info(data) +# if len(data) < 1: +# return +# else: +# logger.info(data) +# fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', +# 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', +# 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', +# 'WindSpeed10Min'] +# +# if send: +# placeholders = ', '.join(['%s'] * len(fields)) +# field_names = ', '.join(fields) +# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" +# values = [data[field] for field in fields] +# cursor.execute(sql, values) +# conn.commit() +# else: +# logger.info(data) +# +# del data +# del fields +# gc.collect() +# except Exception as e: +# logger.error(str(e)) +# raise e +# +# +# def get_previous_values(cursor): +# cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") +# result = cursor.fetchone() +# +# if result is None: +# return None, None, None, None +# +# sun_rise, sun_set, wind_dir, datestamp = result +# return sun_rise, sun_set, wind_dir, datestamp +# +# +# def save_prediction_to_db(predictions): +# try: +# +# sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) +# +# fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) +# placeholders = ', '.join(['%s'] * len(fields)) +# field_names = ', '.join(fields) +# +# values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) +# pprint(dict(zip(fields, values))) +# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" +# # cursor.execute(sql, values) +# # conn.commit() +# logger.info("Save prediction to db success!") +# except Exception as e: +# logger.error(str(e)) +# raise e +# +# +# try: +# conn = mariadb.connect( +# user="wind", +# password="wind", +# host="193.124.203.110", +# port=3306, +# database="wind_towers" +# ) +# cursor = conn.cursor() +# except mariadb.Error as e: +# logger.error('DB_ERR: ' + str(e)) +# raise e +# while True: +# try: +# ports = serial.tools.list_ports.comports() +# available_ports = {} +# +# for port in ports: +# if port.serial_number == '0001': +# available_ports[port.name] = port.vid +# +# devices = [VantagePro(port) for port in available_ports.keys()] +# while True: +# for i in range(1): +# if len(devices) != 0: +# logger.info(devices) +# # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) +# else: +# raise Exception('Can`t connect to device') +# time.sleep(60) +# except Exception as e: +# logger.error('Device_error' + str(e)) +# predictions = run_prediction_module() +# #logger.info(predictions) +# if predictions is not None: +# save_prediction_to_db(predictions) +# time.sleep(60) -def write_data(device, station, send=True): - try: - # device.parse() - data = device.fields - logger.info(data) - if len(data) < 1: - return - else: - logger.info(data) - fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', - 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', - 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', - 'WindSpeed10Min'] +#todo переписать под influx, для линухи приколы сделать +import metpy.calc +from datetime import datetime +import torch +from aurora import AuroraSmall, Batch, Metadata +from metpy.units import units - if send: - placeholders = ', '.join(['%s'] * len(fields)) - field_names = ', '.join(fields) - sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" - values = [data[field] for field in fields] - cursor.execute(sql, values) - conn.commit() - else: - logger.info(data) +def get_wind_speed_and_direction(lat:float,lon:float): + model = AuroraSmall() + model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") - del data - del fields - gc.collect() - except Exception as e: - logger.error(str(e)) - raise e - - -def get_previous_values(cursor): - cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") - result = cursor.fetchone() - - if result is None: - return None, None, None, None - - sun_rise, sun_set, wind_dir, datestamp = result - return sun_rise, sun_set, wind_dir, datestamp - - -def save_prediction_to_db(predictions): - try: - - sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) - - fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) - placeholders = ', '.join(['%s'] * len(fields)) - field_names = ', '.join(fields) - - values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) - pprint(dict(zip(fields, values))) - sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" - # cursor.execute(sql, values) - # conn.commit() - logger.info("Save prediction to db success!") - except Exception as e: - logger.error(str(e)) - raise e - - -try: - conn = mariadb.connect( - user="wind", - password="wind", - host="193.124.203.110", - port=3306, - database="wind_towers" + batch = Batch( + surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, + static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, + atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, + metadata=Metadata( + lat=torch.linspace(90, -90, 17), + lon=torch.linspace(0, 360, 32 + 1)[:-1], + time=(datetime(2024, 11, 26, 23, 7),), + atmos_levels=(100,), + ), ) - cursor = conn.cursor() -except mariadb.Error as e: - logger.error('DB_ERR: ' + str(e)) - raise e -while True: - try: - ports = serial.tools.list_ports.comports() - available_ports = {} + prediction = model.forward(batch) - for port in ports: - if port.serial_number == '0001': - available_ports[port.name] = port.vid + target_lat = lat + target_lon = lon - devices = [VantagePro(port) for port in available_ports.keys()] - while True: - for i in range(1): - if len(devices) != 0: - logger.info(devices) - # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) - else: - raise Exception('Can`t connect to device') - time.sleep(60) - except Exception as e: - logger.error('Device_error' + str(e)) - predictions = run_prediction_module() - #logger.info(predictions) - if predictions is not None: - save_prediction_to_db(predictions) - time.sleep(60) + lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() + lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() + + u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] + v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] + + print("u values at target location:", u_values) + print("v values at target location:", v_values) + + u_scalar = u_values.item() + v_scalar = v_values.item() + + print("u value:", u_scalar) + print("v value:", v_scalar) + u_with_units = u_scalar * units("m/s") + v_with_units = v_scalar * units("m/s") + + # Рассчитайте направление и скорость ветра + wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) + wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) + + wind_dir_text = wind_direction_to_text(wind_dir.magnitude) + print(type(wind_dir)) + # Вывод результата + print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") + print(f"Скорость ветра: {wind_speed:.2f} м/с") + return wind_dir.magnitude.item(),wind_speed.magnitude.item() -#todo переписать под influx, для линухи приколы сделать \ No newline at end of file +def wind_direction_to_text(wind_dir_deg): + directions = [ + "север", "северо-восток", "восток", "юго-восток", + "юг", "юго-запад", "запад", "северо-запад" + ] + idx = int((wind_dir_deg + 22.5) // 45) % 8 + return directions[idx] + +print(get_wind_speed_and_direction(50,20)) \ No newline at end of file -- 2.25.1 From 39cd0d086095ab6cec048f30214382dca8140a73 Mon Sep 17 00:00:00 2001 From: darya Date: Wed, 27 Nov 2024 00:38:45 +0400 Subject: [PATCH 5/5] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=B0=D0=B2=D1=80=D0=BE=D1=80=D1=8B=20?= =?UTF-8?q?=D1=81=20=D0=B8=D1=81=D0=BF=D0=BE=D0=BB=D1=8C=D0=B7=D0=BE=D0=B2?= =?UTF-8?q?=D0=B0=D0=BD=D0=B8=D0=B5=D0=BC=20=D0=BE=D1=82=D0=BA=D1=80=D1=8B?= =?UTF-8?q?=D1=82=D0=BE=D0=B3=D0=BE=20=D0=B4=D0=B0=D1=82=D0=B0=D1=81=D0=B5?= =?UTF-8?q?=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- davisAPI/davisAPI.py | 227 ++++++++------------------------------- davisAPI/prediction.py | 238 +++++++++++++++++++++++++++-------------- 2 files changed, 205 insertions(+), 260 deletions(-) diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index 1703d8e..238c84a 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,190 +1,53 @@ -# import gc -# import logging -# import time -# from datetime import datetime, timedelta -# from pprint import pprint -# import mariadb -# import serial.tools.list_ports -# -# #from PyWeather.weather.stations.davis import VantagePro -# from prediction import run_prediction_module -# -# logging.basicConfig(filename="Stations.log", -# format='%(asctime)s %(message)s', -# filemode='a') -# logger = logging.getLogger('davis_api') -# logger.setLevel(logging.DEBUG) -# -# console_handler = logging.StreamHandler() -# console_handler.setLevel(logging.DEBUG) -# console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) -# logger.addHandler(console_handler) -# -# -# def write_data(device, station, send=True): -# try: -# # device.parse() -# data = device.fields -# logger.info(data) -# if len(data) < 1: -# return -# else: -# logger.info(data) -# fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', -# 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', -# 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', -# 'WindSpeed10Min'] -# -# if send: -# placeholders = ', '.join(['%s'] * len(fields)) -# field_names = ', '.join(fields) -# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" -# values = [data[field] for field in fields] -# cursor.execute(sql, values) -# conn.commit() -# else: -# logger.info(data) -# -# del data -# del fields -# gc.collect() -# except Exception as e: -# logger.error(str(e)) -# raise e -# -# -# def get_previous_values(cursor): -# cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") -# result = cursor.fetchone() -# -# if result is None: -# return None, None, None, None -# -# sun_rise, sun_set, wind_dir, datestamp = result -# return sun_rise, sun_set, wind_dir, datestamp -# -# -# def save_prediction_to_db(predictions): -# try: -# -# sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) -# -# fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) -# placeholders = ', '.join(['%s'] * len(fields)) -# field_names = ', '.join(fields) -# -# values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) -# pprint(dict(zip(fields, values))) -# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" -# # cursor.execute(sql, values) -# # conn.commit() -# logger.info("Save prediction to db success!") -# except Exception as e: -# logger.error(str(e)) -# raise e -# -# -# try: -# conn = mariadb.connect( -# user="wind", -# password="wind", -# host="193.124.203.110", -# port=3306, -# database="wind_towers" -# ) -# cursor = conn.cursor() -# except mariadb.Error as e: -# logger.error('DB_ERR: ' + str(e)) -# raise e -# while True: -# try: -# ports = serial.tools.list_ports.comports() -# available_ports = {} -# -# for port in ports: -# if port.serial_number == '0001': -# available_ports[port.name] = port.vid -# -# devices = [VantagePro(port) for port in available_ports.keys()] -# while True: -# for i in range(1): -# if len(devices) != 0: -# logger.info(devices) -# # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) -# else: -# raise Exception('Can`t connect to device') -# time.sleep(60) -# except Exception as e: -# logger.error('Device_error' + str(e)) -# predictions = run_prediction_module() -# #logger.info(predictions) -# if predictions is not None: -# save_prediction_to_db(predictions) -# time.sleep(60) +import logging +import time +import mariadb +import serial.tools.list_ports -#todo переписать под influx, для линухи приколы сделать -import metpy.calc -from datetime import datetime -import torch -from aurora import AuroraSmall, Batch, Metadata -from metpy.units import units +from PyWeather.weather.stations.davis import VantagePro -def get_wind_speed_and_direction(lat:float,lon:float): - model = AuroraSmall() - model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") +logging.basicConfig(filename="Stations.log", + format='%(asctime)s %(message)s', + filemode='a') +logger = logging.getLogger('davis_api') +logger.setLevel(logging.DEBUG) - batch = Batch( - surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, - static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, - atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, - metadata=Metadata( - lat=torch.linspace(90, -90, 17), - lon=torch.linspace(0, 360, 32 + 1)[:-1], - time=(datetime(2024, 11, 26, 23, 7),), - atmos_levels=(100,), - ), +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +logger.addHandler(console_handler) + +try: + conn = mariadb.connect( + user="wind", + password="wind", + host="193.124.203.110", + port=3306, + database="wind_towers" ) - prediction = model.forward(batch) + cursor = conn.cursor() +except mariadb.Error as e: + logger.error('DB_ERR: ' + str(e)) + raise e +while True: + try: + ports = serial.tools.list_ports.comports() + available_ports = {} - target_lat = lat - target_lon = lon + for port in ports: + if port.serial_number == '0001': + available_ports[port.name] = port.vid - lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() - lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() + devices = [VantagePro(port) for port in available_ports.keys()] + while True: + for i in range(1): + if len(devices) != 0: + logger.info(devices) + else: + raise Exception('Can`t connect to device') + time.sleep(60) + except Exception as e: + logger.error('Device_error' + str(e)) + time.sleep(60) - u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] - v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] - - print("u values at target location:", u_values) - print("v values at target location:", v_values) - - u_scalar = u_values.item() - v_scalar = v_values.item() - - print("u value:", u_scalar) - print("v value:", v_scalar) - u_with_units = u_scalar * units("m/s") - v_with_units = v_scalar * units("m/s") - - # Рассчитайте направление и скорость ветра - wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) - wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) - - wind_dir_text = wind_direction_to_text(wind_dir.magnitude) - print(type(wind_dir)) - # Вывод результата - print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") - print(f"Скорость ветра: {wind_speed:.2f} м/с") - return wind_dir.magnitude.item(),wind_speed.magnitude.item() - - -def wind_direction_to_text(wind_dir_deg): - directions = [ - "север", "северо-восток", "восток", "юго-восток", - "юг", "юго-запад", "запад", "северо-запад" - ] - idx = int((wind_dir_deg + 22.5) // 45) % 8 - return directions[idx] - -print(get_wind_speed_and_direction(50,20)) \ No newline at end of file +# todo переписать под influx, для линухи приколы сделать diff --git a/davisAPI/prediction.py b/davisAPI/prediction.py index 9c7fa85..492569c 100644 --- a/davisAPI/prediction.py +++ b/davisAPI/prediction.py @@ -1,103 +1,185 @@ -import pandas as pd -from sklearn.ensemble import RandomForestRegressor -from sklearn.metrics import mean_squared_error -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import LabelEncoder -from sqlalchemy import create_engine +from datetime import datetime +from pathlib import Path + +import metpy.calc +import numpy as np +import requests +import torch +import xarray as xr +from aurora import AuroraSmall, Batch, Metadata +from metpy.units import units -def run_prediction_module(): - engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers') +def get_download_paths(date): + """Создает список путей для загрузки данных.""" + download_path = Path("~/downloads/hres_0.1").expanduser() + downloads = {} + var_nums = { + "2t": "167", "10u": "165", "10v": "166", "msl": "151", "t": "130", + "u": "131", "v": "132", "q": "133", "z": "129", "slt": "043", "lsm": "172", + } + for v in ["2t", "10u", "10v", "msl", "z", "slt", "lsm"]: + downloads[download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib")] = ( + f"https://data.rda.ucar.edu/ds113.1/" + f"ec.oper.an.sfc/{date.year}{date.month:02d}/ec.oper.an.sfc.128_{var_nums[v]}_{v}." + f"regn1280sc.{date.year}{date.month:02d}{date.day:02d}.grb" + ) + for v in ["z", "t", "u", "v", "q"]: + for hour in [0, 6, 12, 18]: + prefix = "uv" if v in {"u", "v"} else "sc" + downloads[download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_{hour:02d}.grib")] = ( + f"https://data.rda.ucar.edu/ds113.1/" + f"ec.oper.an.pl/{date.year}{date.month:02d}/ec.oper.an.pl.128_{var_nums[v]}_{v}." + f"regn1280{prefix}.{date.year}{date.month:02d}{date.day:02d}{hour:02d}.grb" + ) + return downloads, download_path - query = """ - SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, - Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, - TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min - FROM weather_data - WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR; - """ - df = pd.read_sql(query, engine) # Загружаем данные из SQL-запроса в DataFrame - df['DateStamp'] = pd.to_datetime(df['DateStamp']) # Преобразуем столбец 'DateStamp' в формат datetime - df.set_index('DateStamp', inplace=True) # Устанавливаем 'DateStamp' как индекс - df.sort_index(inplace=True) # Сортируем DataFrame по индексу (по дате) +def download_data(downloads): + """Скачивает файлы, если они отсутствуют в целевой директории.""" + for target, source in downloads.items(): + if not target.exists(): + print(f"Downloading {source}") + target.parent.mkdir(parents=True, exist_ok=True) + response = requests.get(source) + response.raise_for_status() + with open(target, "wb") as f: + f.write(response.content) + print("Downloads finished!") - lags = 3 # Задаем количество временных сдвигов (лагов) - shifted_dfs = [df] # Создаем список для хранения исходного DataFrame и его лаговых версий - for lag in range(1, lags + 1): - shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') # Создаем сдвинутый на lag строк DataFrame - shifted_dfs.append(shifted_df) # Добавляем его в список +def load_surf(v, v_in_file, download_path, date): + """Загружает переменные поверхностного уровня или статические переменные.""" + ds = xr.open_dataset(download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib"), engine="cfgrib") + data = ds[v_in_file].values[:2] + data = data[None] + return torch.from_numpy(data) - df_with_lags = pd.concat(shifted_dfs, axis=1) # Объединяем исходный DataFrame и все лаги по столбцам - df_with_lags.dropna(inplace=True) # Удаляем строки с пропущенными значениями - df_with_lags = df_with_lags.copy() # Создаем копию DataFrame (для предотвращения предупреждений) +def load_atmos(v, download_path, date, levels): + """Загружает атмосферные переменные для заданных уровней давления.""" + ds_00 = xr.open_dataset( + download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_00.grib"), engine="cfgrib" + ) + ds_06 = xr.open_dataset( + download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_06.grib"), engine="cfgrib" + ) + ds_00 = ds_00[v].sel(isobaricInhPa=list(levels)) + ds_06 = ds_06[v].sel(isobaricInhPa=list(levels)) + data = np.stack((ds_00.values, ds_06.values), axis=0) + data = data[None] + return torch.from_numpy(data) - # Преобразуем столбец 'BarTrend' в числовой формат, используя кодировщик категорий - le = LabelEncoder() - df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']) - # Оставляем в DataFrame только числовые столбцы - df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) +def create_batch(date, levels, downloads, download_path): + """Создает объект Batch с данными для модели.""" + ds = xr.open_dataset(next(iter(downloads.keys())), engine="cfgrib") + batch = Batch( + surf_vars={ + "2t": load_surf("2t", "t2m", download_path, date), + "10u": load_surf("10u", "u10", download_path, date), + "10v": load_surf("10v", "v10", download_path, date), + "msl": load_surf("msl", "msl", download_path, date), + }, + static_vars={ + "z": load_surf("z", "z", download_path, date)[0, 0], + "slt": load_surf("slt", "slt", download_path, date)[0, 0], + "lsm": load_surf("lsm", "lsm", download_path, date)[0, 0], + }, + atmos_vars={ + "t": load_atmos("t", download_path, date, levels), + "u": load_atmos("u", download_path, date, levels), + "v": load_atmos("v", download_path, date, levels), + "q": load_atmos("q", download_path, date, levels), + "z": load_atmos("z", download_path, date, levels), + }, + metadata=Metadata( + lat=torch.from_numpy(ds.latitude.values), + lon=torch.from_numpy(ds.longitude.values), + time=(date.replace(hour=6),), + atmos_levels=levels, + ), + ) + return batch.regrid(res=0.1) - # Создаем словари для хранения моделей и значений MSE - models = {} - mse_scores = {} - # Обучаем модели для каждого целевого столбца - for target_column in df.columns: - if target_column not in df_with_lags.columns: # Пропускаем, если столбец отсутствует в df_with_lags - continue +def create_batch_random(levels: tuple[int], date: datetime): + """Создает объект Batch с рандомными данными для модели.""" + return Batch( + surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, + static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, + atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, + metadata=Metadata( + lat=torch.linspace(90, -90, 17), + lon=torch.linspace(0, 360, 32 + 1)[:-1], + time=(date,), + atmos_levels=levels, + ), + ) - X = df_with_lags.drop(columns=[target_column]).values # Признаки - все столбцы, кроме целевого - y = df_with_lags[target_column].values # Целевой столбец - # Разделяем данные на обучающую и тестовую выборки без перемешивания (временной ряд) - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) +def run_model(batch): + """Инициализирует модель AuroraSmall и выполняет предсказание.""" + model = AuroraSmall() + model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") + model.eval() + model = model.to("cpu") + with torch.inference_mode(): + prediction = model.forward(batch) + return prediction - model = RandomForestRegressor() # Инициализируем модель случайного леса - model.fit(X_train, y_train) # Обучаем модель - y_pred = model.predict(X_test) # Делаем предсказания на тестовой выборке - mse = mean_squared_error(y_test, y_pred) # Вычисляем среднеквадратичную ошибку - mse_scores[target_column] = mse # Сохраняем MSE для целевого столбца - models[target_column] = model # Сохраняем модель для целевого столбца +def get_wind_speed_and_direction(prediction, batch: Batch, lat: float, lon: float): + target_lat = lat + target_lon = lon - quality = "хорошая" if mse < 1.0 else "плохая" # Определяем качество модели - print(f"MSE для {target_column}: {mse} ({quality})") # Выводим MSE и качество + lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() + lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() - # Обучаем модель для столбца 'BarTrend_encoded' отдельно - X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values # Признаки - y_bartrend = df_with_lags['BarTrend_encoded'].values # Целевой столбец 'BarTrend_encoded' + u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] + v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] - # Разделяем данные на обучающую и тестовую выборки без перемешивания - X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend, - test_size=0.2, - shuffle=False) + u_scalar = u_values.item() + v_scalar = v_values.item() - model_bartrend = RandomForestRegressor() # Инициализируем модель случайного леса - model_bartrend.fit(X_train_bartrend, y_train_bartrend) # Обучаем модель + print("u value:", u_scalar) + print("v value:", v_scalar) + u_with_units = u_scalar * units("m/s") + v_with_units = v_scalar * units("m/s") - y_pred_bartrend = model_bartrend.predict(X_test_bartrend) # Предсказания на тестовой выборке для 'BarTrend_encoded' - mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) # Вычисляем MSE - models['BarTrend_encoded'] = model_bartrend # Сохраняем модель для 'BarTrend_encoded' - mse_scores['BarTrend_encoded'] = mse_bartrend # Сохраняем MSE для 'BarTrend_encoded' + # Рассчитайте направление и скорость ветра + wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) + wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) - quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" # Определяем качество модели для 'BarTrend_encoded' - print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") # Выводим MSE и качество + wind_dir_text = wind_direction_to_text(wind_dir.magnitude) + # Вывод результата + print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") + print(f"Скорость ветра: {wind_speed:.2f} м/с") + return wind_dir.magnitude.item(), wind_speed.magnitude.item() - last_data = X[-1].reshape(1, -1) # Берем последнюю строку данных и преобразуем в формат для предсказания - predictions = {} # Создаем словарь для хранения предсказаний - for target_column, model in models.items(): - prediction = model.predict(last_data)[0] # Делаем предсказание для последней строки данных - if target_column == 'BarTrend_encoded': - prediction = le.inverse_transform([int(prediction)])[0] # Декодируем категориальное значение - predictions['BarTrend'] = prediction # Сохраняем предсказание для 'BarTrend' - #print(f"Предсказание для BarTrend: {prediction}") # Выводим предсказание - continue # Продолжаем цикл после предсказания для 'BarTrend_encoded' - predictions[target_column] = prediction # Сохраняем предсказание для остальных столбцов - #print(f"Предсказание для {target_column}: {prediction}") # Выводим предсказание для столбца +def wind_direction_to_text(wind_dir_deg): + directions = [ + "север", "северо-восток", "восток", "юго-восток", + "юг", "юго-запад", "запад", "северо-запад" + ] + idx = int((wind_dir_deg + 22.5) // 45) % 8 + return directions[idx] - return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов + +def main(): + levels = (100,) + date = datetime(2024, 11, 5, 12) + + # downloads, download_path = get_download_paths(date) + # download_data(downloads) # Скачиваем данные, если их нет + # batch_actual = create_batch(date, levels, downloads, download_path) + batch_actual = create_batch_random(levels, date) + prediction_actual = run_model(batch_actual) + wind_speed_and_direction = get_wind_speed_and_direction(prediction_actual, batch_actual, 50, 20) + return wind_speed_and_direction + + +if __name__ == "__main__": + main() + print("Prediction completed!") -- 2.25.1