From 3ffc4467bbd1834e86659434160e7593d1d99a15 Mon Sep 17 00:00:00 2001 From: Pavel_Sorokin Date: Tue, 26 Nov 2024 23:17:03 +0400 Subject: [PATCH] metPy function with example Aurora --- davisAPI/davisAPI.py | 298 ++++++++++++++++++++++++++----------------- 1 file changed, 181 insertions(+), 117 deletions(-) diff --git a/davisAPI/davisAPI.py b/davisAPI/davisAPI.py index a5742b1..1703d8e 100644 --- a/davisAPI/davisAPI.py +++ b/davisAPI/davisAPI.py @@ -1,126 +1,190 @@ -import gc -import logging -import time -from datetime import datetime, timedelta -from pprint import pprint -import mariadb -import serial.tools.list_ports - -#from PyWeather.weather.stations.davis import VantagePro -from prediction import run_prediction_module - -logging.basicConfig(filename="Stations.log", - format='%(asctime)s %(message)s', - filemode='a') -logger = logging.getLogger('davis_api') -logger.setLevel(logging.DEBUG) - -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.DEBUG) -console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) -logger.addHandler(console_handler) +# import gc +# import logging +# import time +# from datetime import datetime, timedelta +# from pprint import pprint +# import mariadb +# import serial.tools.list_ports +# +# #from PyWeather.weather.stations.davis import VantagePro +# from prediction import run_prediction_module +# +# logging.basicConfig(filename="Stations.log", +# format='%(asctime)s %(message)s', +# filemode='a') +# logger = logging.getLogger('davis_api') +# logger.setLevel(logging.DEBUG) +# +# console_handler = logging.StreamHandler() +# console_handler.setLevel(logging.DEBUG) +# console_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s')) +# logger.addHandler(console_handler) +# +# +# def write_data(device, station, send=True): +# try: +# # device.parse() +# data = device.fields +# logger.info(data) +# if len(data) < 1: +# return +# else: +# logger.info(data) +# fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', +# 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', +# 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', +# 'WindSpeed10Min'] +# +# if send: +# placeholders = ', '.join(['%s'] * len(fields)) +# field_names = ', '.join(fields) +# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" +# values = [data[field] for field in fields] +# cursor.execute(sql, values) +# conn.commit() +# else: +# logger.info(data) +# +# del data +# del fields +# gc.collect() +# except Exception as e: +# logger.error(str(e)) +# raise e +# +# +# def get_previous_values(cursor): +# cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") +# result = cursor.fetchone() +# +# if result is None: +# return None, None, None, None +# +# sun_rise, sun_set, wind_dir, datestamp = result +# return sun_rise, sun_set, wind_dir, datestamp +# +# +# def save_prediction_to_db(predictions): +# try: +# +# sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) +# +# fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) +# placeholders = ', '.join(['%s'] * len(fields)) +# field_names = ', '.join(fields) +# +# values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) +# pprint(dict(zip(fields, values))) +# sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" +# # cursor.execute(sql, values) +# # conn.commit() +# logger.info("Save prediction to db success!") +# except Exception as e: +# logger.error(str(e)) +# raise e +# +# +# try: +# conn = mariadb.connect( +# user="wind", +# password="wind", +# host="193.124.203.110", +# port=3306, +# database="wind_towers" +# ) +# cursor = conn.cursor() +# except mariadb.Error as e: +# logger.error('DB_ERR: ' + str(e)) +# raise e +# while True: +# try: +# ports = serial.tools.list_ports.comports() +# available_ports = {} +# +# for port in ports: +# if port.serial_number == '0001': +# available_ports[port.name] = port.vid +# +# devices = [VantagePro(port) for port in available_ports.keys()] +# while True: +# for i in range(1): +# if len(devices) != 0: +# logger.info(devices) +# # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) +# else: +# raise Exception('Can`t connect to device') +# time.sleep(60) +# except Exception as e: +# logger.error('Device_error' + str(e)) +# predictions = run_prediction_module() +# #logger.info(predictions) +# if predictions is not None: +# save_prediction_to_db(predictions) +# time.sleep(60) -def write_data(device, station, send=True): - try: - # device.parse() - data = device.fields - logger.info(data) - if len(data) < 1: - return - else: - logger.info(data) - fields = ['BarTrend', 'CRC', 'DateStamp', 'DewPoint', 'HeatIndex', 'ETDay', 'HeatIndex', - 'HumIn', 'HumOut', 'Pressure', 'RainDay', 'RainMonth', 'RainRate', 'RainStorm', - 'RainYear', 'SunRise', 'SunSet', 'TempIn', 'TempOut', 'WindDir', 'WindSpeed', - 'WindSpeed10Min'] +#todo переписать под influx, для линухи приколы сделать +import metpy.calc +from datetime import datetime +import torch +from aurora import AuroraSmall, Batch, Metadata +from metpy.units import units - if send: - placeholders = ', '.join(['%s'] * len(fields)) - field_names = ', '.join(fields) - sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" - values = [data[field] for field in fields] - cursor.execute(sql, values) - conn.commit() - else: - logger.info(data) +def get_wind_speed_and_direction(lat:float,lon:float): + model = AuroraSmall() + model.load_checkpoint("microsoft/aurora", "aurora-0.25-small-pretrained.ckpt") - del data - del fields - gc.collect() - except Exception as e: - logger.error(str(e)) - raise e - - -def get_previous_values(cursor): - cursor.execute("SELECT SunRise, SunSet, WindDir, DateStamp FROM weather_data ORDER BY DateStamp DESC LIMIT 1") - result = cursor.fetchone() - - if result is None: - return None, None, None, None - - sun_rise, sun_set, wind_dir, datestamp = result - return sun_rise, sun_set, wind_dir, datestamp - - -def save_prediction_to_db(predictions): - try: - - sun_rise, sun_set, wind_dir, datestamp = get_previous_values(cursor) - - fields = ['DateStamp', 'SunRise', 'SunSet', 'WindDir'] + list(predictions.keys()) - placeholders = ', '.join(['%s'] * len(fields)) - field_names = ', '.join(fields) - - values = [datestamp + timedelta(minutes = 1), sun_rise, sun_set, wind_dir] + list(predictions.values()) - pprint(dict(zip(fields, values))) - sql = f"INSERT INTO weather_data ({field_names}) VALUES ({placeholders})" - # cursor.execute(sql, values) - # conn.commit() - logger.info("Save prediction to db success!") - except Exception as e: - logger.error(str(e)) - raise e - - -try: - conn = mariadb.connect( - user="wind", - password="wind", - host="193.124.203.110", - port=3306, - database="wind_towers" + batch = Batch( + surf_vars={k: torch.randn(1, 2, 17, 32) for k in ("2t", "10u", "10v", "msl")}, + static_vars={k: torch.randn(17, 32) for k in ("lsm", "z", "slt")}, + atmos_vars={k: torch.randn(1, 2, 4, 17, 32) for k in ("z", "u", "v", "t", "q")}, + metadata=Metadata( + lat=torch.linspace(90, -90, 17), + lon=torch.linspace(0, 360, 32 + 1)[:-1], + time=(datetime(2024, 11, 26, 23, 7),), + atmos_levels=(100,), + ), ) - cursor = conn.cursor() -except mariadb.Error as e: - logger.error('DB_ERR: ' + str(e)) - raise e -while True: - try: - ports = serial.tools.list_ports.comports() - available_ports = {} + prediction = model.forward(batch) - for port in ports: - if port.serial_number == '0001': - available_ports[port.name] = port.vid + target_lat = lat + target_lon = lon - devices = [VantagePro(port) for port in available_ports.keys()] - while True: - for i in range(1): - if len(devices) != 0: - logger.info(devices) - # write_data(devices[i], 'st' + str(available_ports[list(available_ports.keys())[i]]), True) - else: - raise Exception('Can`t connect to device') - time.sleep(60) - except Exception as e: - logger.error('Device_error' + str(e)) - predictions = run_prediction_module() - #logger.info(predictions) - if predictions is not None: - save_prediction_to_db(predictions) - time.sleep(60) + lat_idx = torch.abs(batch.metadata.lat - target_lat).argmin() + lon_idx = torch.abs(batch.metadata.lon - target_lon).argmin() + + u_values = prediction.atmos_vars["u"][:, :, :, lat_idx, lon_idx] + v_values = prediction.atmos_vars["v"][:, :, :, lat_idx, lon_idx] + + print("u values at target location:", u_values) + print("v values at target location:", v_values) + + u_scalar = u_values.item() + v_scalar = v_values.item() + + print("u value:", u_scalar) + print("v value:", v_scalar) + u_with_units = u_scalar * units("m/s") + v_with_units = v_scalar * units("m/s") + + # Рассчитайте направление и скорость ветра + wind_dir = metpy.calc.wind_direction(u_with_units, v_with_units) + wind_speed = metpy.calc.wind_speed(u_with_units, v_with_units) + + wind_dir_text = wind_direction_to_text(wind_dir.magnitude) + print(type(wind_dir)) + # Вывод результата + print(f"Направление ветра: {wind_dir_text} ({wind_dir:.2f}°)") + print(f"Скорость ветра: {wind_speed:.2f} м/с") + return wind_dir.magnitude.item(),wind_speed.magnitude.item() -#todo переписать под influx, для линухи приколы сделать \ No newline at end of file +def wind_direction_to_text(wind_dir_deg): + directions = [ + "север", "северо-восток", "восток", "юго-восток", + "юг", "юго-запад", "запад", "северо-запад" + ] + idx = int((wind_dir_deg + 22.5) // 45) % 8 + return directions[idx] + +print(get_wind_speed_and_direction(50,20)) \ No newline at end of file