Compare commits
No commits in common. "feature/add-video-messages-recognition" and "main" have entirely different histories.
feature/ad
...
main
7
.gitignore
vendored
7
.gitignore
vendored
@ -129,9 +129,6 @@ ENV/
|
|||||||
env.bak/
|
env.bak/
|
||||||
venv.bak/
|
venv.bak/
|
||||||
|
|
||||||
# Configuration
|
|
||||||
config.py
|
|
||||||
|
|
||||||
# Spyder project settings
|
# Spyder project settings
|
||||||
.spyderproject
|
.spyderproject
|
||||||
.spyproject
|
.spyproject
|
||||||
@ -164,7 +161,6 @@ cython_debug/
|
|||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
# ---> VisualStudioCode
|
# ---> VisualStudioCode
|
||||||
.vscode
|
|
||||||
.vscode/*
|
.vscode/*
|
||||||
!.vscode/settings.json
|
!.vscode/settings.json
|
||||||
!.vscode/tasks.json
|
!.vscode/tasks.json
|
||||||
@ -178,6 +174,3 @@ cython_debug/
|
|||||||
# Built Visual Studio Code Extensions
|
# Built Visual Studio Code Extensions
|
||||||
*.vsix
|
*.vsix
|
||||||
|
|
||||||
# Session info
|
|
||||||
*.session
|
|
||||||
*.session-journal
|
|
33
main.py
33
main.py
@ -1,33 +0,0 @@
|
|||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
from src.bot.telegram_userbot import TelegramUserBot
|
|
||||||
from src.utils import logging
|
|
||||||
from src.core.configuration import config
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
"""
|
|
||||||
Entry point for starting the Telegram user bot.
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
logging.setup_logging()
|
|
||||||
|
|
||||||
# Load API credentials and configuration
|
|
||||||
api_id: str = config.API_ID
|
|
||||||
api_hash: str = config.API_HASH
|
|
||||||
api_token: str = config.API_GIGACHAT_TOKEN
|
|
||||||
|
|
||||||
# Initialize GigaChatClient
|
|
||||||
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
|
|
||||||
|
|
||||||
# Initialize and run the Telegram user bot
|
|
||||||
bot: TelegramUserBot = TelegramUserBot(
|
|
||||||
session_name="userbot",
|
|
||||||
api_id=api_id,
|
|
||||||
api_hash=api_hash,
|
|
||||||
gigachat_client=gigachat_client
|
|
||||||
)
|
|
||||||
bot.run()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
2094
poetry.lock
generated
2094
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -6,24 +6,7 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
|
|||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.12"
|
python = "^3.13"
|
||||||
pyrogram = "^2.0.106"
|
|
||||||
tgcrypto = "^1.2.5"
|
|
||||||
setuptools = "^75.6.0"
|
|
||||||
wheel = "^0.45.1"
|
|
||||||
langchain-gigachat = "^0.3.2"
|
|
||||||
punq = "^0.7.0"
|
|
||||||
pytest = "^8.3.4"
|
|
||||||
speechrecognition = "^3.13.0"
|
|
||||||
typing-extensions = "^4.12.2"
|
|
||||||
pydub = "^0.25.1"
|
|
||||||
numpy = "2.0.2"
|
|
||||||
soundfile = "^0.13.0"
|
|
||||||
torch = "^2.5.1"
|
|
||||||
llvmlite = "0.43.0"
|
|
||||||
numba = "0.60.0"
|
|
||||||
openai-whisper = "^20240930"
|
|
||||||
moviepy = "^2.1.1"
|
|
||||||
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
|
@ -1,289 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
from tempfile import NamedTemporaryFile
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pyrogram import filters
|
|
||||||
from pyrogram.client import Client
|
|
||||||
from pyrogram.types import Message
|
|
||||||
from pyrogram.enums import ChatAction
|
|
||||||
from pyrogram.types.messages_and_media.video import Video
|
|
||||||
from pyrogram.types.messages_and_media.video_note import VideoNote
|
|
||||||
|
|
||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
from src.utils import speech_recognition, video_processing
|
|
||||||
|
|
||||||
|
|
||||||
class TelegramUserBot:
|
|
||||||
"""
|
|
||||||
A Telegram user bot.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
app (Client): The Pyrogram client instance for the bot.
|
|
||||||
gigachat_client (GigaChatClient): The client instance for GigaChat integration.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
|
|
||||||
"""
|
|
||||||
Initializes the Telegram user bot.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_name (str): The session name for the bot.
|
|
||||||
api_id (str): The API ID for the Telegram application.
|
|
||||||
api_hash (str): The API hash for the Telegram application.
|
|
||||||
gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses.
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
self.logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
|
|
||||||
self.gigachat_client: GigaChatClient = gigachat_client
|
|
||||||
self.register_handlers()
|
|
||||||
|
|
||||||
def register_handlers(self) -> None:
|
|
||||||
"""
|
|
||||||
Registers the message handlers for the bot.
|
|
||||||
"""
|
|
||||||
self.logger.debug("Registering handlers.")
|
|
||||||
self.app.on_message(filters.command("ai"))(self.handle_ai_command)
|
|
||||||
self.app.on_message(filters.command("voice"))(self.handle_voice_command)
|
|
||||||
self.app.on_message(filters.command("video"))(self.handle_video_command)
|
|
||||||
|
|
||||||
async def handle_ai_command(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handles messages that invoke the /ai command.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (Client): The Pyrogram client instance.
|
|
||||||
message (Message): The incoming Telegram message.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /ai command from chat_id={message.chat.id}")
|
|
||||||
|
|
||||||
# Extract the command argument
|
|
||||||
command_arg: Optional[str] = " ".join(message.text.split()[1:])
|
|
||||||
|
|
||||||
if not command_arg and message.reply_to_message and message.reply_to_message.text:
|
|
||||||
# Use the text of the replied message if no argument is provided
|
|
||||||
command_arg = message.reply_to_message.text
|
|
||||||
|
|
||||||
if not command_arg:
|
|
||||||
self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}")
|
|
||||||
await message.reply("Please provide a message after /ai or reply to a message.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Send an initial message indicating processing
|
|
||||||
self.logger.debug(f"Processing request for chat_id={message.chat.id}")
|
|
||||||
processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Start typing animation
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
|
|
||||||
# Get a response from GigaChat
|
|
||||||
response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg)
|
|
||||||
self.logger.debug(f"Received response for chat_id={message.chat.id}")
|
|
||||||
|
|
||||||
# Edit the processing message with the generated response
|
|
||||||
await processing_message.edit_text(response)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing your request.")
|
|
||||||
finally:
|
|
||||||
# Stop indicating typing action
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
||||||
|
|
||||||
def get_language(self, input_text: str) -> str:
|
|
||||||
"""
|
|
||||||
Determines the language for voice-to-text conversion based on the input parameter.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
input_text (str): The input parameter indicating the language.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The language code ('en' or 'ru').
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If an invalid language parameter is provided.
|
|
||||||
"""
|
|
||||||
language_params: dict[str, list[str]] = {
|
|
||||||
'en': ['en', 'eng', 'english'],
|
|
||||||
'ru': ['ru', 'rus', 'russian']
|
|
||||||
}
|
|
||||||
|
|
||||||
input_lower: str = input_text.lower()
|
|
||||||
for lang_code, aliases in language_params.items():
|
|
||||||
if input_lower in aliases:
|
|
||||||
return lang_code
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid language parameter. Please use one of the following:\n" +
|
|
||||||
"\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items())
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_voice_command(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handle the /voice command to convert a voice message to text with optional language selection.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (Client): The Pyrogram Client instance.
|
|
||||||
message (Message): The incoming message containing the /voice command.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /voice command from chat_id={message.chat.id}.")
|
|
||||||
|
|
||||||
# Parse the language parameter (default to Russian)
|
|
||||||
command_parts: list[str] = message.text.split()
|
|
||||||
try:
|
|
||||||
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
|
|
||||||
except ValueError as e:
|
|
||||||
await message.reply(str(e), quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Check if the reply is to a voice message
|
|
||||||
if not (message.reply_to_message and message.reply_to_message.voice):
|
|
||||||
self.logger.warning("The /voice command was not used in reply to a voice message.")
|
|
||||||
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Send an initial message indicating processing
|
|
||||||
processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True)
|
|
||||||
|
|
||||||
with NamedTemporaryFile(delete=False) as temp_file:
|
|
||||||
file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name)
|
|
||||||
self.logger.info(f"Voice message downloaded to {file_path}.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Check voice message duration
|
|
||||||
duration: float = speech_recognition.get_audio_duration(file_path) # type: ignore
|
|
||||||
if duration > 300:
|
|
||||||
self.logger.warning(f"Voice message too long: {duration} seconds.")
|
|
||||||
await processing_message.edit_text("The voice message is too long (over 5 minutes). Please send a shorter one.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Start typing animation
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
|
|
||||||
# Attempt to convert voice to text with the selected language
|
|
||||||
text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore
|
|
||||||
self.logger.info("Voice message successfully converted to text.")
|
|
||||||
|
|
||||||
# Format the text for sending
|
|
||||||
formatted_text: str = (
|
|
||||||
f"<pre language=\"Conversion Result ({language})\">"
|
|
||||||
f"{text}"
|
|
||||||
"</pre>"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Edit the initial processing message with the converted text
|
|
||||||
await processing_message.edit_text(formatted_text)
|
|
||||||
except FileNotFoundError:
|
|
||||||
self.logger.error("File not found during processing.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
except RuntimeError:
|
|
||||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
except Exception:
|
|
||||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
finally:
|
|
||||||
# Stop indicating typing action
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
||||||
|
|
||||||
# Clean up temporary files
|
|
||||||
if os.path.exists(file_path): # type: ignore
|
|
||||||
os.remove(file_path) # type: ignore
|
|
||||||
|
|
||||||
async def handle_video_command(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handle the /video command to convert a video or video note message to text with optional language selection.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (Client): The Pyrogram Client instance.
|
|
||||||
message (Message): The incoming message containing the /video command.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /video command from chat_id={message.chat.id}.")
|
|
||||||
|
|
||||||
# Parse the language parameter (default to Russian)
|
|
||||||
command_parts: list[str] = message.text.split()
|
|
||||||
try:
|
|
||||||
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
|
|
||||||
except ValueError as e:
|
|
||||||
await message.reply(str(e), quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Check if the reply is to a video or video note message
|
|
||||||
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
|
|
||||||
self.logger.warning("The /video command was not used in reply to a video or video note message.")
|
|
||||||
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Identify the file type (video or video note)
|
|
||||||
media_type: str = "video" if message.reply_to_message.video else "video_note"
|
|
||||||
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
|
|
||||||
|
|
||||||
# Send an initial message indicating processing
|
|
||||||
processing_message: Message = await message.reply_to_message.reply("Processing video message to extract text...", quote=True)
|
|
||||||
|
|
||||||
with NamedTemporaryFile(delete=False) as temp_video_file:
|
|
||||||
video_path = await client.download_media(
|
|
||||||
media.file_id,
|
|
||||||
file_name=temp_video_file.name
|
|
||||||
)
|
|
||||||
self.logger.info(f"{media_type} message downloaded to {video_path}.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Check video duration
|
|
||||||
duration: float = video_processing.get_video_duration(video_path) # type: ignore
|
|
||||||
if duration > 300:
|
|
||||||
self.logger.warning(f"{media_type} too long: {duration} seconds.")
|
|
||||||
await processing_message.edit_text("The video or video note is too long (over 5 minutes). Please send a shorter one.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Extract audio from video
|
|
||||||
output_dir = os.path.dirname(video_path) # type: ignore
|
|
||||||
audio_path: str = video_processing.extract_audio_from_video(video_path, output_dir) # type: ignore
|
|
||||||
|
|
||||||
# Convert extracted audio to text
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
text: str = speech_recognition.convert_voice_to_text(audio_path, language=language) # type: ignore
|
|
||||||
self.logger.info(f"{media_type} message successfully converted to text.")
|
|
||||||
|
|
||||||
# Format the text for sending
|
|
||||||
formatted_text: str = (
|
|
||||||
f"<pre language=\"Conversion Result ({language})\">"
|
|
||||||
f"{text}"
|
|
||||||
"</pre>"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Edit the initial processing message with the converted text
|
|
||||||
await processing_message.edit_text(formatted_text)
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
self.logger.error("File not found during processing.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
except RuntimeError:
|
|
||||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
except Exception:
|
|
||||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
finally:
|
|
||||||
# Stop indicating typing action
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
||||||
|
|
||||||
# Clean up temporary files
|
|
||||||
if os.path.exists(video_path): # type: ignore
|
|
||||||
os.remove(video_path) # type: ignore
|
|
||||||
if os.path.exists(audio_path):
|
|
||||||
os.remove(audio_path)
|
|
||||||
|
|
||||||
def run(self) -> None:
|
|
||||||
"""
|
|
||||||
Starts the bot.
|
|
||||||
"""
|
|
||||||
self.logger.info("Bot is starting.")
|
|
||||||
print("Bot is running.")
|
|
||||||
try:
|
|
||||||
self.app.run()
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)
|
|
@ -1,27 +0,0 @@
|
|||||||
"""
|
|
||||||
config.py
|
|
||||||
|
|
||||||
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
|
|
||||||
|
|
||||||
Configuration settings:
|
|
||||||
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
|
|
||||||
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
|
|
||||||
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
|
|
||||||
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
|
|
||||||
|
|
||||||
Note:
|
|
||||||
- Make sure to keep these tokens secure and do not share them publicly.
|
|
||||||
- These values should be replaced with actual credentials for the bot and the GigaChat API.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# The API ID for the Telegram application
|
|
||||||
API_ID: str = ''
|
|
||||||
|
|
||||||
# The API hash for the Telegram application
|
|
||||||
API_HASH: str = ''
|
|
||||||
|
|
||||||
# The bot token for authenticating the bot with Telegram
|
|
||||||
BOT_TOKEN: str = ''
|
|
||||||
|
|
||||||
# The token for authenticating with the GigaChat API
|
|
||||||
API_GIGACHAT_TOKEN: str = ''
|
|
@ -1,102 +0,0 @@
|
|||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
from typing import Dict
|
|
||||||
|
|
||||||
from langchain_core.runnables.history import RunnableWithMessageHistory
|
|
||||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
|
||||||
from langchain_gigachat.chat_models import GigaChat
|
|
||||||
|
|
||||||
|
|
||||||
class GigaChatClient:
|
|
||||||
"""
|
|
||||||
A client class for interacting with the GigaChat API using LangChain components.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
|
|
||||||
"""
|
|
||||||
Initializes the GigaChatManager with API credentials and a default model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
api_token (str): The API token for authenticating with the GigaChat API.
|
|
||||||
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
self.logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
self.api_token: str = api_token
|
|
||||||
self.model_name: str = model_name
|
|
||||||
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
|
|
||||||
|
|
||||||
self.llm: GigaChat = self._create_llm(model_name)
|
|
||||||
self.store: Dict[str, InMemoryChatMessageHistory] = {}
|
|
||||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
|
||||||
|
|
||||||
def _create_llm(self, model_name: str) -> GigaChat:
|
|
||||||
"""
|
|
||||||
Creates and configures a GigaChat LLM instance.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
model_name (str): The GigaChat model to use.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
GigaChat: Configured GigaChat instance.
|
|
||||||
"""
|
|
||||||
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
|
|
||||||
return GigaChat(
|
|
||||||
credentials=self.api_token,
|
|
||||||
scope="GIGACHAT_API_PERS",
|
|
||||||
model=model_name,
|
|
||||||
verify_ssl_certs=False,
|
|
||||||
streaming=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
|
|
||||||
"""
|
|
||||||
Retrieves the chat history for a given session, creating it if it does not exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_id (str): The unique identifier for the session.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
InMemoryChatMessageHistory: The chat history for the session.
|
|
||||||
"""
|
|
||||||
if session_id not in self.store:
|
|
||||||
self.logger.debug(f"Creating new session history for session_id: {session_id}")
|
|
||||||
self.store[session_id] = InMemoryChatMessageHistory()
|
|
||||||
else:
|
|
||||||
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
|
|
||||||
return self.store[session_id]
|
|
||||||
|
|
||||||
def set_model(self, model_name: str) -> None:
|
|
||||||
"""
|
|
||||||
Updates the LLM to use a different GigaChat model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
model_name (str): The new GigaChat model to use.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Switching model to: {model_name}")
|
|
||||||
self.llm = self._create_llm(model_name)
|
|
||||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
|
||||||
|
|
||||||
def get_response(self, session_id: str, text: str) -> str:
|
|
||||||
"""
|
|
||||||
Get a response to the provided input text for a given session.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_id (str): The unique identifier for the session.
|
|
||||||
text (str): The input text for which a response is needed.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The response text.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Generating response for session_id: {session_id}")
|
|
||||||
try:
|
|
||||||
response = self.conversation.invoke(
|
|
||||||
input=text,
|
|
||||||
config={"configurable": {"session_id": session_id}},
|
|
||||||
)
|
|
||||||
self.logger.debug(f"Response for session_id {session_id}")
|
|
||||||
return response.content
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
|
|
||||||
raise
|
|
@ -1,62 +0,0 @@
|
|||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(output_to_console=False) -> None:
|
|
||||||
"""
|
|
||||||
Configures the logging system.
|
|
||||||
|
|
||||||
This function sets up logging with optional output to the console and ensures
|
|
||||||
log files are rotated daily. It creates a detailed logging format and retains
|
|
||||||
log files for a week.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
output_to_console (bool): If True, log messages will also be printed to the console.
|
|
||||||
Defaults to False.
|
|
||||||
"""
|
|
||||||
# Define the default handlers to use. Always logs to a file.
|
|
||||||
handlers: list[str] = ['file']
|
|
||||||
if output_to_console:
|
|
||||||
# Add console logging if requested
|
|
||||||
handlers.append('console')
|
|
||||||
|
|
||||||
# Generate the log file name with the current date
|
|
||||||
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
|
|
||||||
|
|
||||||
# Configure the logging settings using a dictionary
|
|
||||||
logging.config.dictConfig({
|
|
||||||
'version': 1, # Logging configuration version
|
|
||||||
'disable_existing_loggers': True, # Deny other loggers to remain active
|
|
||||||
'formatters': {
|
|
||||||
'detailed': { # Define a detailed logging format
|
|
||||||
'format': (
|
|
||||||
'%(asctime)s | %(levelname)-8s | '
|
|
||||||
'%(filename)s.%(funcName)s, line %(lineno)d: '
|
|
||||||
'%(message)s'
|
|
||||||
),
|
|
||||||
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'handlers': {
|
|
||||||
# Console handler outputs log messages to the console
|
|
||||||
'console': {
|
|
||||||
'class': 'logging.StreamHandler', # Standard output stream
|
|
||||||
'formatter': 'detailed', # Use the detailed formatter
|
|
||||||
},
|
|
||||||
# File handler writes log messages to a file, rotating daily
|
|
||||||
'file': {
|
|
||||||
'class': 'logging.handlers.TimedRotatingFileHandler',
|
|
||||||
'filename': log_filename, # Log file path
|
|
||||||
'when': 'midnight', # Rotate log files at midnight
|
|
||||||
'interval': 1, # Rotate daily
|
|
||||||
'backupCount': 7, # Keep up to 7 old log files
|
|
||||||
'formatter': 'detailed', # Use the detailed formatter
|
|
||||||
},
|
|
||||||
},
|
|
||||||
# Define the root logger configuration
|
|
||||||
'root': {
|
|
||||||
'handlers': handlers, # Handlers to use (console, file, or both)
|
|
||||||
'level': 'DEBUG', # Log level (DEBUG logs all levels)
|
|
||||||
},
|
|
||||||
})
|
|
@ -1,120 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
|
|
||||||
from pydub import AudioSegment
|
|
||||||
import speech_recognition as sr
|
|
||||||
from speech_recognition.audio import AudioData
|
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def convert_to_wav(file_path: str) -> str:
|
|
||||||
"""
|
|
||||||
Converts an audio file to WAV format if it is not already in WAV format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file to be converted.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The path to the converted or original WAV file.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: If the conversion fails for any reason.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
|
||||||
|
|
||||||
if file_path.lower().endswith('.wav'):
|
|
||||||
logger.info(f"File {file_path} is already in WAV format.")
|
|
||||||
return file_path
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Converting {file_path} to WAV format.")
|
|
||||||
audio = AudioSegment.from_file(file_path)
|
|
||||||
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
|
|
||||||
audio.export(wav_path, format="wav")
|
|
||||||
logger.info(f"File converted to {wav_path}.")
|
|
||||||
return wav_path
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to convert file to WAV: {e}")
|
|
||||||
raise RuntimeError(f"Failed to convert file to WAV: {e}")
|
|
||||||
|
|
||||||
def get_audio_duration(file_path: str) -> float:
|
|
||||||
"""
|
|
||||||
Retrieves the duration of an audio file in seconds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: The duration of the audio file in seconds.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: If unable to get the file duration.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Getting duration of {file_path}.")
|
|
||||||
audio = AudioSegment.from_file(file_path)
|
|
||||||
duration: float = len(audio) / 1000 # Duration in seconds
|
|
||||||
logger.info(f"Duration of {file_path}: {duration} seconds.")
|
|
||||||
return duration
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get file duration: {e}")
|
|
||||||
raise RuntimeError(f"Failed to get file duration: {e}")
|
|
||||||
|
|
||||||
def convert_voice_to_text(file_path: str, language='ru') -> str:
|
|
||||||
"""
|
|
||||||
Converts speech from an audio file to text using OpenAI speech recognition service.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file to be processed.
|
|
||||||
language (str): The language code for speech recognition (default is 'ru').
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The transcribed text if recognition is successful.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: For any errors encountered during processing.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError("File does not exist.")
|
|
||||||
|
|
||||||
# Convert the file to WAV format if necessary
|
|
||||||
try:
|
|
||||||
wav_path: str = convert_to_wav(file_path)
|
|
||||||
except RuntimeError as e:
|
|
||||||
logger.error(f"Error converting to WAV: {e}")
|
|
||||||
raise RuntimeError(f"Error converting to WAV: {e}")
|
|
||||||
|
|
||||||
recognizer = sr.Recognizer()
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Processing file {wav_path} ({get_audio_duration(wav_path)} sec) for speech recognition.")
|
|
||||||
with sr.AudioFile(wav_path) as source:
|
|
||||||
audio_data: AudioData = recognizer.record(source)
|
|
||||||
text = recognizer.recognize_whisper(audio_data, language=language, model='medium')
|
|
||||||
logger.info("Speech recognition successful.")
|
|
||||||
return text # type: ignore
|
|
||||||
except sr.UnknownValueError:
|
|
||||||
logger.warning(f"Speech in {wav_path} could not be recognized.")
|
|
||||||
raise RuntimeError("Speech could not be recognized.")
|
|
||||||
except sr.RequestError as e:
|
|
||||||
logger.error(f"Request error from the recognition service: {e}")
|
|
||||||
raise RuntimeError(f"Request error from the recognition service: {e}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"An unexpected error occurred: {e}")
|
|
||||||
raise RuntimeError(f"An unexpected error occurred: {e}")
|
|
@ -1,71 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
|
|
||||||
from moviepy import VideoFileClip
|
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def get_video_duration(video_path: str) -> float:
|
|
||||||
"""
|
|
||||||
Get the duration of a video file in seconds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
video_path (str): The path to the video file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: The duration of the video in seconds.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the video file does not exist.
|
|
||||||
RuntimeError: If an error occurs during processing.
|
|
||||||
"""
|
|
||||||
if not os.path.exists(video_path):
|
|
||||||
logger.error(f"Video file {video_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
video_clip = VideoFileClip(video_path)
|
|
||||||
duration = video_clip.duration
|
|
||||||
logger.info(f"Duration of video {video_path}: {duration} seconds.")
|
|
||||||
return duration
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get video duration: {e}", exc_info=True)
|
|
||||||
raise RuntimeError(f"Failed to get video duration: {e}")
|
|
||||||
finally:
|
|
||||||
video_clip.close()
|
|
||||||
|
|
||||||
|
|
||||||
def extract_audio_from_video(video_path: str, output_dir: str) -> str:
|
|
||||||
"""
|
|
||||||
Extracts the audio track from a video file and saves it as a WAV file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
video_path (str): The path to the video file.
|
|
||||||
output_dir (str): The directory where the audio file will be saved.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The path to the extracted audio file.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the video file does not exist.
|
|
||||||
RuntimeError: If an error occurs during audio extraction.
|
|
||||||
"""
|
|
||||||
if not os.path.exists(video_path):
|
|
||||||
logger.error(f"Video file {video_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Extracting audio from video: {video_path}")
|
|
||||||
video_clip = VideoFileClip(video_path)
|
|
||||||
audio_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.wav")
|
|
||||||
video_clip.audio.write_audiofile(audio_path) # type: ignore
|
|
||||||
logger.info(f"Audio extracted and saved to: {audio_path}")
|
|
||||||
return audio_path
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to extract audio from video: {e}", exc_info=True)
|
|
||||||
raise RuntimeError(f"Failed to extract audio from video: {e}")
|
|
||||||
finally:
|
|
||||||
video_clip.close()
|
|
@ -1,55 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
|
||||||
from langchain_gigachat import GigaChat
|
|
||||||
|
|
||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
from src.core.configuration import config
|
|
||||||
|
|
||||||
|
|
||||||
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def gigachat_client() -> GigaChatClient:
|
|
||||||
"""Fixture to create a GigaChatClient instance with a mock API token."""
|
|
||||||
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
|
|
||||||
def test_initialization(gigachat_client) -> None:
|
|
||||||
"""Test if the GigaChatClient initializes correctly."""
|
|
||||||
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
|
|
||||||
assert gigachat_client.model_name == "GigaChat"
|
|
||||||
assert isinstance(gigachat_client.store, dict)
|
|
||||||
assert gigachat_client.llm is not None
|
|
||||||
|
|
||||||
def test_create_llm() -> None:
|
|
||||||
"""Test the _create_llm method for proper LLM creation."""
|
|
||||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
|
|
||||||
assert mock_llm.credentials == API_GIGACHAT_TOKEN
|
|
||||||
assert mock_llm.model == "GigaChat-Pro"
|
|
||||||
|
|
||||||
def test_get_session_history(gigachat_client) -> None:
|
|
||||||
"""Test the get_session_history method for creating/retrieving session history."""
|
|
||||||
session_id = "test_session"
|
|
||||||
history = gigachat_client.get_session_history(session_id)
|
|
||||||
assert isinstance(history, InMemoryChatMessageHistory)
|
|
||||||
assert session_id in gigachat_client.store
|
|
||||||
assert gigachat_client.store[session_id] is history
|
|
||||||
|
|
||||||
def test_set_model(gigachat_client) -> None:
|
|
||||||
"""Test the set_model method for updating the LLM model."""
|
|
||||||
new_model = "GigaChat-Pro"
|
|
||||||
gigachat_client.set_model(new_model)
|
|
||||||
assert gigachat_client.llm.model == new_model
|
|
||||||
|
|
||||||
def test_get_response() -> None:
|
|
||||||
"""Test the get_response method by verifying the response code."""
|
|
||||||
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
|
|
||||||
mock_runnable = MagicMock()
|
|
||||||
mock_runnable.invoke.return_value.code = 200
|
|
||||||
MockRunnable.return_value = mock_runnable
|
|
||||||
|
|
||||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
response_code = mock_runnable.invoke.return_value.code
|
|
||||||
|
|
||||||
assert response_code == 200
|
|
Loading…
Reference in New Issue
Block a user