feat: refactor command handlers

This commit is contained in:
parent 7c513c6395
commit 7c3b7e578c
7 changed files with 488 additions and 246 deletions

View File

@ -0,0 +1,4 @@
from src.bot.handlers.abstract_command_handler import AbstractCommandHandler
from src.bot.handlers.ai_command_handler import AICommandHandler
from src.bot.handlers.voice_command_handler import VoiceCommandHandler
from src.bot.handlers.video_command_handler import VideoCommandHandler

View File

@ -0,0 +1,46 @@
from abc import ABC, abstractmethod
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
class AbstractCommandHandler(ABC):
"""
Abstract base class for creating command handlers in a Pyrogram bot.
This class provides a structure for defining custom command handlers.
Each subclass must implement the following abstract methods:
- `get_filters`: Defines the filters used to match messages that
the command handler should process.
- `handle`: Contains the logic to execute when a message matches
the defined filters.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
"""
COMMAND: str = ""
@abstractmethod
def get_filters(self) -> Filter:
"""
Returns the filters for this command handler.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter or a custom filter
determines which messages this handler processes.
"""
pass
@abstractmethod
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the command logic.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
pass

View File

@ -0,0 +1,87 @@
from logging import Logger
from typing import Optional
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from src.bot.handlers import AbstractCommandHandler
from src.integrations.gigachat_api_client import GigaChatClient
class AICommandHandler(AbstractCommandHandler):
"""
Command handler for the /ai command in a Pyrogram bot.
This handler processes text input, sends it to the GigaChat AI model,
and returns the generated response.
Attributes:
logger (`Logger`): Logger instance for logging events.
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
COMMAND (`str`): The name of the command that this handler handles.
"""
# The name of the command that this handler handles
COMMAND: str = "ai"
def __init__(self, logger: Logger, gigachat_client: GigaChatClient) -> None:
"""
Initializes the AICommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
"""
self.logger: Logger = logger
self.gigachat_client: GigaChatClient = gigachat_client
def get_filters(self) -> Filter:
"""
Returns the filter for the /ai command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /ai command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /ai command.
Sends the user's input to the GigaChat AI model and returns the generated response.
The command can be used with an inline argument or as a reply to a text message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Extract the command argument or use the replied message's text
command_argument: Optional[str] = " ".join(message.text.split()[1:])
if not command_argument and message.reply_to_message and message.reply_to_message.text:
command_argument = message.reply_to_message.text
if not command_argument:
self.logger.warning(f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}.")
await message.reply(f"Please provide a message after /{self.COMMAND} or reply to a message.", quote=True)
return
# Notify the user that the request is being processed
processing_message: Message = await message.reply(
f"{self.gigachat_client.model_name} is processing your request...", quote=True
)
try:
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
response_text: str = self.gigachat_client.get_response(str(message.chat.id), command_argument)
self.logger.debug(f"Generated response for chat_id={message.chat.id}")
await processing_message.edit_text(response_text)
except Exception as error:
self.logger.error(f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}", exc_info=True)
await processing_message.edit_text("An error occurred while processing your request.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)

View File

@ -0,0 +1,172 @@
import os
from logging import Logger
from tempfile import NamedTemporaryFile
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from pyrogram.types.messages_and_media.video import Video
from pyrogram.types.messages_and_media.video_note import VideoNote
from src.bot.handlers import AbstractCommandHandler
from src.utils import video_processing, audio_processing
class VideoCommandHandler(AbstractCommandHandler):
"""
Command handler for the /video command in a Pyrogram bot.
This handler processes video or video note messages, extracts audio,
converts the audio to text in a specified language, and sends the result back to the user.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
MAX_DURATION (`int`): Maximum allowed duration of the video in seconds.
logger (`Logger`): Logger instance for logging events.
"""
# The name of the command that this handler handles
COMMAND: str = "video"
# Mapping of language codes to their aliases
LANGUAGE_ALIASES: dict[str, list[str]] = {
"en": ["en", "eng", "english"],
"ru": ["ru", "rus", "russian"],
}
# Default language code for conversion if no language is specified
DEFAULT_LANGUAGE: str = "ru"
# Maximum allowed duration of the video message in seconds
MAX_DURATION: int = 300
def __init__(self, logger: Logger) -> None:
"""
Initializes the VideoCommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
"""
self.logger: Logger = logger
def get_filters(self) -> Filter:
"""
Returns the filter for the /video command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /video command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /video command.
Extracts audio from a video or video note message, converts the audio to text
in the specified language and sends the result back to the user. The command
must be used in reply to a video or video note message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Parse the language parameter or use the default (Russian)
command_arguments: list[str] = message.text.split()
try:
language_code: str = (
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
)
except ValueError as error:
await message.reply(str(error), quote=True)
return
# Check if the reply is to a video or video note message
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a video or video note message.")
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
return
# Identify the file type (video or video note)
media_type: str = "video" if message.reply_to_message.video else "video_note"
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
# Notify the user that the request is being processed
processing_message: Message = await message.reply_to_message.reply(
"Processing video message to extract text...", quote=True
)
with NamedTemporaryFile(delete=False) as temp_video_file:
video_file_path = await client.download_media(
media.file_id,
file_name=temp_video_file.name
)
self.logger.info(f"{media_type} message downloaded to {video_file_path}.")
try:
# Validate video duration
video_duration: float = video_processing.get_video_duration(video_file_path) # type: ignore
if video_duration > self.MAX_DURATION:
self.logger.warning(f"{media_type} too long: {video_duration} seconds.")
await processing_message.edit_text(
f"The video or video note is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
)
return
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Extract audio and convert it to text
output_dir = os.path.dirname(video_file_path) # type: ignore
audio_file_path: str = video_processing.extract_audio_from_video(video_file_path, output_dir) # type: ignore
extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
self.logger.info(f"{media_type} message successfully converted to text.")
response_text: str = (
f"<pre language=\"Conversion Result ({language_code})\">"
f"{extracted_text}"
"</pre>"
)
await processing_message.edit_text(response_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
if os.path.exists(video_file_path): # type: ignore
os.remove(video_file_path) # type: ignore
if os.path.exists(audio_file_path): # type: ignore
os.remove(audio_file_path) # type: ignore
def __resolve_language(self, language_input: str) -> str:
"""
Resolves the language code based on the input text.
Args:
language_input (str): User-provided language parameter.
Returns:
str: The resolved language code.
Raises:
ValueError: If the input does not match any supported language.
"""
normalized_input: str = language_input.lower()
for language_code, aliases in self.LANGUAGE_ALIASES.items():
if normalized_input in aliases:
return language_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
)

