From 7c3b7e578c1538e4e8193c43c4217f6a5b598b2d Mon Sep 17 00:00:00 2001 From: Factorino73 Date: Sun, 5 Jan 2025 21:06:41 +0400 Subject: [PATCH] feat: refactor command handlers --- src/bot/handlers/__init__.py | 4 + src/bot/handlers/abstract_command_handler.py | 46 +++ src/bot/handlers/ai_command_handler.py | 87 ++++++ src/bot/handlers/video_command_handler.py | 172 +++++++++++ src/bot/handlers/voice_command_handler.py | 159 +++++++++++ src/bot/telegram_userbot.py | 266 ++---------------- ...ech_recognition.py => audio_processing.py} | 0 7 files changed, 488 insertions(+), 246 deletions(-) create mode 100644 src/bot/handlers/__init__.py create mode 100644 src/bot/handlers/abstract_command_handler.py create mode 100644 src/bot/handlers/ai_command_handler.py create mode 100644 src/bot/handlers/video_command_handler.py create mode 100644 src/bot/handlers/voice_command_handler.py rename src/utils/{speech_recognition.py => audio_processing.py} (100%) diff --git a/src/bot/handlers/__init__.py b/src/bot/handlers/__init__.py new file mode 100644 index 0000000..28b5694 --- /dev/null +++ b/src/bot/handlers/__init__.py @@ -0,0 +1,4 @@ +from src.bot.handlers.abstract_command_handler import AbstractCommandHandler +from src.bot.handlers.ai_command_handler import AICommandHandler +from src.bot.handlers.voice_command_handler import VoiceCommandHandler +from src.bot.handlers.video_command_handler import VideoCommandHandler diff --git a/src/bot/handlers/abstract_command_handler.py b/src/bot/handlers/abstract_command_handler.py new file mode 100644 index 0000000..e9f6742 --- /dev/null +++ b/src/bot/handlers/abstract_command_handler.py @@ -0,0 +1,46 @@ +from abc import ABC, abstractmethod + +from pyrogram.filters import Filter +from pyrogram.client import Client +from pyrogram.types import Message + + +class AbstractCommandHandler(ABC): + """ + Abstract base class for creating command handlers in a Pyrogram bot. + + This class provides a structure for defining custom command handlers. + Each subclass must implement the following abstract methods: + + - `get_filters`: Defines the filters used to match messages that + the command handler should process. + - `handle`: Contains the logic to execute when a message matches + the defined filters. + + Attributes: + COMMAND (`str`): The name of the command that this handler handles. + """ + + COMMAND: str = "" + + @abstractmethod + def get_filters(self) -> Filter: + """ + Returns the filters for this command handler. + + Returns: + `pyrogram.filters.Filter`: A Pyrogram filter or a custom filter + determines which messages this handler processes. + """ + pass + + @abstractmethod + async def handle(self, client: Client, message: Message) -> None: + """ + Handles the command logic. + + Args: + client (`pyrogram.client.Client`): The Pyrogram client instance. + message (`pyrogram.types.Message`): The incoming message object to process. + """ + pass diff --git a/src/bot/handlers/ai_command_handler.py b/src/bot/handlers/ai_command_handler.py new file mode 100644 index 0000000..de4ff71 --- /dev/null +++ b/src/bot/handlers/ai_command_handler.py @@ -0,0 +1,87 @@ +from logging import Logger +from typing import Optional + +from pyrogram import filters +from pyrogram.filters import Filter +from pyrogram.client import Client +from pyrogram.types import Message +from pyrogram.enums import ChatAction + +from src.bot.handlers import AbstractCommandHandler +from src.integrations.gigachat_api_client import GigaChatClient + + +class AICommandHandler(AbstractCommandHandler): + """ + Command handler for the /ai command in a Pyrogram bot. + + This handler processes text input, sends it to the GigaChat AI model, + and returns the generated response. + + Attributes: + logger (`Logger`): Logger instance for logging events. + gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API. + COMMAND (`str`): The name of the command that this handler handles. + """ + + # The name of the command that this handler handles + COMMAND: str = "ai" + + def __init__(self, logger: Logger, gigachat_client: GigaChatClient) -> None: + """ + Initializes the AICommandHandler. + + Args: + logger (`Logger`): Logger instance for logging events. + gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API. + """ + self.logger: Logger = logger + self.gigachat_client: GigaChatClient = gigachat_client + + def get_filters(self) -> Filter: + """ + Returns the filter for the /ai command. + + Returns: + `pyrogram.filters.Filter`: A Pyrogram filter matching the /ai command. + """ + return filters.command(self.COMMAND) + + async def handle(self, client: Client, message: Message) -> None: + """ + Handles the /ai command. + + Sends the user's input to the GigaChat AI model and returns the generated response. + The command can be used with an inline argument or as a reply to a text message. + + Args: + client (`pyrogram.client.Client`): The Pyrogram client instance. + message (`pyrogram.types.Message`): The incoming message object to process. + """ + self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.") + + # Extract the command argument or use the replied message's text + command_argument: Optional[str] = " ".join(message.text.split()[1:]) + if not command_argument and message.reply_to_message and message.reply_to_message.text: + command_argument = message.reply_to_message.text + + if not command_argument: + self.logger.warning(f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}.") + await message.reply(f"Please provide a message after /{self.COMMAND} or reply to a message.", quote=True) + return + + # Notify the user that the request is being processed + processing_message: Message = await message.reply( + f"{self.gigachat_client.model_name} is processing your request...", quote=True + ) + + try: + await client.send_chat_action(message.chat.id, ChatAction.TYPING) + response_text: str = self.gigachat_client.get_response(str(message.chat.id), command_argument) + self.logger.debug(f"Generated response for chat_id={message.chat.id}") + await processing_message.edit_text(response_text) + except Exception as error: + self.logger.error(f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}", exc_info=True) + await processing_message.edit_text("An error occurred while processing your request.") + finally: + await client.send_chat_action(message.chat.id, ChatAction.CANCEL) diff --git a/src/bot/handlers/video_command_handler.py b/src/bot/handlers/video_command_handler.py new file mode 100644 index 0000000..2c9349f --- /dev/null +++ b/src/bot/handlers/video_command_handler.py @@ -0,0 +1,172 @@ +import os +from logging import Logger +from tempfile import NamedTemporaryFile + +from pyrogram import filters +from pyrogram.filters import Filter +from pyrogram.client import Client +from pyrogram.types import Message +from pyrogram.enums import ChatAction +from pyrogram.types.messages_and_media.video import Video +from pyrogram.types.messages_and_media.video_note import VideoNote + +from src.bot.handlers import AbstractCommandHandler +from src.utils import video_processing, audio_processing + + +class VideoCommandHandler(AbstractCommandHandler): + """ + Command handler for the /video command in a Pyrogram bot. + + This handler processes video or video note messages, extracts audio, + converts the audio to text in a specified language, and sends the result back to the user. + + Attributes: + COMMAND (`str`): The name of the command that this handler handles. + LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases. + DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified. + MAX_DURATION (`int`): Maximum allowed duration of the video in seconds. + logger (`Logger`): Logger instance for logging events. + """ + + # The name of the command that this handler handles + COMMAND: str = "video" + + # Mapping of language codes to their aliases + LANGUAGE_ALIASES: dict[str, list[str]] = { + "en": ["en", "eng", "english"], + "ru": ["ru", "rus", "russian"], + } + + # Default language code for conversion if no language is specified + DEFAULT_LANGUAGE: str = "ru" + + # Maximum allowed duration of the video message in seconds + MAX_DURATION: int = 300 + + def __init__(self, logger: Logger) -> None: + """ + Initializes the VideoCommandHandler. + + Args: + logger (`Logger`): Logger instance for logging events. + """ + self.logger: Logger = logger + + def get_filters(self) -> Filter: + """ + Returns the filter for the /video command. + + Returns: + `pyrogram.filters.Filter`: A Pyrogram filter matching the /video command. + """ + return filters.command(self.COMMAND) + + async def handle(self, client: Client, message: Message) -> None: + """ + Handles the /video command. + + Extracts audio from a video or video note message, converts the audio to text + in the specified language and sends the result back to the user. The command + must be used in reply to a video or video note message. + + Args: + client (`pyrogram.client.Client`): The Pyrogram client instance. + message (`pyrogram.types.Message`): The incoming message object to process. + """ + self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.") + + # Parse the language parameter or use the default (Russian) + command_arguments: list[str] = message.text.split() + try: + language_code: str = ( + self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE + ) + except ValueError as error: + await message.reply(str(error), quote=True) + return + + # Check if the reply is to a video or video note message + if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)): + self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a video or video note message.") + await message.reply("Please reply to a video or video note message with the /video command.", quote=True) + return + + # Identify the file type (video or video note) + media_type: str = "video" if message.reply_to_message.video else "video_note" + media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note + + # Notify the user that the request is being processed + processing_message: Message = await message.reply_to_message.reply( + "Processing video message to extract text...", quote=True + ) + + with NamedTemporaryFile(delete=False) as temp_video_file: + video_file_path = await client.download_media( + media.file_id, + file_name=temp_video_file.name + ) + self.logger.info(f"{media_type} message downloaded to {video_file_path}.") + + try: + # Validate video duration + video_duration: float = video_processing.get_video_duration(video_file_path) # type: ignore + if video_duration > self.MAX_DURATION: + self.logger.warning(f"{media_type} too long: {video_duration} seconds.") + await processing_message.edit_text( + f"The video or video note is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one." + ) + return + + await client.send_chat_action(message.chat.id, ChatAction.TYPING) + + # Extract audio and convert it to text + output_dir = os.path.dirname(video_file_path) # type: ignore + audio_file_path: str = video_processing.extract_audio_from_video(video_file_path, output_dir) # type: ignore + extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore + self.logger.info(f"{media_type} message successfully converted to text.") + + response_text: str = ( + f"
"
+                    f"{extracted_text}"
+                    "
" + ) + await processing_message.edit_text(response_text) + + except FileNotFoundError: + self.logger.error("File not found during processing.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") + except RuntimeError: + self.logger.error("A runtime error occurred.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") + except Exception: + self.logger.error("An unexpected error occurred.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") + finally: + await client.send_chat_action(message.chat.id, ChatAction.CANCEL) + if os.path.exists(video_file_path): # type: ignore + os.remove(video_file_path) # type: ignore + if os.path.exists(audio_file_path): # type: ignore + os.remove(audio_file_path) # type: ignore + + def __resolve_language(self, language_input: str) -> str: + """ + Resolves the language code based on the input text. + + Args: + language_input (str): User-provided language parameter. + + Returns: + str: The resolved language code. + + Raises: + ValueError: If the input does not match any supported language. + """ + normalized_input: str = language_input.lower() + for language_code, aliases in self.LANGUAGE_ALIASES.items(): + if normalized_input in aliases: + return language_code + raise ValueError( + "Invalid language parameter. Please use one of the following:\n" + + "\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items()) + ) diff --git a/src/bot/handlers/voice_command_handler.py b/src/bot/handlers/voice_command_handler.py new file mode 100644 index 0000000..80cb594 --- /dev/null +++ b/src/bot/handlers/voice_command_handler.py @@ -0,0 +1,159 @@ +import os +from logging import Logger +from tempfile import NamedTemporaryFile + +from pyrogram import filters +from pyrogram.filters import Filter +from pyrogram.client import Client +from pyrogram.types import Message +from pyrogram.enums import ChatAction + +from src.bot.handlers import AbstractCommandHandler +from src.utils import audio_processing + + +class VoiceCommandHandler(AbstractCommandHandler): + """ + Command handler for the /voice command in a Pyrogram bot. + + This handler processes voice messages, converts them to text + in a specified language, and sends the result back to the user. + + Attributes: + COMMAND (`str`): The name of the command that this handler handles. + LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases. + DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified. + MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds. + logger (`Logger`): Logger instance for logging events. + """ + + # The name of the command that this handler handles + COMMAND: str = "voice" + + # Mapping of language codes to their aliases + LANGUAGE_ALIASES: dict[str, list[str]] = { + "en": ["en", "eng", "english"], + "ru": ["ru", "rus", "russian"], + } + + # Default language code for conversion if no language is specified + DEFAULT_LANGUAGE: str = "ru" + + # Maximum allowed duration of the voice message in seconds + MAX_DURATION: int = 300 + + def __init__(self, logger: Logger) -> None: + """ + Initializes the VoiceCommandHandler. + + Args: + logger (`Logger`): Logger instance for logging events. + """ + self.logger: Logger = logger + + def get_filters(self) -> Filter: + """ + Returns the filter for the /voice command. + + Returns: + `pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command. + """ + return filters.command(self.COMMAND) + + async def handle(self, client: Client, message: Message) -> None: + """ + Handles the /voice command. + + Converts a voice message to text in the specified language + and sends the result back to the user. The command must be used in reply + to a voice message. + + Args: + client (`pyrogram.client.Client`): The Pyrogram client instance. + message (`pyrogram.types.Message`): The incoming message object to process. + """ + self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.") + + # Parse the language parameter or use the default (Russian) + command_arguments: list[str] = message.text.split() + try: + language_code: str = ( + self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE + ) + except ValueError as error: + await message.reply(str(error), quote=True) + return + + if not (message.reply_to_message and message.reply_to_message.voice): + self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.") + await message.reply("Please reply to a voice message with the /voice command.", quote=True) + return + + # Notify the user that the request is being processed + processing_message: Message = await message.reply_to_message.reply( + "Converting voice message to text...", quote=True + ) + + with NamedTemporaryFile(delete=False) as temp_audio_file: + audio_file_path = await client.download_media( + message.reply_to_message.voice.file_id, + file_name=temp_audio_file.name + ) + self.logger.info(f"Voice message downloaded to {audio_file_path}.") + + try: + # Validate voice message duration + voice_duration: float = audio_processing.get_audio_duration(audio_file_path) # type: ignore + if voice_duration > self.MAX_DURATION: + self.logger.warning(f"Voice message too long: {voice_duration} seconds.") + await processing_message.edit_text( + f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one." + ) + return + + await client.send_chat_action(message.chat.id, ChatAction.TYPING) + extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore + self.logger.info("Voice message successfully converted to text.") + + response_text: str = ( + f"
"
+                    f"{extracted_text}"
+                    "
" + ) + await processing_message.edit_text(response_text) + + except FileNotFoundError: + self.logger.error("File not found during processing.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") + except RuntimeError: + self.logger.error("A runtime error occurred.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") + except Exception: + self.logger.error("An unexpected error occurred.", exc_info=True) + await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") + finally: + await client.send_chat_action(message.chat.id, ChatAction.CANCEL) + if os.path.exists(audio_file_path): # type: ignore + os.remove(audio_file_path) # type: ignore + + def __resolve_language(self, language_input: str) -> str: + """ + Resolves the language code based on the input text. + + Args: + language_input (str): User-provided language parameter. + + Returns: + str: The resolved language code. + + Raises: + ValueError: If the input does not match any supported language. + """ + normalized_input: str = language_input.lower() + for language_code, aliases in self.LANGUAGE_ALIASES.items(): + if normalized_input in aliases: + return language_code + raise ValueError( + "Invalid language parameter. Please use one of the following:\n" + + "\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items()) + ) diff --git a/src/bot/telegram_userbot.py b/src/bot/telegram_userbot.py index de0c376..3c3de7e 100644 --- a/src/bot/telegram_userbot.py +++ b/src/bot/telegram_userbot.py @@ -1,18 +1,14 @@ -import os import logging from logging import Logger -from tempfile import NamedTemporaryFile -from typing import Optional -from pyrogram import filters from pyrogram.client import Client -from pyrogram.types import Message -from pyrogram.enums import ChatAction -from pyrogram.types.messages_and_media.video import Video -from pyrogram.types.messages_and_media.video_note import VideoNote from src.integrations.gigachat_api_client import GigaChatClient -from src.utils import speech_recognition, video_processing + +from src.bot.handlers import AbstractCommandHandler +from src.bot.handlers import AICommandHandler +from src.bot.handlers import VoiceCommandHandler +from src.bot.handlers import VideoCommandHandler class TelegramUserBot: @@ -20,8 +16,8 @@ class TelegramUserBot: A Telegram user bot. Attributes: - app (Client): The Pyrogram client instance for the bot. - gigachat_client (GigaChatClient): The client instance for GigaChat integration. + app (`Client`): The Pyrogram client instance for the bot. + gigachat_client (`GigaChatClient`): The client instance for GigaChat integration. """ def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None: @@ -29,16 +25,22 @@ class TelegramUserBot: Initializes the Telegram user bot. Args: - session_name (str): The session name for the bot. - api_id (str): The API ID for the Telegram application. - api_hash (str): The API hash for the Telegram application. - gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses. + session_name (`str`): The session name for the bot. + api_id (`str`): The API ID for the Telegram application. + api_hash (`str`): The API hash for the Telegram application. + gigachat_client (`GigaChatClient`): An instance of GigaChatClient for handling AI responses. """ # Configure logging self.logger: Logger = logging.getLogger(__name__) self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash) - self.gigachat_client: GigaChatClient = gigachat_client + + # Configure handlers + self.handlers: dict[str, AbstractCommandHandler] = { + "ai": AICommandHandler(self.logger, gigachat_client), + "voice": VoiceCommandHandler(self.logger), + "video": VideoCommandHandler(self.logger), + } self.register_handlers() def register_handlers(self) -> None: @@ -46,236 +48,8 @@ class TelegramUserBot: Registers the message handlers for the bot. """ self.logger.debug("Registering handlers.") - self.app.on_message(filters.command("ai"))(self.handle_ai_command) - self.app.on_message(filters.command("voice"))(self.handle_voice_command) - self.app.on_message(filters.command("video"))(self.handle_video_command) - - async def handle_ai_command(self, client: Client, message: Message) -> None: - """ - Handles messages that invoke the /ai command. - - Args: - client (Client): The Pyrogram client instance. - message (Message): The incoming Telegram message. - """ - self.logger.info(f"Received /ai command from chat_id={message.chat.id}") - - # Extract the command argument - command_arg: Optional[str] = " ".join(message.text.split()[1:]) - - if not command_arg and message.reply_to_message and message.reply_to_message.text: - # Use the text of the replied message if no argument is provided - command_arg = message.reply_to_message.text - - if not command_arg: - self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}") - await message.reply("Please provide a message after /ai or reply to a message.", quote=True) - return - - # Send an initial message indicating processing - self.logger.debug(f"Processing request for chat_id={message.chat.id}") - processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True) - - try: - # Start typing animation - await client.send_chat_action(message.chat.id, ChatAction.TYPING) - - # Get a response from GigaChat - response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg) - self.logger.debug(f"Received response for chat_id={message.chat.id}") - - # Edit the processing message with the generated response - await processing_message.edit_text(response) - except Exception as e: - self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True) - await processing_message.edit_text("An error occurred while processing your request.") - finally: - # Stop indicating typing action - await client.send_chat_action(message.chat.id, ChatAction.CANCEL) - - def get_language(self, input_text: str) -> str: - """ - Determines the language for voice-to-text conversion based on the input parameter. - - Args: - input_text (str): The input parameter indicating the language. - - Returns: - str: The language code ('en' or 'ru'). - - Raises: - ValueError: If an invalid language parameter is provided. - """ - language_params: dict[str, list[str]] = { - 'en': ['en', 'eng', 'english'], - 'ru': ['ru', 'rus', 'russian'] - } - - input_lower: str = input_text.lower() - for lang_code, aliases in language_params.items(): - if input_lower in aliases: - return lang_code - raise ValueError( - "Invalid language parameter. Please use one of the following:\n" + - "\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items()) - ) - - - async def handle_voice_command(self, client: Client, message: Message) -> None: - """ - Handle the /voice command to convert a voice message to text with optional language selection. - - Args: - client (Client): The Pyrogram Client instance. - message (Message): The incoming message containing the /voice command. - """ - self.logger.info(f"Received /voice command from chat_id={message.chat.id}.") - - # Parse the language parameter (default to Russian) - command_parts: list[str] = message.text.split() - try: - language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru' - except ValueError as e: - await message.reply(str(e), quote=True) - return - - # Check if the reply is to a voice message - if not (message.reply_to_message and message.reply_to_message.voice): - self.logger.warning("The /voice command was not used in reply to a voice message.") - await message.reply("Please reply to a voice message with the /voice command.", quote=True) - return - - # Send an initial message indicating processing - processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True) - - with NamedTemporaryFile(delete=False) as temp_file: - file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name) - self.logger.info(f"Voice message downloaded to {file_path}.") - - try: - # Check voice message duration - duration: float = speech_recognition.get_audio_duration(file_path) # type: ignore - if duration > 300: - self.logger.warning(f"Voice message too long: {duration} seconds.") - await processing_message.edit_text("The voice message is too long (over 5 minutes). Please send a shorter one.") - return - - # Start typing animation - await client.send_chat_action(message.chat.id, ChatAction.TYPING) - - # Attempt to convert voice to text with the selected language - text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore - self.logger.info("Voice message successfully converted to text.") - - # Format the text for sending - formatted_text: str = ( - f"
"
-                    f"{text}"
-                    "
" - ) - - # Edit the initial processing message with the converted text - await processing_message.edit_text(formatted_text) - except FileNotFoundError: - self.logger.error("File not found during processing.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") - except RuntimeError: - self.logger.error("A runtime error occurred.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") - except Exception: - self.logger.error("An unexpected error occurred.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.") - finally: - # Stop indicating typing action - await client.send_chat_action(message.chat.id, ChatAction.CANCEL) - - # Clean up temporary files - if os.path.exists(file_path): # type: ignore - os.remove(file_path) # type: ignore - - async def handle_video_command(self, client: Client, message: Message) -> None: - """ - Handle the /video command to convert a video or video note message to text with optional language selection. - - Args: - client (Client): The Pyrogram Client instance. - message (Message): The incoming message containing the /video command. - """ - self.logger.info(f"Received /video command from chat_id={message.chat.id}.") - - # Parse the language parameter (default to Russian) - command_parts: list[str] = message.text.split() - try: - language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru' - except ValueError as e: - await message.reply(str(e), quote=True) - return - - # Check if the reply is to a video or video note message - if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)): - self.logger.warning("The /video command was not used in reply to a video or video note message.") - await message.reply("Please reply to a video or video note message with the /video command.", quote=True) - return - - # Identify the file type (video or video note) - media_type: str = "video" if message.reply_to_message.video else "video_note" - media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note - - # Send an initial message indicating processing - processing_message: Message = await message.reply_to_message.reply("Processing video message to extract text...", quote=True) - - with NamedTemporaryFile(delete=False) as temp_video_file: - video_path = await client.download_media( - media.file_id, - file_name=temp_video_file.name - ) - self.logger.info(f"{media_type} message downloaded to {video_path}.") - - try: - # Check video duration - duration: float = video_processing.get_video_duration(video_path) # type: ignore - if duration > 300: - self.logger.warning(f"{media_type} too long: {duration} seconds.") - await processing_message.edit_text("The video or video note is too long (over 5 minutes). Please send a shorter one.") - return - - # Extract audio from video - output_dir = os.path.dirname(video_path) # type: ignore - audio_path: str = video_processing.extract_audio_from_video(video_path, output_dir) # type: ignore - - # Convert extracted audio to text - await client.send_chat_action(message.chat.id, ChatAction.TYPING) - text: str = speech_recognition.convert_voice_to_text(audio_path, language=language) # type: ignore - self.logger.info(f"{media_type} message successfully converted to text.") - - # Format the text for sending - formatted_text: str = ( - f"
"
-                    f"{text}"
-                    "
" - ) - - # Edit the initial processing message with the converted text - await processing_message.edit_text(formatted_text) - - except FileNotFoundError: - self.logger.error("File not found during processing.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") - except RuntimeError: - self.logger.error("A runtime error occurred.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") - except Exception: - self.logger.error("An unexpected error occurred.", exc_info=True) - await processing_message.edit_text("An error occurred while processing the video message. Please try again later.") - finally: - # Stop indicating typing action - await client.send_chat_action(message.chat.id, ChatAction.CANCEL) - - # Clean up temporary files - if os.path.exists(video_path): # type: ignore - os.remove(video_path) # type: ignore - if os.path.exists(audio_path): - os.remove(audio_path) + for command, handler in self.handlers.items(): + self.app.on_message(filters=handler.get_filters())(handler.handle) def run(self) -> None: """ diff --git a/src/utils/speech_recognition.py b/src/utils/audio_processing.py similarity index 100% rename from src/utils/speech_recognition.py rename to src/utils/audio_processing.py