добавление авроры с использованием открытого датасета

This commit is contained in:
darya 2024-11-27 00:38:45 +04:00
parent 3ffc4467bb
commit 39cd0d0860
2 changed files with 205 additions and 260 deletions

View File

@ -1,190 +1,53 @@
# import gc
# import logging
# import time
# from datetime import datetime, timedelta
# from pprint import pprint
# import mariadb
# import serial.tools.list_ports
#
# #from PyWeather.weather.stations.davis import VantagePro
# from prediction import run_prediction_module
#
# logging.basicConfig(filename="Stations.log",
# format='%(asctime)s %(message)s',
# filemode='a')
# logger = logging.getLogger('davis_api')
# logger.setLevel(logging.DEBUG)
#
# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.DEBUG)
# console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
# logger.addHandler(console_handler)
#
#
# def write_data(device, station, send=True):
# try:
# # device.parse()
# data = device.fields
# logger.info(data)
# if len(data) < 1:
# return
# else:
# logger.info(data)
# fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex',
# 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm',
# 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed',
# 'WindSpeed10Min']
#
# if send:
# placeholders = ', '.join(['%s'] * len(fields))
# field_names = ', '.join(fields)
# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})"
# values = [data[field] for field in fields]
# cursor.execute(sql, values)
# conn.commit()
# else:
# logger.info(data)
#
# del data
# del fields
# gc.collect()
# except Exception as e:
# logger.error(str(e))
# raise e
#
#
# def get_previous_values(cursor):
# cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1")
# result = cursor.fetchone()
#
# if result is None:
# return None, None, None, None
#
# sun_rise, sun_set, wind_dir, datestamp = result
# return sun_rise, sun_set, wind_dir, datestamp
#
#
# def save_prediction_to_db(predictions):
# try:
#
# sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor)
#
# fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys())
# placeholders = ', '.join(['%s'] * len(fields))
# field_names = ', '.join(fields)
#
# values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values())
# pprint(dict(zip(fields, values)))
# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})"
# # cursor.execute(sql, values)
# # conn.commit()
# logger.info("Save prediction to db success!")
# except Exception as e:
# logger.error(str(e))
# raise e
#
#
# try:
# conn = mariadb.connect(
# user="wind",
# password="wind",
# host="193.124.203.110",
# port=3306,
# database="wind_towers"
# )
# cursor = conn.cursor()
# except mariadb.Error as e:
# logger.error('DB_ERR: ' + str(e))
# raise e
# while True:
# try:
# ports = serial.tools.list_ports.comports()
# available_ports = {}
#
# for port in ports:
# if port.serial_number == '0001':
# available_ports[port.name] = port.vid
#
# devices = [VantagePro(port) for port in available_ports.keys()]
# while True:
# for i in range(1):
# if len(devices) != 0:
# logger.info(devices)
# # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True)
# else:
# raise Exception('Can`t connect to device')
# time.sleep(60)
# except Exception as e:
# logger.error('Device_error' + str(e))
# predictions = run_prediction_module()
# #logger.info(predictions)
# if predictions is not None:
# save_prediction_to_db(predictions)
# time.sleep(60)
import logging
import time
import mariadb
import serial.tools.list_ports
#todo переписать под influx, для линухи приколы сделать
import metpy.calc
from datetime import datetime
import torch
from aurora import AuroraSmall, Batch, Metadata
from metpy.units import units
from PyWeather.weather.stations.davis import VantagePro
def get_wind_speed_and_direction(lat:float,lon:float):
model = AuroraSmall()
model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt")
logging.basicConfig(filename="Stations.log",
format='%(asctime)s %(message)s',
filemode='a')
logger = logging.getLogger('davis_api')
logger.setLevel(logging.DEBUG)
batch = Batch(
surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")},
static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")},
atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")},
metadata=Metadata(
lat=torch.linspace(90, -90, 17),
lon=torch.linspace(0, 360, 32 + 1)[:-1],
time=(datetime(2024, 11, 26, 23, 7),),
atmos_levels=(100,),
),
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
logger.addHandler(console_handler)
try:
conn = mariadb.connect(
user="wind",
password="wind",
host="193.124.203.110",
port=3306,
database="wind_towers"
)
prediction = model.forward(batch)
cursor = conn.cursor()
except mariadb.Error as e:
logger.error('DB_ERR: ' + str(e))
raise e
while True:
try:
ports = serial.tools.list_ports.comports()
available_ports = {}
target_lat = lat
target_lon = lon
for port in ports:
if port.serial_number == '0001':
available_ports[port.name] = port.vid
lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin()
lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin()
devices = [VantagePro(port) for port in available_ports.keys()]
while True:
for i in range(1):
if len(devices) != 0:
logger.info(devices)
else:
raise Exception('Can`t connect to device')
time.sleep(60)
except Exception as e:
logger.error('Device_error' + str(e))
time.sleep(60)
u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx]
v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx]
print("u values at target location:", u_values)
print("v values at target location:", v_values)
u_scalar = u_values.item()
v_scalar = v_values.item()
print("u value:", u_scalar)
print("v value:", v_scalar)
u_with_units = u_scalar * units("m/s")
v_with_units = v_scalar * units("m/s")
# Рассчитайте направление и скорость ветра
wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units)
wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units)
wind_dir_text = wind_direction_to_text(wind_dir.magnitude)
print(type(wind_dir))
# Вывод результата
print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)")
print(f"Скорость ветра: {wind_speed:.2f} м/с")
return wind_dir.magnitude.item(),wind_speed.magnitude.item()
def wind_direction_to_text(wind_dir_deg):
directions = [
"север", "северо-восток", "восток", "юго-восток",
"юг", "юго-запад", "запад", "северо-запад"
]
idx = int((wind_dir_deg + 22.5) // 45) % 8
return directions[idx]
print(get_wind_speed_and_direction(50,20))
# todo переписать под influx, для линухи приколы сделать

View File

@ -1,103 +1,185 @@
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sqlalchemy import create_engine
from datetime import datetime
from pathlib import Path
import metpy.calc
import numpy as np
import requests
import torch
import xarray as xr
from aurora import AuroraSmall, Batch, Metadata
from metpy.units import units
def run_prediction_module():
engine = create_engine('mysql+pymysql://wind:wind@193.124.203.110:3306/wind_towers')
def get_download_paths(date):
"""Создает список путей для загрузки данных."""
download_path = Path("~/downloads/hres_0.1").expanduser()
downloads = {}
var_nums = {
"2t": "167", "10u": "165", "10v": "166", "msl": "151", "t": "130",
"u": "131", "v": "132", "q": "133", "z": "129", "slt": "043", "lsm": "172",
}
for v in ["2t", "10u", "10v", "msl", "z", "slt", "lsm"]:
downloads[download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib")] = (
f"https://data.rda.ucar.edu/ds113.1/"
f"ec.oper.an.sfc/{date.year}{date.month:02d}/ec.oper.an.sfc.128_{var_nums[v]}_{v}."
f"regn1280sc.{date.year}{date.month:02d}{date.day:02d}.grb"
)
for v in ["z", "t", "u", "v", "q"]:
for hour in [0, 6, 12, 18]:
prefix = "uv" if v in {"u", "v"} else "sc"
downloads[download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_{hour:02d}.grib")] = (
f"https://data.rda.ucar.edu/ds113.1/"
f"ec.oper.an.pl/{date.year}{date.month:02d}/ec.oper.an.pl.128_{var_nums[v]}_{v}."
f"regn1280{prefix}.{date.year}{date.month:02d}{date.day:02d}{hour:02d}.grb"
)
return downloads, download_path
query = """
SELECT BarTrend, CRC, DateStamp, DewPoint, HeatIndex, ETDay, HumIn, HumOut,
Pressure, RainDay, RainMonth, RainRate, RainStorm, RainYear,
TempIn, TempOut, WindDir, WindSpeed, WindSpeed10Min
FROM weather_data
WHERE DateStamp >= '2024-10-14 21:00:00' - INTERVAL 36 HOUR;
"""
df = pd.read_sql(query, engine) # Загружаем данные из SQL-запроса в DataFrame
df['DateStamp'] = pd.to_datetime(df['DateStamp']) # Преобразуем столбец 'DateStamp' в формат datetime
df.set_index('DateStamp', inplace=True) # Устанавливаем 'DateStamp' как индекс
df.sort_index(inplace=True) # Сортируем DataFrame по индексу (по дате)
def download_data(downloads):
"""Скачивает файлы, если они отсутствуют в целевой директории."""
for target, source in downloads.items():
if not target.exists():
print(f"Downloading {source}")
target.parent.mkdir(parents=True, exist_ok=True)
response = requests.get(source)
response.raise_for_status()
with open(target, "wb") as f:
f.write(response.content)
print("Downloads finished!")
lags = 3 # Задаем количество временных сдвигов (лагов)
shifted_dfs = [df] # Создаем список для хранения исходного DataFrame и его лаговых версий
for lag in range(1, lags + 1):
shifted_df = df.shift(lag).add_suffix(f'_t-{lag}') # Создаем сдвинутый на lag строк DataFrame
shifted_dfs.append(shifted_df) # Добавляем его в список
def load_surf(v, v_in_file, download_path, date):
"""Загружает переменные поверхностного уровня или статические переменные."""
ds = xr.open_dataset(download_path / date.strftime(f"surf_{v}_%Y-%m-%d.grib"), engine="cfgrib")
data = ds[v_in_file].values[:2]
data = data[None]
return torch.from_numpy(data)
df_with_lags = pd.concat(shifted_dfs, axis=1) # Объединяем исходный DataFrame и все лаги по столбцам
df_with_lags.dropna(inplace=True) # Удаляем строки с пропущенными значениями
df_with_lags = df_with_lags.copy() # Создаем копию DataFrame (для предотвращения предупреждений)
def load_atmos(v, download_path, date, levels):
"""Загружает атмосферные переменные для заданных уровней давления."""
ds_00 = xr.open_dataset(
download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_00.grib"), engine="cfgrib"
)
ds_06 = xr.open_dataset(
download_path / date.strftime(f"atmos_{v}_%Y-%m-%d_06.grib"), engine="cfgrib"
)
ds_00 = ds_00[v].sel(isobaricInhPa=list(levels))
ds_06 = ds_06[v].sel(isobaricInhPa=list(levels))
data = np.stack((ds_00.values, ds_06.values), axis=0)
data = data[None]
return torch.from_numpy(data)
# Преобразуем столбец 'BarTrend' в числовой формат, используя кодировщик категорий
le = LabelEncoder()
df_with_lags['BarTrend_encoded'] = le.fit_transform(df_with_lags['BarTrend'])
# Оставляем в DataFrame только числовые столбцы
df_with_lags = df_with_lags.select_dtypes(include=['float64', 'int64'])
def create_batch(date, levels, downloads, download_path):
"""Создает объект Batch с данными для модели."""
ds = xr.open_dataset(next(iter(downloads.keys())), engine="cfgrib")
batch = Batch(
surf_vars={
"2t": load_surf("2t", "t2m", download_path, date),
"10u": load_surf("10u", "u10", download_path, date),
"10v": load_surf("10v", "v10", download_path, date),
"msl": load_surf("msl", "msl", download_path, date),
},
static_vars={
"z": load_surf("z", "z", download_path, date)[0, 0],
"slt": load_surf("slt", "slt", download_path, date)[0, 0],
"lsm": load_surf("lsm", "lsm", download_path, date)[0, 0],
},
atmos_vars={
"t": load_atmos("t", download_path, date, levels),
"u": load_atmos("u", download_path, date, levels),
"v": load_atmos("v", download_path, date, levels),
"q": load_atmos("q", download_path, date, levels),
"z": load_atmos("z", download_path, date, levels),
},
metadata=Metadata(
lat=torch.from_numpy(ds.latitude.values),
lon=torch.from_numpy(ds.longitude.values),
time=(date.replace(hour=6),),
atmos_levels=levels,
),
)
return batch.regrid(res=0.1)
# Создаем словари для хранения моделей и значений MSE
models = {}
mse_scores = {}
# Обучаем модели для каждого целевого столбца
for target_column in df.columns:
if target_column not in df_with_lags.columns: # Пропускаем, если столбец отсутствует в df_with_lags
continue
def create_batch_random(levels: tuple[int], date: datetime):
"""Создает объект Batch с рандомными данными для модели."""
return Batch(
surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")},
static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")},
atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")},
metadata=Metadata(
lat=torch.linspace(90, -90, 17),
lon=torch.linspace(0, 360, 32 + 1)[:-1],
time=(date,),
atmos_levels=levels,
),
)
X = df_with_lags.drop(columns=[target_column]).values # Признаки - все столбцы, кроме целевого
y = df_with_lags[target_column].values # Целевой столбец
# Разделяем данные на обучающую и тестовую выборки без перемешивания (временной ряд)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
def run_model(batch):
"""Инициализирует модель AuroraSmall и выполняет предсказание."""
model = AuroraSmall()
model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt")
model.eval()
model = model.to("cpu")
with torch.inference_mode():
prediction = model.forward(batch)
return prediction
model = RandomForestRegressor() # Инициализируем модель случайного леса
model.fit(X_train, y_train) # Обучаем модель
y_pred = model.predict(X_test) # Делаем предсказания на тестовой выборке
mse = mean_squared_error(y_test, y_pred) # Вычисляем среднеквадратичную ошибку
mse_scores[target_column] = mse # Сохраняем MSE для целевого столбца
models[target_column] = model # Сохраняем модель для целевого столбца
def get_wind_speed_and_direction(prediction, batch: Batch, lat: float, lon: float):
target_lat = lat
target_lon = lon
quality = "хорошая" if mse < 1.0 else "плохая" # Определяем качество модели
print(f"MSE для {target_column}: {mse} ({quality})") # Выводим MSE и качество
lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin()
lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin()
# Обучаем модель для столбца 'BarTrend_encoded' отдельно
X_bartrend = df_with_lags.drop(columns=['BarTrend_encoded']).values # Признаки
y_bartrend = df_with_lags['BarTrend_encoded'].values # Целевой столбец 'BarTrend_encoded'
u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx]
v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx]
# Разделяем данные на обучающую и тестовую выборки без перемешивания
X_train_bartrend, X_test_bartrend, y_train_bartrend, y_test_bartrend = train_test_split(X_bartrend, y_bartrend,
test_size=0.2,
shuffle=False)
u_scalar = u_values.item()
v_scalar = v_values.item()
model_bartrend = RandomForestRegressor() # Инициализируем модель случайного леса
model_bartrend.fit(X_train_bartrend, y_train_bartrend) # Обучаем модель
print("u value:", u_scalar)
print("v value:", v_scalar)
u_with_units = u_scalar * units("m/s")
v_with_units = v_scalar * units("m/s")
y_pred_bartrend = model_bartrend.predict(X_test_bartrend) # Предсказания на тестовой выборке для 'BarTrend_encoded'
mse_bartrend = mean_squared_error(y_test_bartrend, y_pred_bartrend) # Вычисляем MSE
models['BarTrend_encoded'] = model_bartrend # Сохраняем модель для 'BarTrend_encoded'
mse_scores['BarTrend_encoded'] = mse_bartrend # Сохраняем MSE для 'BarTrend_encoded'
# Рассчитайте направление и скорость ветра
wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units)
wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units)
quality_bartrend = "хорошая" if mse_bartrend < 1.0 else "плохая" # Определяем качество модели для 'BarTrend_encoded'
print(f"MSE для BarTrend: {mse_bartrend} ({quality_bartrend})") # Выводим MSE и качество
wind_dir_text = wind_direction_to_text(wind_dir.magnitude)
# Вывод результата
print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)")
print(f"Скорость ветра: {wind_speed:.2f} м/с")
return wind_dir.magnitude.item(), wind_speed.magnitude.item()
last_data = X[-1].reshape(1, -1) # Берем последнюю строку данных и преобразуем в формат для предсказания
predictions = {} # Создаем словарь для хранения предсказаний
for target_column, model in models.items():
prediction = model.predict(last_data)[0] # Делаем предсказание для последней строки данных
if target_column == 'BarTrend_encoded':
prediction = le.inverse_transform([int(prediction)])[0] # Декодируем категориальное значение
predictions['BarTrend'] = prediction # Сохраняем предсказание для 'BarTrend'
#print(f"Предсказание для BarTrend: {prediction}") # Выводим предсказание
continue # Продолжаем цикл после предсказания для 'BarTrend_encoded'
predictions[target_column] = prediction # Сохраняем предсказание для остальных столбцов
#print(f"Предсказание для {target_column}: {prediction}") # Выводим предсказание для столбца
def wind_direction_to_text(wind_dir_deg):
directions = [
"север", "северо-восток", "восток", "юго-восток",
"юг", "юго-запад", "запад", "северо-запад"
]
idx = int((wind_dir_deg + 22.5) // 45) % 8
return directions[idx]
return predictions # Возвращаем словарь с предсказанными значениями и названиями столбцов
def main():
levels = (100,)
date = datetime(2024, 11, 5, 12)
# downloads, download_path = get_download_paths(date)
# download_data(downloads) # Скачиваем данные, если их нет
# batch_actual = create_batch(date, levels, downloads, download_path)
batch_actual = create_batch_random(levels, date)
prediction_actual = run_model(batch_actual)
wind_speed_and_direction = get_wind_speed_and_direction(prediction_actual, batch_actual, 50, 20)
return wind_speed_and_direction
if __name__ == "__main__":
main()
print("Prediction completed!")