PIbd-42_SSPR/clickhouse_tools.py

109 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import pandas as pd
import clickhouse_connect
class ClickHouseClient:
    """
    Small helper around a ``clickhouse_connect`` client for the
    ``experiment_data`` table.

    On construction it connects to the server and creates the table if it
    does not exist yet.
    """

    # Maps CSV header names to ``experiment_data`` column names. The value
    # order matches the column order of the table definition.
    _CSV_TO_TABLE_COLUMNS = {
        "Volume (m^3)": "volume",
        "Mass Fraction of Nitrogen Oxide Emission": "nitrogen_oxide_emission",
        "Temperature (K)": "temperature",
        "Mass Fraction of CO": "co_fraction",
        "Mass Fraction of CO2": "co2_fraction",
        "X (m)": "x",
        "Y (m)": "y",
        "Z (m)": "z",
    }

    # Column labels of the DataFrame returned by ``get_data``; the order
    # mirrors the SELECT list of the aggregation query.
    _RESULT_COLUMNS = [
        "file_id", "max_temperature", "max_nox", "max_co", "max_co2",
        "weighted_avg_temperature", "weighted_avg_nox", "weighted_avg_co", "weighted_avg_co2",
        "x_range_high_temp", "y_range_high_temp", "z_range_high_temp", "flame_volume",
    ]

    def __init__(self, host='localhost', port=9000, database="default", username="", password=""):
        """
        Store the connection settings, connect and make sure the table exists.

        :param host: ClickHouse server host.
        :param port: ClickHouse server port.
        :param database: Database to use.
        :param username: User name for authentication.
        :param password: Password for authentication.
        """
        # NOTE(review): clickhouse_connect talks to the HTTP interface, which
        # conventionally listens on 8123 (9000 is usually the native protocol
        # port) - confirm that 9000 is the intended default for this deployment.
        self.host = host
        self.port = port
        self.database = database
        self.username = username
        self.password = password
        self.client = self.connect_clickhouse()
        self.initialize_table()

    def connect_clickhouse(self):
        """
        Create a connection to the ClickHouse database.

        :return: Client object returned by ``clickhouse_connect.get_client``.
        """
        return clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            database=self.database,
            username=self.username,
            password=self.password,
        )

    def initialize_table(self):
        """
        Create the ``experiment_data`` table in ClickHouse if it does not exist.
        """
        create_table_query = """
        CREATE TABLE IF NOT EXISTS experiment_data (
            volume Float64,
            nitrogen_oxide_emission Float64,
            temperature Float64,
            co_fraction Float64,
            co2_fraction Float64,
            x Float64,
            y Float64,
            z Float64,
            file_id String
        ) ENGINE = MergeTree()
        ORDER BY file_id
        """
        self.client.command(create_table_query)

    def get_data(self, flame_temperature=1150):
        """
        Fetch per-file maxima and volume-weighted averages from ClickHouse.

        Rows whose temperature exceeds ``flame_temperature`` additionally
        contribute to the ``max - min`` extent along x, y and z and to the
        summed ``flame_volume``.

        :param flame_temperature: Temperature threshold for the high-temperature
            aggregates. Defaults to 1150, the value previously hard-coded in
            the query.
        :return: DataFrame with one row per ``file_id``.
        :raises TypeError, ValueError: If ``flame_temperature`` is not numeric.
        """
        # Coerce to float so that only a numeric literal can ever be
        # substituted into the SQL text.
        threshold = float(flame_temperature)
        query = """
        SELECT
            file_id,
            MAX(temperature) AS max_temperature,
            MAX(nitrogen_oxide_emission) AS max_nox,
            MAX(co_fraction) AS max_co,
            MAX(co2_fraction) AS max_co2,
            SUM(volume * temperature) / SUM(volume) AS weighted_avg_temperature,
            SUM(volume * nitrogen_oxide_emission) / SUM(volume) AS weighted_avg_nox,
            SUM(volume * co_fraction) / SUM(volume) AS weighted_avg_co,
            SUM(volume * co2_fraction) / SUM(volume) AS weighted_avg_co2,
            MAX(if(temperature > %(flame_temperature)s, x, NULL)) - MIN(if(temperature > %(flame_temperature)s, x, NULL)) AS x_range_high_temp,
            MAX(if(temperature > %(flame_temperature)s, y, NULL)) - MIN(if(temperature > %(flame_temperature)s, y, NULL)) AS y_range_high_temp,
            MAX(if(temperature > %(flame_temperature)s, z, NULL)) - MIN(if(temperature > %(flame_temperature)s, z, NULL)) AS z_range_high_temp,
            SUM(if(temperature > %(flame_temperature)s, volume, NULL)) AS flame_volume
        FROM
            experiment_data
        GROUP BY
            file_id
        """
        results = self.client.query(query, parameters={'flame_temperature': threshold})
        return pd.DataFrame(results.result_rows, columns=self._RESULT_COLUMNS)

    @classmethod
    def _prepare_frame(cls, raw, file_id):
        """
        Turn a raw CSV DataFrame into rows shaped like ``experiment_data``.

        :param raw: DataFrame as read from the CSV file.
        :param file_id: Identifier stored in the ``file_id`` column.
        :return: New DataFrame holding exactly the table columns.
        :raises KeyError: If a required column is absent from the CSV.
        """
        frame = raw.rename(columns=cls._CSV_TO_TABLE_COLUMNS)
        measured = list(cls._CSV_TO_TABLE_COLUMNS.values())
        missing = [name for name in measured if name not in frame.columns]
        if missing:
            raise KeyError(f"CSV is missing required columns: {missing}")
        # Keep only the table columns so unrelated CSV columns cannot make
        # the insert fail.
        frame = frame[measured].copy()
        # x is mirrored onto the non-negative side before shifting.
        frame["x"] = frame["x"].abs()
        # Shift every axis so that its smallest coordinate becomes 0.
        for axis in ("x", "y", "z"):
            frame[axis] = frame[axis] - frame[axis].min()
        frame["file_id"] = file_id
        return frame

    def save_csv_to_clickhouse(self, csv_path, file_id):
        """
        Load the data of a CSV file into ClickHouse, replacing any rows
        previously stored for the same ``file_id``.

        :param csv_path: Path to the ``;``-delimited CSV file.
        :param file_id: Identifier of the file.
        :raises KeyError: If the CSV lacks one of the required columns; in
            that case nothing is deleted or inserted.
        """
        raw = pd.read_csv(csv_path, delimiter=';', decimal='.')
        # Validate and reshape before touching the database, so that a
        # malformed CSV cannot wipe the previously stored rows.
        frame = self._prepare_frame(raw, file_id)
        delete_query = "ALTER TABLE experiment_data DELETE WHERE file_id = %(file_id)s"
        # ALTER ... DELETE is an asynchronous mutation by default; wait for
        # it so readers never see the old and the new rows side by side.
        self.client.command(
            delete_query,
            parameters={'file_id': file_id},
            settings={'mutations_sync': 1},
        )
        self.client.insert_df('experiment_data', frame)