167 lines
6.8 KiB
Python
167 lines
6.8 KiB
Python
import os
|
|
from logging import Logger
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
from pyrogram import filters
|
|
from pyrogram.filters import Filter
|
|
from pyrogram.client import Client
|
|
from pyrogram.types import Message
|
|
from pyrogram.enums import ChatAction
|
|
|
|
from src.bot.handlers import AbstractCommandHandler
|
|
from src.utils import audio_processing
|
|
|
|
|
|
class VoiceCommandHandler(AbstractCommandHandler):
|
|
"""
|
|
Command handler for the /voice command in a Pyrogram bot.
|
|
|
|
This handler processes voice messages, converts them to text
|
|
in a specified language, and sends the result back to the user.
|
|
|
|
Attributes:
|
|
COMMAND (`str`): The name of the command that this handler handles.
|
|
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
|
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
|
|
MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds.
|
|
logger (`Logger`): Logger instance for logging events.
|
|
"""
|
|
|
|
# Mapping of language codes to their aliases
|
|
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
|
"en": ["en", "eng", "english"],
|
|
"ru": ["ru", "rus", "russian"],
|
|
}
|
|
|
|
# Default language code for conversion if no language is specified
|
|
DEFAULT_LANGUAGE: str = "ru"
|
|
|
|
# Maximum allowed duration of the voice message in seconds
|
|
MAX_DURATION: int = 300
|
|
|
|
def __init__(self, logger: Logger) -> None:
|
|
"""
|
|
Initializes the VoiceCommandHandler.
|
|
|
|
Args:
|
|
logger (`Logger`): Logger instance for logging events.
|
|
"""
|
|
self.logger: Logger = logger
|
|
|
|
@property
|
|
def COMMAND(self) -> str:
|
|
"""
|
|
The name of the command that this handler handles.
|
|
|
|
Returns:
|
|
str: The command name.
|
|
"""
|
|
return "voice"
|
|
|
|
def get_filters(self) -> Filter:
|
|
"""
|
|
Returns the filter for the /voice command.
|
|
|
|
Returns:
|
|
`pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command.
|
|
"""
|
|
return filters.command(self.COMMAND)
|
|
|
|
async def handle(self, client: Client, message: Message) -> None:
|
|
"""
|
|
Handles the /voice command.
|
|
|
|
Converts a voice message to text in the specified language
|
|
and sends the result back to the user. The command must be used in reply
|
|
to a voice message.
|
|
|
|
Args:
|
|
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
|
message (`pyrogram.types.Message`): The incoming message object to process.
|
|
"""
|
|
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
|
|
|
# Parse the language parameter or use the default (Russian)
|
|
command_arguments: list[str] = message.text.split()
|
|
try:
|
|
language_code: str = (
|
|
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
|
|
)
|
|
except ValueError as error:
|
|
await message.reply(str(error), quote=True)
|
|
return
|
|
|
|
if not (message.reply_to_message and message.reply_to_message.voice):
|
|
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.")
|
|
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
|
return
|
|
|
|
# Notify the user that the request is being processed
|
|
processing_message: Message = await message.reply_to_message.reply(
|
|
"Converting voice message to text...", quote=True
|
|
)
|
|
|
|
with NamedTemporaryFile(delete=False) as temp_audio_file:
|
|
audio_file_path = await client.download_media(
|
|
message.reply_to_message.voice.file_id,
|
|
file_name=temp_audio_file.name
|
|
)
|
|
self.logger.info(f"Voice message downloaded to {audio_file_path}.")
|
|
|
|
try:
|
|
# Validate voice message duration
|
|
voice_duration: float = audio_processing.get_audio_duration(audio_file_path) # type: ignore
|
|
if voice_duration > self.MAX_DURATION:
|
|
self.logger.warning(f"Voice message too long: {voice_duration} seconds.")
|
|
await processing_message.edit_text(
|
|
f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
|
|
)
|
|
return
|
|
|
|
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
|
|
self.logger.info("Voice message successfully converted to text.")
|
|
|
|
response_text: str = (
|
|
f"<pre language=\"Conversion Result ({language_code})\">"
|
|
f"{extracted_text}"
|
|
"</pre>"
|
|
)
|
|
await processing_message.edit_text(response_text)
|
|
|
|
except FileNotFoundError:
|
|
self.logger.error("File not found during processing.", exc_info=True)
|
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
except RuntimeError:
|
|
self.logger.error("A runtime error occurred.", exc_info=True)
|
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
except Exception:
|
|
self.logger.error("An unexpected error occurred.", exc_info=True)
|
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
finally:
|
|
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
if os.path.exists(audio_file_path): # type: ignore
|
|
os.remove(audio_file_path) # type: ignore
|
|
|
|
def __resolve_language(self, language_input: str) -> str:
|
|
"""
|
|
Resolves the language code based on the input text.
|
|
|
|
Args:
|
|
language_input (str): User-provided language parameter.
|
|
|
|
Returns:
|
|
str: The resolved language code.
|
|
|
|
Raises:
|
|
ValueError: If the input does not match any supported language.
|
|
"""
|
|
normalized_input: str = language_input.lower()
|
|
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
|
if normalized_input in aliases:
|
|
return language_code
|
|
raise ValueError(
|
|
"Invalid language parameter. Please use one of the following:\n" +
|
|
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
|
|
)
|