View File

@ -0,0 +1,159 @@
import os
from logging import Logger
from tempfile import NamedTemporaryFile
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from src.bot.handlers import AbstractCommandHandler
from src.utils import audio_processing
class VoiceCommandHandler(AbstractCommandHandler):
"""
Command handler for the /voice command in a Pyrogram bot.
This handler processes voice messages, converts them to text
in a specified language, and sends the result back to the user.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds.
logger (`Logger`): Logger instance for logging events.
"""
# The name of the command that this handler handles
COMMAND: str = "voice"
# Mapping of language codes to their aliases
LANGUAGE_ALIASES: dict[str, list[str]] = {
"en": ["en", "eng", "english"],
"ru": ["ru", "rus", "russian"],
}
# Default language code for conversion if no language is specified
DEFAULT_LANGUAGE: str = "ru"
# Maximum allowed duration of the voice message in seconds
MAX_DURATION: int = 300
def __init__(self, logger: Logger) -> None:
"""
Initializes the VoiceCommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
"""
self.logger: Logger = logger
def get_filters(self) -> Filter:
"""
Returns the filter for the /voice command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /voice command.
Converts a voice message to text in the specified language
and sends the result back to the user. The command must be used in reply
to a voice message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Parse the language parameter or use the default (Russian)
command_arguments: list[str] = message.text.split()
try:
language_code: str = (
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
)
except ValueError as error:
await message.reply(str(error), quote=True)
return
if not (message.reply_to_message and message.reply_to_message.voice):
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.")
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
return
# Notify the user that the request is being processed
processing_message: Message = await message.reply_to_message.reply(
"Converting voice message to text...", quote=True
)
with NamedTemporaryFile(delete=False) as temp_audio_file:
audio_file_path = await client.download_media(
message.reply_to_message.voice.file_id,
file_name=temp_audio_file.name
)
self.logger.info(f"Voice message downloaded to {audio_file_path}.")
try:
# Validate voice message duration
voice_duration: float = audio_processing.get_audio_duration(audio_file_path) # type: ignore
if voice_duration > self.MAX_DURATION:
self.logger.warning(f"Voice message too long: {voice_duration} seconds.")
await processing_message.edit_text(
f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
)
return
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
self.logger.info("Voice message successfully converted to text.")
response_text: str = (
f"<pre language=\"Conversion Result ({language_code})\">"
f"{extracted_text}"
"</pre>"
)
await processing_message.edit_text(response_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
if os.path.exists(audio_file_path): # type: ignore
os.remove(audio_file_path) # type: ignore
def __resolve_language(self, language_input: str) -> str:
"""
Resolves the language code based on the input text.
Args:
language_input (str): User-provided language parameter.
Returns:
str: The resolved language code.
Raises:
ValueError: If the input does not match any supported language.
"""
normalized_input: str = language_input.lower()
for language_code, aliases in self.LANGUAGE_ALIASES.items():
if normalized_input in aliases:
return language_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
)

View File

@ -1,18 +1,14 @@
import os
import logging
from logging import Logger
from tempfile import NamedTemporaryFile
from typing import Optional
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from pyrogram.types.messages_and_media.video import Video
from pyrogram.types.messages_and_media.video_note import VideoNote
from src.integrations.gigachat_api_client import GigaChatClient
from src.utils import speech_recognition, video_processing
from src.bot.handlers import AbstractCommandHandler
from src.bot.handlers import AICommandHandler
from src.bot.handlers import VoiceCommandHandler
from src.bot.handlers import VideoCommandHandler
class TelegramUserBot:
@ -20,8 +16,8 @@ class TelegramUserBot:
A Telegram user bot.
Attributes:
app (Client): The Pyrogram client instance for the bot.
gigachat_client (GigaChatClient): The client instance for GigaChat integration.
app (`Client`): The Pyrogram client instance for the bot.
gigachat_client (`GigaChatClient`): The client instance for GigaChat integration.
"""
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
@ -29,16 +25,22 @@ class TelegramUserBot:
Initializes the Telegram user bot.
Args:
session_name (str): The session name for the bot.
api_id (str): The API ID for the Telegram application.
api_hash (str): The API hash for the Telegram application.
gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses.
session_name (`str`): The session name for the bot.
api_id (`str`): The API ID for the Telegram application.
api_hash (`str`): The API hash for the Telegram application.
gigachat_client (`GigaChatClient`): An instance of GigaChatClient for handling AI responses.
"""
# Configure logging
self.logger: Logger = logging.getLogger(__name__)
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
self.gigachat_client: GigaChatClient = gigachat_client
# Configure handlers
self.handlers: dict[str, AbstractCommandHandler] = {
"ai": AICommandHandler(self.logger, gigachat_client),
"voice": VoiceCommandHandler(self.logger),
"video": VideoCommandHandler(self.logger),
}
self.register_handlers()
def register_handlers(self) -> None:
@ -46,236 +48,8 @@ class TelegramUserBot:
Registers the message handlers for the bot.
"""
self.logger.debug("Registering handlers.")
self.app.on_message(filters.command("ai"))(self.handle_ai_command)
self.app.on_message(filters.command("voice"))(self.handle_voice_command)
self.app.on_message(filters.command("video"))(self.handle_video_command)
async def handle_ai_command(self, client: Client, message: Message) -> None:
"""
Handles messages that invoke the /ai command.
Args:
client (Client): The Pyrogram client instance.
message (Message): The incoming Telegram message.
"""
self.logger.info(f"Received /ai command from chat_id={message.chat.id}")
# Extract the command argument
command_arg: Optional[str] = " ".join(message.text.split()[1:])
if not command_arg and message.reply_to_message and message.reply_to_message.text:
# Use the text of the replied message if no argument is provided
command_arg = message.reply_to_message.text
if not command_arg:
self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}")
await message.reply("Please provide a message after /ai or reply to a message.", quote=True)
return
# Send an initial message indicating processing
self.logger.debug(f"Processing request for chat_id={message.chat.id}")
processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True)
try:
# Start typing animation
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Get a response from GigaChat
response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg)
self.logger.debug(f"Received response for chat_id={message.chat.id}")
# Edit the processing message with the generated response
await processing_message.edit_text(response)
except Exception as e:
self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True)
await processing_message.edit_text("An error occurred while processing your request.")
finally:
# Stop indicating typing action
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
def get_language(self, input_text: str) -> str:
"""
Determines the language for voice-to-text conversion based on the input parameter.
Args:
input_text (str): The input parameter indicating the language.
Returns:
str: The language code ('en' or 'ru').
Raises:
ValueError: If an invalid language parameter is provided.
"""
language_params: dict[str, list[str]] = {
'en': ['en', 'eng', 'english'],
'ru': ['ru', 'rus', 'russian']
}
input_lower: str = input_text.lower()
for lang_code, aliases in language_params.items():
if input_lower in aliases:
return lang_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items())
)
async def handle_voice_command(self, client: Client, message: Message) -> None:
"""
Handle the /voice command to convert a voice message to text with optional language selection.
Args:
client (Client): The Pyrogram Client instance.
message (Message): The incoming message containing the /voice command.
"""
self.logger.info(f"Received /voice command from chat_id={message.chat.id}.")
# Parse the language parameter (default to Russian)
command_parts: list[str] = message.text.split()
try:
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
except ValueError as e:
await message.reply(str(e), quote=True)
return
# Check if the reply is to a voice message
if not (message.reply_to_message and message.reply_to_message.voice):
self.logger.warning("The /voice command was not used in reply to a voice message.")
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
return
# Send an initial message indicating processing
processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True)
with NamedTemporaryFile(delete=False) as temp_file:
file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name)
self.logger.info(f"Voice message downloaded to {file_path}.")
try:
# Check voice message duration
duration: float = speech_recognition.get_audio_duration(file_path) # type: ignore
if duration > 300:
self.logger.warning(f"Voice message too long: {duration} seconds.")
await processing_message.edit_text("The voice message is too long (over 5 minutes). Please send a shorter one.")
return
# Start typing animation
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Attempt to convert voice to text with the selected language
text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore
self.logger.info("Voice message successfully converted to text.")
# Format the text for sending
formatted_text: str = (
f"<pre language=\"Conversion Result ({language})\">"
f"{text}"
"</pre>"
)
# Edit the initial processing message with the converted text
await processing_message.edit_text(formatted_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
finally:
# Stop indicating typing action
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
# Clean up temporary files
if os.path.exists(file_path): # type: ignore
os.remove(file_path) # type: ignore
async def handle_video_command(self, client: Client, message: Message) -> None:
"""
Handle the /video command to convert a video or video note message to text with optional language selection.
Args:
client (Client): The Pyrogram Client instance.
message (Message): The incoming message containing the /video command.
"""
self.logger.info(f"Received /video command from chat_id={message.chat.id}.")
# Parse the language parameter (default to Russian)
command_parts: list[str] = message.text.split()
try:
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
except ValueError as e:
await message.reply(str(e), quote=True)
return
# Check if the reply is to a video or video note message
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
self.logger.warning("The /video command was not used in reply to a video or video note message.")
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
return
# Identify the file type (video or video note)
media_type: str = "video" if message.reply_to_message.video else "video_note"
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
# Send an initial message indicating processing
processing_message: Message = await message.reply_to_message.reply("Processing video message to extract text...", quote=True)
with NamedTemporaryFile(delete=False) as temp_video_file:
video_path = await client.download_media(
media.file_id,
file_name=temp_video_file.name
)
self.logger.info(f"{media_type} message downloaded to {video_path}.")
try:
# Check video duration
duration: float = video_processing.get_video_duration(video_path) # type: ignore
if duration > 300:
self.logger.warning(f"{media_type} too long: {duration} seconds.")
await processing_message.edit_text("The video or video note is too long (over 5 minutes). Please send a shorter one.")
return
# Extract audio from video
output_dir = os.path.dirname(video_path) # type: ignore
audio_path: str = video_processing.extract_audio_from_video(video_path, output_dir) # type: ignore
# Convert extracted audio to text
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
text: str = speech_recognition.convert_voice_to_text(audio_path, language=language) # type: ignore
self.logger.info(f"{media_type} message successfully converted to text.")
# Format the text for sending
formatted_text: str = (
f"<pre language=\"Conversion Result ({language})\">"
f"{text}"
"</pre>"
)
# Edit the initial processing message with the converted text
await processing_message.edit_text(formatted_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
finally:
# Stop indicating typing action
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
# Clean up temporary files
if os.path.exists(video_path): # type: ignore
os.remove(video_path) # type: ignore
if os.path.exists(audio_path):
os.remove(audio_path)
for command, handler in self.handlers.items():
self.app.on_message(filters=handler.get_filters())(handler.handle)
def run(self) -> None:
"""