109 lines
4.5 KiB
Python
109 lines
4.5 KiB
Python
|
import time
|
|||
|
import pandas as pd
|
|||
|
import clickhouse_connect
|
|||
|
|
|||
|
class ClickHouseClient:
    """Small wrapper around a ``clickhouse_connect`` client for ``experiment_data``.

    On construction the wrapper connects to the server and makes sure the
    ``experiment_data`` table exists.  It can then load per-file CSV exports
    into that table (:meth:`save_csv_to_clickhouse`) and read back per-file
    aggregates (:meth:`get_data`).
    """

    # CSV header -> table column.  The values, in this order, are also the
    # complete list of CSV-derived columns of ``experiment_data``.
    _CSV_TO_TABLE_COLUMNS = {
        "Volume (m^3)": "volume",
        "Mass Fraction of Nitrogen Oxide Emission": "nitrogen_oxide_emission",
        "Temperature (K)": "temperature",
        "Mass Fraction of CO": "co_fraction",
        "Mass Fraction of CO2": "co2_fraction",
        "X (m)": "x",
        "Y (m)": "y",
        "Z (m)": "z",
    }

    # Column labels of the frame returned by ``get_data``; they mirror the
    # aliases of the SELECT list in the same order.
    _RESULT_COLUMNS = [
        "file_id", "max_temperature", "max_nox", "max_co", "max_co2",
        "weighted_avg_temperature", "weighted_avg_nox", "weighted_avg_co", "weighted_avg_co2",
        "x_range_high_temp", "y_range_high_temp", "z_range_high_temp", "flame_volume",
    ]

    def __init__(self, host='localhost', port=9000, database="default", username="", password=""):
        """Store the connection settings, connect, and create the table if needed.

        :param host: ClickHouse server host name.
        :param port: Server port handed to ``clickhouse_connect.get_client``.
            NOTE(review): ``clickhouse_connect`` talks to the server over
            HTTP(S), while 9000 is conventionally the native-protocol port --
            confirm that the target server really serves HTTP on this port.
        :param database: Database to use.
        :param username: User name for authentication.
        :param password: Password for authentication.
        """
        self.host = host
        self.port = port
        self.database = database
        self.username = username
        self.password = password
        self.client = self.connect_clickhouse()
        self.initialize_table()

    def connect_clickhouse(self):
        """Create and return a ``clickhouse_connect`` client from the stored settings."""
        return clickhouse_connect.get_client(
            host=self.host,
            port=self.port,
            database=self.database,
            username=self.username,
            password=self.password,
        )

    def initialize_table(self) -> None:
        """Create the ``experiment_data`` table if it does not exist yet."""
        create_table_query = """
            CREATE TABLE IF NOT EXISTS experiment_data (
                volume Float64,
                nitrogen_oxide_emission Float64,
                temperature Float64,
                co_fraction Float64,
                co2_fraction Float64,
                x Float64,
                y Float64,
                z Float64,
                file_id String
            ) ENGINE = MergeTree()
            ORDER BY file_id
        """
        self.client.command(create_table_query)

    def get_data(self) -> pd.DataFrame:
        """Return per-file aggregates computed by ClickHouse.

        For every ``file_id`` the query yields:

        * the maxima of temperature, NOx emission, CO fraction and CO2 fraction;
        * the volume-weighted averages of the same four quantities;
        * ``max - min`` of ``x``, ``y`` and ``z`` taken only over rows with
          ``temperature > 1150``;
        * ``flame_volume``: the sum of ``volume`` over rows with
          ``temperature > 1150``.

        :return: DataFrame with one row per ``file_id`` and the columns listed
            in ``_RESULT_COLUMNS``.
        """
        query = """
            SELECT
                file_id,
                MAX(temperature) AS max_temperature,
                MAX(nitrogen_oxide_emission) AS max_nox,
                MAX(co_fraction) AS max_co,
                MAX(co2_fraction) AS max_co2,
                SUM(volume * temperature) / SUM(volume) AS weighted_avg_temperature,
                SUM(volume * nitrogen_oxide_emission) / SUM(volume) AS weighted_avg_nox,
                SUM(volume * co_fraction) / SUM(volume) AS weighted_avg_co,
                SUM(volume * co2_fraction) / SUM(volume) AS weighted_avg_co2,
                MAX(if(temperature > 1150, x, NULL)) - MIN(if(temperature > 1150, x, NULL)) AS x_range_high_temp,
                MAX(if(temperature > 1150, y, NULL)) - MIN(if(temperature > 1150, y, NULL)) AS y_range_high_temp,
                MAX(if(temperature > 1150, z, NULL)) - MIN(if(temperature > 1150, z, NULL)) AS z_range_high_temp,
                SUM(if(temperature > 1150, volume, NULL)) AS flame_volume
            FROM
                experiment_data
            GROUP BY
                file_id
        """
        results = self.client.query(query)
        return pd.DataFrame(results.result_rows, columns=list(self._RESULT_COLUMNS))

    @classmethod
    def _prepare_dataframe(cls, df: pd.DataFrame, file_id) -> pd.DataFrame:
        """Turn a raw CSV frame into a frame ready for ``experiment_data``.

        The input frame is not modified.  CSV headers are renamed to table
        column names, only the table columns are kept, ``x`` is replaced by
        its absolute value, and ``x``/``y``/``z`` are shifted so that each
        starts at 0.  A constant ``file_id`` column is appended.

        :param df: Frame as read from the CSV file.
        :param file_id: Identifier written into the ``file_id`` column.
        :return: New frame with the table columns followed by ``file_id``.
        :raises KeyError: If any required column is absent after renaming.
        """
        renamed = df.rename(columns=cls._CSV_TO_TABLE_COLUMNS)
        required = list(cls._CSV_TO_TABLE_COLUMNS.values())

        # Fail early, before anything is deleted from the table.
        missing = [name for name in required if name not in renamed.columns]
        if missing:
            raise KeyError(f"CSV is missing required column(s): {', '.join(missing)}")

        # The table has no place for any other column, so drop extras here
        # instead of letting the insert fail after old rows were deleted.
        prepared = renamed[required].copy()

        prepared["x"] = prepared["x"].abs()
        for axis in ("x", "y", "z"):
            prepared[axis] = prepared[axis] - prepared[axis].min()

        prepared["file_id"] = file_id
        return prepared

    def save_csv_to_clickhouse(self, csv_path, file_id) -> None:
        """Load a ``;``-separated CSV file into ``experiment_data``.

        Rows previously stored under the same ``file_id`` are deleted first,
        so re-uploading a file replaces its data.  The CSV is read and
        validated before the delete is issued.

        :param csv_path: Path to the CSV file.
        :param file_id: Identifier of the file; stored in the ``file_id`` column.
        :raises KeyError: If the CSV lacks one of the expected columns.
        """
        raw = pd.read_csv(csv_path, delimiter=';', decimal='.')
        prepared = self._prepare_dataframe(raw, file_id)

        # ALTER ... DELETE is a mutation, which ClickHouse runs asynchronously
        # by default.  mutations_sync=1 makes the call wait until the mutation
        # has finished on this server, so a read issued right after the upload
        # cannot see the old and the new rows of this file at the same time.
        delete_query = "ALTER TABLE experiment_data DELETE WHERE file_id = %(file_id)s"
        self.client.command(
            delete_query,
            parameters={'file_id': file_id},
            settings={'mutations_sync': 1},
        )

        self.client.insert_df('experiment_data', prepared)