PIbd-42_SSPR/clickhouse_tools.py

109 lines
4.5 KiB
Python
Raw Normal View History

import time
import pandas as pd
import clickhouse_connect
class ClickHouseClient:
    """Wrapper around a ``clickhouse_connect`` client for experiment data.

    Creating an instance opens a client connection and ensures the
    ``experiment_data`` table exists. The instance can then load CSV exports
    into that table (``save_csv_to_clickhouse``) and return per-file
    aggregate statistics (``get_data``).
    """

    # Table that every method of this class reads from or writes to.
    TABLE_NAME = "experiment_data"

    # CSV header -> table column. The values, in this order, are the numeric
    # columns of the table created by initialize_table(); ``file_id`` is
    # added separately.
    COLUMN_RENAMES = {
        "Volume (m^3)": "volume",
        "Mass Fraction of Nitrogen Oxide Emission": "nitrogen_oxide_emission",
        "Temperature (K)": "temperature",
        "Mass Fraction of CO": "co_fraction",
        "Mass Fraction of CO2": "co2_fraction",
        "X (m)": "x",
        "Y (m)": "y",
        "Z (m)": "z",
    }

    # Column order of the DataFrame returned by get_data(); matches the
    # aliases of its SELECT list one-to-one.
    RESULT_COLUMNS = (
        "file_id", "max_temperature", "max_nox", "max_co", "max_co2",
        "weighted_avg_temperature", "weighted_avg_nox", "weighted_avg_co", "weighted_avg_co2",
        "x_range_high_temp", "y_range_high_temp", "z_range_high_temp", "flame_volume",
    )

    def __init__(self, host='localhost', port=9000, database="default", username="", password=""):
        """Connect to ClickHouse and make sure the data table exists.

        :param host: ClickHouse server host name.
        :param port: Server port passed to ``clickhouse_connect.get_client``.
        :param database: Database to connect to.
        :param username: User name for authentication.
        :param password: Password for authentication.
        """
        # NOTE(review): clickhouse_connect talks to ClickHouse's HTTP
        # interface (default 8123, or 8443 for HTTPS); 9000 is normally the
        # native TCP port. The default is kept for backward compatibility —
        # confirm the deployment serves HTTP on the port passed here.
        self.host = host
        self.port = port
        self.database = database
        self.username = username
        self.password = password
        self.client = self.connect_clickhouse()
        self.initialize_table()

    def connect_clickhouse(self):
        """Create and return a ClickHouse client from the stored connection settings."""
        return clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            database=self.database,
            username=self.username,
            password=self.password,
        )

    def initialize_table(self):
        """Create the experiment data table in ClickHouse if it does not exist yet."""
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
            volume Float64,
            nitrogen_oxide_emission Float64,
            temperature Float64,
            co_fraction Float64,
            co2_fraction Float64,
            x Float64,
            y Float64,
            z Float64,
            file_id String
        ) ENGINE = MergeTree()
        ORDER BY file_id
        """
        self.client.command(create_table_query)

    def get_data(self, temperature_threshold=1150):
        """Return per-file maxima, volume-weighted averages and high-temperature metrics.

        For each ``file_id`` the result contains:

        * ``max_*``: maxima of temperature, NOx, CO and CO2 fractions;
        * ``weighted_avg_*``: averages of the same quantities weighted by
          ``volume`` (``SUM(volume * q) / SUM(volume)``);
        * ``x/y/z_range_high_temp``: ``max - min`` of each coordinate over
          the rows whose temperature exceeds ``temperature_threshold``;
        * ``flame_volume``: total ``volume`` of those same rows.

        The high-temperature columns are NULL for a file with no row above
        the threshold, because the ``if(..., NULL)`` values are skipped by
        the aggregates.

        :param temperature_threshold: Temperature above which a row counts as
            "high temperature" (defaults to the previously hard-coded 1150).
        :return: DataFrame with the columns listed in ``RESULT_COLUMNS``,
            one row per ``file_id``.
        """
        # The threshold is bound as a query parameter instead of being pasted
        # into the SQL text; float() rejects non-numeric input early.
        threshold = float(temperature_threshold)
        query = f"""
        SELECT
            file_id,
            MAX(temperature) AS max_temperature,
            MAX(nitrogen_oxide_emission) AS max_nox,
            MAX(co_fraction) AS max_co,
            MAX(co2_fraction) AS max_co2,
            SUM(volume * temperature) / SUM(volume) AS weighted_avg_temperature,
            SUM(volume * nitrogen_oxide_emission) / SUM(volume) AS weighted_avg_nox,
            SUM(volume * co_fraction) / SUM(volume) AS weighted_avg_co,
            SUM(volume * co2_fraction) / SUM(volume) AS weighted_avg_co2,
            MAX(if(temperature > %(threshold)s, x, NULL)) - MIN(if(temperature > %(threshold)s, x, NULL)) AS x_range_high_temp,
            MAX(if(temperature > %(threshold)s, y, NULL)) - MIN(if(temperature > %(threshold)s, y, NULL)) AS y_range_high_temp,
            MAX(if(temperature > %(threshold)s, z, NULL)) - MIN(if(temperature > %(threshold)s, z, NULL)) AS z_range_high_temp,
            SUM(if(temperature > %(threshold)s, volume, NULL)) AS flame_volume
        FROM
            {self.TABLE_NAME}
        GROUP BY
            file_id
        """
        results = self.client.query(query, parameters={"threshold": threshold})
        return pd.DataFrame(results.result_rows, columns=list(self.RESULT_COLUMNS))

    def save_csv_to_clickhouse(self, csv_path, file_id):
        """Load a semicolon-separated CSV export into ClickHouse under ``file_id``.

        Columns are renamed according to ``COLUMN_RENAMES``. The coordinates
        are shifted so that each of x, y, z starts at 0, with x first mirrored
        to absolute values. Existing rows for the same ``file_id`` are then
        replaced.

        :param csv_path: Path to the CSV file (anything ``pandas.read_csv``
            accepts).
        :param file_id: File identifier; stored as a string because the
            ``file_id`` column is ``String``.
        :raises KeyError: if the CSV lacks one of the expected columns.
        """
        df = pd.read_csv(csv_path, delimiter=';', decimal='.')
        df = df.rename(columns=self.COLUMN_RENAMES)
        # Keep exactly the schema columns: an extra CSV column would make the
        # insert fail, and a missing one should fail loudly here (KeyError)
        # rather than be silently filled with defaults by the server.
        df = df[list(self.COLUMN_RENAMES.values())].copy()

        # x is mirrored to non-negative values before shifting. The intent is
        # not visible here — presumably the geometry is symmetric about
        # x = 0; confirm.
        df['x'] = df['x'].abs()
        for axis in ('x', 'y', 'z'):
            df[axis] = df[axis] - df[axis].min()

        file_id = str(file_id)
        df["file_id"] = file_id

        # ALTER ... DELETE is an asynchronous mutation by default. Waiting for
        # it keeps readers from seeing old and new rows of the same file at
        # once. Delete + insert is still not atomic: if the insert fails, the
        # previous data for this file is already gone.
        self.client.command(
            f"ALTER TABLE {self.TABLE_NAME} DELETE WHERE file_id = %(file_id)s",
            parameters={'file_id': file_id},
            settings={'mutations_sync': 1},
        )
        self.client.insert_df(self.TABLE_NAME, df)