diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index 1703d8e..238c84a 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,190 +1,53 @@ -# import gc -# import logging -# import time -# from datetime import datetime, timedelta -# from pprint import pprint -# import mariadb -# import serial.tools.list_ports -# -# #from PyWeather.weather.stations.davis import VantagePro -# from prediction import run_prediction_module -# -# logging.basicConfig(filename="Stations.log", -# format='%(asctime)s %(message)s', -# filemode='a') -# logger = logging.getLogger('davis_api') -# logger.setLevel(logging.DEBUG) -# -# console_handler = logging.StreamHandler() -# console_handler.setLevel(logging.DEBUG) -# console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) -# logger.addHandler(console_handler) -# -# -# def write_data(device, station, send=True): -# try: -# # device.parse() -# data = device.fields -# logger.info(data) -# if len(data) < 1: -# return -# else: -# logger.info(data) -# fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', -# 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', -# 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', -# 'WindSpeed10Min'] -# -# if send: -# placeholders = ', '.join(['%s'] * len(fields)) -# field_names = ', '.join(fields) -# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" -# values = [data[field] for field in fields] -# cursor.execute(sql, values) -# conn.commit() -# else: -# logger.info(data) -# -# del data -# del fields -# gc.collect() -# except Exception as e: -# logger.error(str(e)) -# raise e -# -# -# def get_previous_values(cursor): -# cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") -# result = cursor.fetchone() -# -# if result is None: -# return None, None, None, None -# -# sun_rise, sun_set, wind_dir, datestamp = result -# return sun_rise, sun_set, wind_dir, datestamp -# -# -# def save_prediction_to_db(predictions): -# try: -# -# sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) -# -# fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) -# placeholders = ', '.join(['%s'] * len(fields)) -# field_names = ', '.join(fields) -# -# values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) -# pprint(dict(zip(fields, values))) -# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" -# # cursor.execute(sql, values) -# # conn.commit() -# logger.info("Save prediction to db success!") -# except Exception as e: -# logger.error(str(e)) -# raise e -# -# -# try: -# conn = mariadb.connect( -# user="wind", -# password="wind", -# host="193.124.203.110", -# port=3306, -# database="wind_towers" -# ) -# cursor = conn.cursor() -# except mariadb.Error as e: -# logger.error('DB_ERR: ' + str(e)) -# raise e -# while True: -# try: -# ports = serial.tools.list_ports.comports() -# available_ports = {} -# -# for port in ports: -# if port.serial_number == '0001': -# available_ports[port.name] = port.vid -# -# devices = [VantagePro(port) for port in available_ports.keys()] -# while True: -# for i in range(1): -# if len(devices) != 0: -# logger.info(devices) -# # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) -# else: -# raise Exception('Can`t connect to device') -# time.sleep(60) -# except Exception as e: -# logger.error('Device_error' + str(e)) -# predictions = run_prediction_module() -# #logger.info(predictions) -# if predictions is not None: -# save_prediction_to_db(predictions) -# time.sleep(60) +import logging +import time +import mariadb +import serial.tools.list_ports -#todo переписать под influx, для линухи приколы сделать -import metpy.calc -from datetime import datetime -import torch -from aurora import AuroraSmall, Batch, Metadata -from metpy.units import units +from PyWeather.weather.stations.davis import VantagePro -def get_wind_speed_and_direction(lat:float,lon:float): - model = AuroraSmall() - model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") +logging.basicConfig(filename="Stations.log", + format='%(asctime)s %(message)s', + filemode='a') +logger = logging.getLogger('davis_api') +logger.setLevel(logging.DEBUG) - batch = Batch( - surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, - static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, - atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, - metadata=Metadata( - lat=torch.linspace(90, -90, 17), - lon=torch.linspace(0, 360, 32 + 1)[:-1], - time=(datetime(2024, 11, 26, 23, 7),), - atmos_levels=(100,), - ), +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +logger.addHandler(console_handler) + +try: + conn = mariadb.connect( + user="wind", + password="wind", + host="193.124.203.110", + port=3306, + database="wind_towers" ) - prediction = model.forward(batch) + cursor = conn.cursor() +except mariadb.Error as e: + logger.error('DB_ERR: ' + str(e)) + raise e +while True: + try: + ports = serial.tools.list_ports.comports() + available_ports = {} - target_lat = lat - target_lon = lon + for port in ports: + if port.serial_number == '0001': + available_ports[port.name] = port.vid - lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() - lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() + devices = [VantagePro(port) for port in available_ports.keys()] + while True: + for i in range(1): + if len(devices) != 0: + logger.info(devices) + else: + raise Exception('Can`t connect to device') + time.sleep(60) + except Exception as e: + logger.error('Device_error' + str(e)) + time.sleep(60) - u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] - v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] - - print("u values at target location:", u_values) - print("v values at target location:", v_values) - - u_scalar = u_values.item() - v_scalar = v_values.item() - - print("u value:", u_scalar) - print("v value:", v_scalar) - u_with_units = u_scalar * units("m/s") - v_with_units = v_scalar * units("m/s") - - # Рассчитайте направление и скорость ветра - wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) - wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) - - wind_dir_text = wind_direction_to_text(wind_dir.magnitude) - print(type(wind_dir)) - # Вывод результата - print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") - print(f"Скорость ветра: {wind_speed:.2f} м/с") - return wind_dir.magnitude.item(),wind_speed.magnitude.item() - - -def wind_direction_to_text(wind_dir_deg): - directions = [ - "север", "северо-восток", "восток", "юго-восток", - "юг", "юго-запад", "запад", "северо-запад" - ] - idx = int((wind_dir_deg + 22.5) // 45) % 8 - return directions[idx] - -print(get_wind_speed_and_direction(50,20)) \ No newline at end of file +# todo переписать под influx, для линухи приколы сделать diff --git a/davisAPI/prediction.py b/davisAPI/prediction.py index 9c7fa85..492569c 100644 --- a/davisAPI/prediction.py +++ b/davisAPI/prediction.py @@ -1,103 +1,185 @@ -import pandas as pd -from sklearn.ensemble import RandomForestRegressor -from sklearn.metrics import mean_squared_error -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import LabelEncoder -from sqlalchemy import create_engine +from datetime import datetime +from pathlib import Path + +import metpy.calc +import numpy as np +import requests +import torch +import xarray as xr +from aurora import AuroraSmall, Batch, Metadata +from metpy.units import units -def run_prediction_module(): - engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers') +def get_download_paths(date): + """Создает список путей для загрузки данных.""" + download_path = Path("~/downloads/hres_0.1").expanduser() + downloads = {} + var_nums = { + "2t": "167", "10u": "165", "10v": "166", "msl": "151", "t": "130", + "u": "131", "v": "132", "q": "133", "z": "129", "slt": "043", "lsm": "172", + } + for v in ["2t", "10u", "10v", "msl", "z", "slt", "lsm"]: + downloads[download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib")] = ( + f"https://data.rda.ucar.edu/ds113.1/" + f"ec.oper.an.sfc/{date.year}{date.month:02d}/ec.oper.an.sfc.128_{var_nums[v]}_{v}." + f"regn1280sc.{date.year}{date.month:02d}{date.day:02d}.grb" + ) + for v in ["z", "t", "u", "v", "q"]: + for hour in [0, 6, 12, 18]: + prefix = "uv" if v in {"u", "v"} else "sc" + downloads[download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_{hour:02d}.grib")] = ( + f"https://data.rda.ucar.edu/ds113.1/" + f"ec.oper.an.pl/{date.year}{date.month:02d}/ec.oper.an.pl.128_{var_nums[v]}_{v}." + f"regn1280{prefix}.{date.year}{date.month:02d}{date.day:02d}{hour:02d}.grb" + ) + return downloads, download_path - query = """ - SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut, - Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear, - TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min - FROM weather_data - WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR; - """ - df = pd.read_sql(query, engine) # Загружаем данные из SQL-запроса в DataFrame - df['DateStamp'] = pd.to_datetime(df['DateStamp']) # Преобразуем столбец 'DateStamp' в формат datetime - df.set_index('DateStamp', inplace=True) # Устанавливаем 'DateStamp' как индекс - df.sort_index(inplace=True) # Сортируем DataFrame по индексу (по дате) +def download_data(downloads): + """Скачивает файлы, если они отсутствуют в целевой директории.""" + for target, source in downloads.items(): + if not target.exists(): + print(f"Downloading {source}") + target.parent.mkdir(parents=True, exist_ok=True) + response = requests.get(source) + response.raise_for_status() + with open(target, "wb") as f: + f.write(response.content) + print("Downloads finished!") - lags = 3 # Задаем количество временных сдвигов (лагов) - shifted_dfs = [df] # Создаем список для хранения исходного DataFrame и его лаговых версий - for lag in range(1, lags + 1): - shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') # Создаем сдвинутый на lag строк DataFrame - shifted_dfs.append(shifted_df) # Добавляем его в список +def load_surf(v, v_in_file, download_path, date): + """Загружает переменные поверхностного уровня или статические переменные.""" + ds = xr.open_dataset(download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib"), engine="cfgrib") + data = ds[v_in_file].values[:2] + data = data[None] + return torch.from_numpy(data) - df_with_lags = pd.concat(shifted_dfs, axis=1) # Объединяем исходный DataFrame и все лаги по столбцам - df_with_lags.dropna(inplace=True) # Удаляем строки с пропущенными значениями - df_with_lags = df_with_lags.copy() # Создаем копию DataFrame (для предотвращения предупреждений) +def load_atmos(v, download_path, date, levels): + """Загружает атмосферные переменные для заданных уровней давления.""" + ds_00 = xr.open_dataset( + download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_00.grib"), engine="cfgrib" + ) + ds_06 = xr.open_dataset( + download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_06.grib"), engine="cfgrib" + ) + ds_00 = ds_00[v].sel(isobaricInhPa=list(levels)) + ds_06 = ds_06[v].sel(isobaricInhPa=list(levels)) + data = np.stack((ds_00.values, ds_06.values), axis=0) + data = data[None] + return torch.from_numpy(data) - # Преобразуем столбец 'BarTrend' в числовой формат, используя кодировщик категорий - le = LabelEncoder() - df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend']) - # Оставляем в DataFrame только числовые столбцы - df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64']) +def create_batch(date, levels, downloads, download_path): + """Создает объект Batch с данными для модели.""" + ds = xr.open_dataset(next(iter(downloads.keys())), engine="cfgrib") + batch = Batch( + surf_vars={ + "2t": load_surf("2t", "t2m", download_path, date), + "10u": load_surf("10u", "u10", download_path, date), + "10v": load_surf("10v", "v10", download_path, date), + "msl": load_surf("msl", "msl", download_path, date), + }, + static_vars={ + "z": load_surf("z", "z", download_path, date)[0, 0], + "slt": load_surf("slt", "slt", download_path, date)[0, 0], + "lsm": load_surf("lsm", "lsm", download_path, date)[0, 0], + }, + atmos_vars={ + "t": load_atmos("t", download_path, date, levels), + "u": load_atmos("u", download_path, date, levels), + "v": load_atmos("v", download_path, date, levels), + "q": load_atmos("q", download_path, date, levels), + "z": load_atmos("z", download_path, date, levels), + }, + metadata=Metadata( + lat=torch.from_numpy(ds.latitude.values), + lon=torch.from_numpy(ds.longitude.values), + time=(date.replace(hour=6),), + atmos_levels=levels, + ), + ) + return batch.regrid(res=0.1) - # Создаем словари для хранения моделей и значений MSE - models = {} - mse_scores = {} - # Обучаем модели для каждого целевого столбца - for target_column in df.columns: - if target_column not in df_with_lags.columns: # Пропускаем, если столбец отсутствует в df_with_lags - continue +def create_batch_random(levels: tuple[int], date: datetime): + """Создает объект Batch с рандомными данными для модели.""" + return Batch( + surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, + static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, + atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, + metadata=Metadata( + lat=torch.linspace(90, -90, 17), + lon=torch.linspace(0, 360, 32 + 1)[:-1], + time=(date,), + atmos_levels=levels, + ), + ) - X = df_with_lags.drop(columns=[target_column]).values # Признаки - все столбцы, кроме целевого - y = df_with_lags[target_column].values # Целевой столбец - # Разделяем данные на обучающую и тестовую выборки без перемешивания (временной ряд) - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) +def run_model(batch): + """Инициализирует модель AuroraSmall и выполняет предсказание.""" + model = AuroraSmall() + model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") + model.eval() + model = model.to("cpu") + with torch.inference_mode(): + prediction = model.forward(batch) + return prediction - model = RandomForestRegressor() # Инициализируем модель случайного леса - model.fit(X_train, y_train) # Обучаем модель - y_pred = model.predict(X_test) # Делаем предсказания на тестовой выборке - mse = mean_squared_error(y_test, y_pred) # Вычисляем среднеквадратичную ошибку - mse_scores[target_column] = mse # Сохраняем MSE для целевого столбца - models[target_column] = model # Сохраняем модель для целевого столбца +def get_wind_speed_and_direction(prediction, batch: Batch, lat: float, lon: float): + target_lat = lat + target_lon = lon - quality = "хорошая" if mse < 1.0 else "плохая" # Определяем качество модели - print(f"MSE для {target_column}: {mse} ({quality})") # Выводим MSE и качество + lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() + lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() - # Обучаем модель для столбца 'BarTrend_encoded' отдельно - X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values # Признаки - y_bartrend = df_with_lags['BarTrend_encoded'].values # Целевой столбец 'BarTrend_encoded' + u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] + v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] - # Разделяем данные на обучающую и тестовую выборки без перемешивания - X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend, - test_size=0.2, - shuffle=False) + u_scalar = u_values.item() + v_scalar = v_values.item() - model_bartrend = RandomForestRegressor() # Инициализируем модель случайного леса - model_bartrend.fit(X_train_bartrend, y_train_bartrend) # Обучаем модель + print("u value:", u_scalar) + print("v value:", v_scalar) + u_with_units = u_scalar * units("m/s") + v_with_units = v_scalar * units("m/s") - y_pred_bartrend = model_bartrend.predict(X_test_bartrend) # Предсказания на тестовой выборке для 'BarTrend_encoded' - mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) # Вычисляем MSE - models['BarTrend_encoded'] = model_bartrend # Сохраняем модель для 'BarTrend_encoded' - mse_scores['BarTrend_encoded'] = mse_bartrend # Сохраняем MSE для 'BarTrend_encoded' + # Рассчитайте направление и скорость ветра + wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) + wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) - quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" # Определяем качество модели для 'BarTrend_encoded' - print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") # Выводим MSE и качество + wind_dir_text = wind_direction_to_text(wind_dir.magnitude) + # Вывод результата + print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") + print(f"Скорость ветра: {wind_speed:.2f} м/с") + return wind_dir.magnitude.item(), wind_speed.magnitude.item() - last_data = X[-1].reshape(1, -1) # Берем последнюю строку данных и преобразуем в формат для предсказания - predictions = {} # Создаем словарь для хранения предсказаний - for target_column, model in models.items(): - prediction = model.predict(last_data)[0] # Делаем предсказание для последней строки данных - if target_column == 'BarTrend_encoded': - prediction = le.inverse_transform([int(prediction)])[0] # Декодируем категориальное значение - predictions['BarTrend'] = prediction # Сохраняем предсказание для 'BarTrend' - #print(f"Предсказание для BarTrend: {prediction}") # Выводим предсказание - continue # Продолжаем цикл после предсказания для 'BarTrend_encoded' - predictions[target_column] = prediction # Сохраняем предсказание для остальных столбцов - #print(f"Предсказание для {target_column}: {prediction}") # Выводим предсказание для столбца +def wind_direction_to_text(wind_dir_deg): + directions = [ + "север", "северо-восток", "восток", "юго-восток", + "юг", "юго-запад", "запад", "северо-запад" + ] + idx = int((wind_dir_deg + 22.5) // 45) % 8 + return directions[idx] - return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов + +def main(): + levels = (100,) + date = datetime(2024, 11, 5, 12) + + # downloads, download_path = get_download_paths(date) + # download_data(downloads) # Скачиваем данные, если их нет + # batch_actual = create_batch(date, levels, downloads, download_path) + batch_actual = create_batch_random(levels, date) + prediction_actual = run_model(batch_actual) + wind_speed_and_direction = get_wind_speed_and_direction(prediction_actual, batch_actual, 50, 20) + return wind_speed_and_direction + + +if __name__ == "__main__": + main() + print("Prediction completed!")