Compare commits

...

14 Commits

18 changed files with 3543 additions and 1 deletions

7
.gitignore vendored
View File

@ -129,6 +129,9 @@ ENV/
env.bak/ env.bak/
venv.bak/ venv.bak/
# Configuration
config.py
# Spyder project settings # Spyder project settings
.spyderproject .spyderproject
.spyproject .spyproject
@ -161,6 +164,7 @@ cython_debug/
#.idea/ #.idea/
# ---> VisualStudioCode # ---> VisualStudioCode
.vscode
.vscode/* .vscode/*
!.vscode/settings.json !.vscode/settings.json
!.vscode/tasks.json !.vscode/tasks.json
@ -174,3 +178,6 @@ cython_debug/
# Built Visual Studio Code Extensions # Built Visual Studio Code Extensions
*.vsix *.vsix
# Session info
*.session
*.session-journal

40
main.py
View File

@ -0,0 +1,40 @@
import logging
from logging import Logger
from src.utils import logging_configuration
from src.integrations.gigachat_api_client import GigaChatClient
from src.integrations.google_translate_client import GoogleTranslateClient
from src.bot.telegram_userbot import TelegramUserBot
from src.core.configuration import config
def main() -> None:
"""
Entry point for starting the Telegram user bot.
"""
# Configure logging
logging_configuration.setup_logging()
logger: Logger = logging.getLogger(__name__)
# Load API credentials and configuration
api_id: str = config.API_ID
api_hash: str = config.API_HASH
api_token: str = config.API_GIGACHAT_TOKEN
# Initialize services
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
translate_client: GoogleTranslateClient = GoogleTranslateClient(logger)
# Initialize and run the Telegram user bot
bot: TelegramUserBot = TelegramUserBot(
session_name="userbot",
api_id=api_id,
api_hash=api_hash,
gigachat_client=gigachat_client,
translate_client=translate_client
)
bot.run()
if __name__ == "__main__":
main()

2149
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,25 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
readme = "README.md" readme = "README.md"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.13" python = "^3.12"
pyrogram = "^2.0.106"
tgcrypto = "^1.2.5"
setuptools = "^75.6.0"
wheel = "^0.45.1"
langchain-gigachat = "^0.3.2"
punq = "^0.7.0"
pytest = "^8.3.4"
speechrecognition = "^3.13.0"
typing-extensions = "^4.12.2"
pydub = "^0.25.1"
numpy = "2.0.2"
soundfile = "^0.13.0"
torch = "^2.5.1"
llvmlite = "0.43.0"
numba = "0.60.0"
openai-whisper = "^20240930"
moviepy = "^2.1.1"
googletrans = "^4.0.2"
[build-system] [build-system]

View File

@ -0,0 +1,5 @@
from src.bot.handlers.abstract_command_handler import AbstractCommandHandler
from src.bot.handlers.ai_command_handler import AICommandHandler
from src.bot.handlers.voice_command_handler import VoiceCommandHandler
from src.bot.handlers.video_command_handler import VideoCommandHandler
from src.bot.handlers.translate_command_handler import TranslateCommandHandler

View File

@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
class AbstractCommandHandler(ABC):
"""
Abstract base class for creating command handlers in a Pyrogram bot.
This class provides a structure for defining custom command handlers.
Each subclass must implement the following abstract methods:
- `get_filters`: Defines the filters used to match messages that
the command handler should process.
- `handle`: Contains the logic to execute when a message matches
the defined filters.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
"""
@property
@abstractmethod
def COMMAND(self) -> str:
"""
The name of the command that this handler handles.
Returns:
str: The command name.
"""
pass
@abstractmethod
def get_filters(self) -> Filter:
"""
Returns the filters for this command handler.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter or a custom filter
determines which messages this handler processes.
"""
pass
@abstractmethod
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the command logic.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
pass

View File

@ -0,0 +1,94 @@
from logging import Logger
from typing import Optional
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from src.bot.handlers import AbstractCommandHandler
from src.integrations.gigachat_api_client import GigaChatClient
class AICommandHandler(AbstractCommandHandler):
"""
Command handler for the /ai command in a Pyrogram bot.
This handler processes text input, sends it to the GigaChat AI model,
and returns the generated response.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
logger (`Logger`): Logger instance for logging events.
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
"""
def __init__(self, logger: Logger, gigachat_client: GigaChatClient) -> None:
"""
Initializes the AICommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
"""
self.logger: Logger = logger
self.gigachat_client: GigaChatClient = gigachat_client
@property
def COMMAND(self) -> str:
"""
The name of the command that this handler handles.
Returns:
str: The command name.
"""
return "ai"
def get_filters(self) -> Filter:
"""
Returns the filter for the /ai command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /ai command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /ai command.
Sends the user's input to the GigaChat AI model and returns the generated response.
The command can be used with an inline argument or as a reply to a text message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Extract the command argument or use the replied message's text
command_argument: Optional[str] = " ".join(message.text.split()[1:])
if not command_argument and message.reply_to_message and message.reply_to_message.text:
command_argument = message.reply_to_message.text
if not command_argument:
self.logger.warning(f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}.")
await message.reply(f"Please provide a message after /{self.COMMAND} or reply to a message.", quote=True)
return
# Notify the user that the request is being processed
processing_message: Message = await message.reply(
f"{self.gigachat_client.model_name} is processing your request...", quote=True
)
try:
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
response_text: str = self.gigachat_client.get_response(str(message.chat.id), command_argument)
self.logger.debug(f"Generated response for chat_id={message.chat.id}")
await processing_message.edit_text(response_text)
except Exception as error:
self.logger.error(f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}", exc_info=True)
await processing_message.edit_text("An error occurred while processing your request.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)

View File

@ -0,0 +1,217 @@
import re
from re import Match
from logging import Logger
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from googletrans.models import Translated
from src.bot.handlers import AbstractCommandHandler
from src.integrations.google_translate_client import GoogleTranslateClient
class TranslateCommandHandler(AbstractCommandHandler):
"""
Command handler for the /translate command in a Pyrogram bot.
This handler translates text from one language to another using Google Translate.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
DEFAULT_SOURCE_LANGUAGE (`str`): Default source language for translation ("auto" for auto-detection).
DEFAULT_DESTINATION_LANGUAGE (`str`): Default destination language code for translation.
logger (`Logger`): Logger instance for logging.
translate_client (`GoogleTranslateClient`): Client for interacting with Google Translate.
"""
# Mapping of language codes to their aliases
LANGUAGE_ALIASES: dict[str, list[str]] = {
"ru": ["ru", "rus", "russian"],
"en": ["en", "eng", "english"],
"es": ["es", "spa", "spanish"],
"de": ["de", "ger", "german"],
"fr": ["fr", "fra", "french"],
"pt": ["pt", "por", "portuguese"],
"it": ["it", "ita", "italian"],
"zh": ["zh", "chi", "chinese"],
"ja": ["ja", "jpn", "japanese"],
"ko": ["ko", "kor", "korean"],
"ar": ["ar", "ara", "arabic"],
"tr": ["tr", "tur", "turkish"],
"hi": ["hi", "hin", "hindi"],
"vi": ["vi", "vie", "vietnamese"],
"sv": ["sv", "swe", "swedish"],
"no": ["no", "nor", "norwegian"],
"da": ["da", "dan", "danish"],
"fi": ["fi", "fin", "finnish"],
"cs": ["cs", "cze", "czech"],
"sk": ["sk", "slo", "slovak"],
"ro": ["ro", "rum", "romanian"],
"bg": ["bg", "bul", "bulgarian"],
"uk": ["uk", "ukr", "ukrainian"],
"be": ["be", "bel", "belarusian"],
"et": ["et", "est", "estonian"],
"lv": ["lv", "lav", "latvian"],
"lt": ["lt", "lit", "lithuanian"],
"tt": ["tt", "tat", "tatar"],
"cv": ["cv", "chv", "chuvash"],
}
# Default source language for translation ("auto" for auto-detection)
DEFAULT_SOURCE_LANGUAGE: str = "auto"
# Default destination language code for translation
DEFAULT_DESTINATION_LANGUAGE: str = "ru"
def __init__(self, logger: Logger, translate_client: GoogleTranslateClient) -> None:
"""
Initializes the TranslateCommandHandler.
Args:
logger (`logging.Logger`): Logger instance for logging events.
translate_client (`src.integrations.GoogleTranslateClient`): Client for interacting with Google Translate.
"""
self.logger: Logger = logger
self.translate_client: GoogleTranslateClient = translate_client
self.logger.info("GoogleTranslateClient initialized successfully.")
@property
def COMMAND(self) -> str:
"""
The name of the command that this handler handles.
"""
return "translate"
def get_filters(self) -> Filter:
"""
Returns the filter for the /translate command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /translate command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /translate command.
Translates a given text or text from a replied-to message from one language to another.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(
f"Received /{self.COMMAND} command from chat_id={message.chat.id}."
)
# Default values
source_language: str = self.DEFAULT_SOURCE_LANGUAGE
destination_language: str = self.DEFAULT_DESTINATION_LANGUAGE
text: str | None = None
# Parse optional arguments using regex
match_src: Match[str] | None = re.search(r"(?:src=|source=)(\w+)", message.text)
match_dest: Match[str] | None = re.search(r"(?:dest=|destination=)(\w+)", message.text)
if match_src:
source_language = match_src.group(1)
if match_dest:
destination_language = match_dest.group(1)
# Extract text (everything after the last optional parameter)
text_parts: str = re.sub(
rf"(?:/{self.COMMAND}|src=\w+|source=\w+|dest=\w+|destination=\w+)", "", message.text
).strip()
if text_parts:
text = text_parts
self.logger.debug(
f"Parsed parameters - source_language: {source_language}, destination_language: {destination_language}, text length: {len(text) if text else 0}"
)
# Resolve language aliases
try:
if source_language != self.DEFAULT_SOURCE_LANGUAGE:
source_language = self.__resolve_language(source_language)
destination_language = self.__resolve_language(destination_language)
except ValueError:
await message.reply("Invalid language parameter provided.", quote=True)
self.logger.error("Invalid language parameter provided.", exc_info=True)
return
# Use replied message text if no text is provided
if not text and message.reply_to_message and message.reply_to_message.text:
text = message.reply_to_message.text
if not text:
await message.reply(
f"Please provide a message after /{self.COMMAND} or reply to a message.",
quote=True
)
self.logger.warning(
f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}."
)
return
# Notify the user that the translation is in progress
processing_message: Message = await message.reply(
"Translating text...", quote=True
)
try:
# Perform translation
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
translation_result: Translated = await self.translate_client.translate_text(
text=text,
src_lang=source_language,
dest_lang=destination_language
)
self.logger.debug(f"Translating text for chat_id={message.chat.id}")
# Formatted response text
caption: str = f"Translated from {translation_result.src} to {translation_result.dest}"
response_text: str = (
f"<pre language=\"{caption}\">"
f"{translation_result.text}"
"</pre>"
)
await processing_message.edit_text(response_text)
except Exception as error:
self.logger.error(
f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}",
exc_info=True
)
await processing_message.edit_text(
"An error occurred during the translation process. Please try again later."
)
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
def __resolve_language(self, language_input: str) -> str:
"""
Resolves the language code based on the input text.
Args:
language_input (`str`): User-provided language parameter.
Returns:
`str`: The resolved language code.
Raises:
`ValueError`: If the input does not match any supported language.
"""
normalized_input: str = language_input.lower()
for language_code, aliases in self.LANGUAGE_ALIASES.items():
if normalized_input in aliases:
return language_code
self.logger.warning(f"Invalid language parameter provided: {language_input}")
raise ValueError(f"Invalid language parameter provided: {language_input}")

View File

@ -0,0 +1,179 @@
import os
from logging import Logger
import tempfile
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from pyrogram.types.messages_and_media.video import Video
from pyrogram.types.messages_and_media.video_note import VideoNote
from src.bot.handlers import AbstractCommandHandler
from src.utils import video_processing, audio_processing
class VideoCommandHandler(AbstractCommandHandler):
"""
Command handler for the /video command in a Pyrogram bot.
This handler processes video or video note messages, extracts audio,
converts the audio to text in a specified language, and sends the result back to the user.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
MAX_DURATION (`int`): Maximum allowed duration of the video in seconds.
logger (`Logger`): Logger instance for logging events.
"""
# Mapping of language codes to their aliases
LANGUAGE_ALIASES: dict[str, list[str]] = {
"en": ["en", "eng", "english"],
"ru": ["ru", "rus", "russian"],
}
# Default language code for conversion if no language is specified
DEFAULT_LANGUAGE: str = "ru"
# Maximum allowed duration of the video message in seconds
MAX_DURATION: int = 300
def __init__(self, logger: Logger) -> None:
"""
Initializes the VideoCommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
"""
self.logger: Logger = logger
def get_filters(self) -> Filter:
"""
Returns the filter for the /video command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /video command.
"""
return filters.command(self.COMMAND)
@property
def COMMAND(self) -> str:
"""
The name of the command that this handler handles.
Returns:
str: The command name.
"""
return "video"
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /video command.
Extracts audio from a video or video note message, converts the audio to text
in the specified language and sends the result back to the user. The command
must be used in reply to a video or video note message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Parse the language parameter or use the default (Russian)
command_arguments: list[str] = message.text.split()
try:
language_code: str = (
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
)
except ValueError as error:
await message.reply(str(error), quote=True)
return
# Check if the reply is to a video or video note message
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a video or video note message.")
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
return
# Identify the file type (video or video note)
media_type: str = "video" if message.reply_to_message.video else "video_note"
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
# Notify the user that the request is being processed
processing_message: Message = await message.reply_to_message.reply(
"Processing video message to extract text...", quote=True
)
with tempfile.NamedTemporaryFile(delete=False) as temp_video_file:
video_file_path = await client.download_media(
media.file_id,
file_name=temp_video_file.name
)
self.logger.info(f"{media_type} message downloaded to {video_file_path}.")
try:
# Validate video duration
video_duration: float = await video_processing.get_video_duration(video_file_path) # type: ignore
if video_duration > self.MAX_DURATION:
self.logger.warning(f"{media_type} too long: {video_duration} seconds.")
await processing_message.edit_text(
f"The video or video note is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
)
return
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Extract audio and convert it to text
output_dir = os.path.dirname(video_file_path) # type: ignore
audio_file_path: str = await video_processing.extract_audio_from_video(video_file_path, output_dir) # type: ignore
extracted_text: str = await audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
self.logger.info(f"{media_type} message successfully converted to text.")
response_text: str = (
f"<pre language=\"Conversion Result ({language_code})\">"
f"{extracted_text}"
"</pre>"
)
await processing_message.edit_text(response_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
if os.path.exists(video_file_path): # type: ignore
os.remove(video_file_path) # type: ignore
if os.path.exists(audio_file_path): # type: ignore
os.remove(audio_file_path) # type: ignore
def __resolve_language(self, language_input: str) -> str:
"""
Resolves the language code based on the input text.
Args:
language_input (str): User-provided language parameter.
Returns:
str: The resolved language code.
Raises:
ValueError: If the input does not match any supported language.
"""
normalized_input: str = language_input.lower()
for language_code, aliases in self.LANGUAGE_ALIASES.items():
if normalized_input in aliases:
return language_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
)

View File

@ -0,0 +1,166 @@
import os
from logging import Logger
import tempfile
from pyrogram import filters
from pyrogram.filters import Filter
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from src.bot.handlers import AbstractCommandHandler
from src.utils import audio_processing
class VoiceCommandHandler(AbstractCommandHandler):
"""
Command handler for the /voice command in a Pyrogram bot.
This handler processes voice messages, converts them to text
in a specified language, and sends the result back to the user.
Attributes:
COMMAND (`str`): The name of the command that this handler handles.
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds.
logger (`Logger`): Logger instance for logging events.
"""
# Mapping of language codes to their aliases
LANGUAGE_ALIASES: dict[str, list[str]] = {
"en": ["en", "eng", "english"],
"ru": ["ru", "rus", "russian"],
}
# Default language code for conversion if no language is specified
DEFAULT_LANGUAGE: str = "ru"
# Maximum allowed duration of the voice message in seconds
MAX_DURATION: int = 300
def __init__(self, logger: Logger) -> None:
"""
Initializes the VoiceCommandHandler.
Args:
logger (`Logger`): Logger instance for logging events.
"""
self.logger: Logger = logger
@property
def COMMAND(self) -> str:
"""
The name of the command that this handler handles.
Returns:
str: The command name.
"""
return "voice"
def get_filters(self) -> Filter:
"""
Returns the filter for the /voice command.
Returns:
`pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command.
"""
return filters.command(self.COMMAND)
async def handle(self, client: Client, message: Message) -> None:
"""
Handles the /voice command.
Converts a voice message to text in the specified language
and sends the result back to the user. The command must be used in reply
to a voice message.
Args:
client (`pyrogram.client.Client`): The Pyrogram client instance.
message (`pyrogram.types.Message`): The incoming message object to process.
"""
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
# Parse the language parameter or use the default (Russian)
command_arguments: list[str] = message.text.split()
try:
language_code: str = (
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
)
except ValueError as error:
await message.reply(str(error), quote=True)
return
if not (message.reply_to_message and message.reply_to_message.voice):
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.")
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
return
# Notify the user that the request is being processed
processing_message: Message = await message.reply_to_message.reply(
"Converting voice message to text...", quote=True
)
with tempfile.NamedTemporaryFile(delete=False) as temp_audio_file:
audio_file_path = await client.download_media(
message.reply_to_message.voice.file_id,
file_name=temp_audio_file.name
)
self.logger.info(f"Voice message downloaded to {audio_file_path}.")
try:
# Validate voice message duration
voice_duration: float = await audio_processing.get_audio_duration(audio_file_path)
if voice_duration > self.MAX_DURATION:
self.logger.warning(f"Voice message too long: {voice_duration} seconds.")
await processing_message.edit_text(
f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
)
return
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
extracted_text: str = await audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
self.logger.info("Voice message successfully converted to text.")
response_text: str = (
f"<pre language=\"Conversion Result ({language_code})\">"
f"{extracted_text}"
"</pre>"
)
await processing_message.edit_text(response_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
finally:
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
if os.path.exists(audio_file_path): # type: ignore
os.remove(audio_file_path) # type: ignore
def __resolve_language(self, language_input: str) -> str:
"""
Resolves the language code based on the input text.
Args:
language_input (str): User-provided language parameter.
Returns:
str: The resolved language code.
Raises:
ValueError: If the input does not match any supported language.
"""
normalized_input: str = language_input.lower()
for language_code, aliases in self.LANGUAGE_ALIASES.items():
if normalized_input in aliases:
return language_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
)

View File

@ -0,0 +1,65 @@
import logging
from logging import Logger
from pyrogram.client import Client
from src.integrations.gigachat_api_client import GigaChatClient
from src.integrations.google_translate_client import GoogleTranslateClient
from src.bot.handlers import AbstractCommandHandler
from src.bot.handlers import AICommandHandler
from src.bot.handlers import VoiceCommandHandler
from src.bot.handlers import VideoCommandHandler
from src.bot.handlers import TranslateCommandHandler
class TelegramUserBot:
"""
A Telegram user bot.
Attributes:
app (`Client`): The Pyrogram client instance for the bot.
gigachat_client (`GigaChatClient`): The client instance for GigaChat integration.
"""
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient, translate_client: GoogleTranslateClient) -> None:
"""
Initializes the Telegram user bot.
Args:
session_name (`str`): The session name for the bot.
api_id (`str`): The API ID for the Telegram application.
api_hash (`str`): The API hash for the Telegram application.
gigachat_client (`GigaChatClient`): An instance of GigaChatClient for handling AI responses.
"""
# Configure logging
self.logger: Logger = logging.getLogger(__name__)
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
# Configure handlers
self.handlers: dict[str, AbstractCommandHandler] = {
"ai": AICommandHandler(self.logger, gigachat_client),
"voice": VoiceCommandHandler(self.logger),
"video": VideoCommandHandler(self.logger),
"translate": TranslateCommandHandler(self.logger, translate_client),
}
self.register_handlers()
def register_handlers(self) -> None:
"""
Registers the message handlers for the bot.
"""
self.logger.debug("Registering handlers.")
for command, handler in self.handlers.items():
self.app.on_message(filters=handler.get_filters())(handler.handle)
def run(self) -> None:
"""
Starts the bot.
"""
self.logger.info("Bot is starting.")
print("Bot is running.")
try:
self.app.run()
except Exception as e:
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)

View File

@ -0,0 +1,27 @@
"""
config.py
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
Configuration settings:
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
Note:
- Make sure to keep these tokens secure and do not share them publicly.
- These values should be replaced with actual credentials for the bot and the GigaChat API.
"""
# The API ID for the Telegram application
API_ID: str = ''
# The API hash for the Telegram application
API_HASH: str = ''
# The bot token for authenticating the bot with Telegram
BOT_TOKEN: str = ''
# The token for authenticating with the GigaChat API
API_GIGACHAT_TOKEN: str = ''

View File

@ -0,0 +1,102 @@
import logging
from logging import Logger
from typing import Dict
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_gigachat.chat_models import GigaChat
class GigaChatClient:
"""
A client class for interacting with the GigaChat API using LangChain components.
"""
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
"""
Initializes the GigaChatManager with API credentials and a default model.
Args:
api_token (str): The API token for authenticating with the GigaChat API.
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
"""
# Configure logging
self.logger: Logger = logging.getLogger(__name__)
self.api_token: str = api_token
self.model_name: str = model_name
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
self.llm: GigaChat = self._create_llm(model_name)
self.store: Dict[str, InMemoryChatMessageHistory] = {}
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
def _create_llm(self, model_name: str) -> GigaChat:
"""
Creates and configures a GigaChat LLM instance.
Args:
model_name (str): The GigaChat model to use.
Returns:
GigaChat: Configured GigaChat instance.
"""
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
return GigaChat(
credentials=self.api_token,
scope="GIGACHAT_API_PERS",
model=model_name,
verify_ssl_certs=False,
streaming=False,
)
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
"""
Retrieves the chat history for a given session, creating it if it does not exist.
Args:
session_id (str): The unique identifier for the session.
Returns:
InMemoryChatMessageHistory: The chat history for the session.
"""
if session_id not in self.store:
self.logger.debug(f"Creating new session history for session_id: {session_id}")
self.store[session_id] = InMemoryChatMessageHistory()
else:
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
return self.store[session_id]
def set_model(self, model_name: str) -> None:
"""
Updates the LLM to use a different GigaChat model.
Args:
model_name (str): The new GigaChat model to use.
"""
self.logger.info(f"Switching model to: {model_name}")
self.llm = self._create_llm(model_name)
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
def get_response(self, session_id: str, text: str) -> str:
"""
Get a response to the provided input text for a given session.
Args:
session_id (str): The unique identifier for the session.
text (str): The input text for which a response is needed.
Returns:
str: The response text.
"""
self.logger.info(f"Generating response for session_id: {session_id}")
try:
response = self.conversation.invoke(
input=text,
config={"configurable": {"session_id": session_id}},
)
self.logger.debug(f"Response for session_id {session_id}")
return response.content
except Exception as e:
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
raise

View File

@ -0,0 +1,106 @@
from logging import Logger
from googletrans import Translator, LANGUAGES
from googletrans.models import Translated, Detected
class GoogleTranslateClient:
"""
A client for interacting with Google Translate.
"""
def __init__(self, logger: Logger) -> None:
"""
Initializes the client for interacting with Google Translate.
Args:
logger (`logging.Logger`): Logger instance for logging.
"""
self.logger: Logger = logger
self.translator: Translator = Translator()
self.logger.info("Google Translate client initialized successfully.")
async def get_available_languages(self) -> dict[str, str]:
"""
Retrieves a list of available languages supported by Google Translate.
Returns:
`dict[str, str]`: A dictionary where keys are language codes and values are language names.
"""
self.logger.info("Retrieving available languages.")
return LANGUAGES
async def detect_language(self, text: str) -> Detected:
"""
Detects the language of a given text.
Args:
text (`str`): The text for language detection.
Returns:
`googletrans.models.Detected`: The detection object containing the detected language and confidence.
"""
try:
self.logger.info("Detecting language for text.")
detection: Detected = await self.translator.detect(text)
self.logger.debug("Detection language completed successfully.")
return detection
except Exception as e:
self.logger.error(f"Error during language detection: {e}", exc_info=True)
raise RuntimeError(f"Error during language detection: {e}")
async def translate_text(
self,
text: str,
dest_lang: str = "ru",
src_lang: str = "auto"
) -> Translated:
"""
Translates a given text to the target language.
Args:
text (`str`): The text to be translated.
dest_lang (`str`): The target language code (e.g., 'ru' for Russian). Defaults to 'ru'.
src_lang (`str`): The source language code. Defaults to 'auto' for automatic detection.
Returns:
`googletrans.models.Translated`: The translation object containing the translated text and metadata.
"""
try:
self.logger.info(f"Translating text from {src_lang} to {dest_lang}.")
translation: Translated = await self.translator.translate(
text, dest_lang, src_lang
)
self.logger.info("Translation completed successfully.")
return translation
except Exception as e:
self.logger.error(f"Error during translation: {e}", exc_info=True)
raise RuntimeError(f"Error during translation: {e}")
async def translate_batch(
self,
texts: list[str],
dest_lang: str = "ru",
src_lang: str = "auto"
) -> list[Translated]:
"""
Translates a list of texts to the target language.
Args:
texts (`list[str]`): A list of texts to be translated.
dest_lang (`str`): The target language code (e.g., 'ru' for Russian). Defaults to 'ru'.
src_lang (`str`): The source language code. Defaults to 'auto' for automatic detection.
Returns:
`list[googletrans.models.Translated]`: A list of translation objects containing the translated texts and metadata.
"""
try:
self.logger.info(f"Translating batch of {len(texts)} texts from {src_lang} to {dest_lang}.")
translations: list[Translated] = await self.translator.translate(
texts, dest_lang, src_lang
)
self.logger.info("Batch translation completed successfully.")
return translations
except Exception as e:
self.logger.error(f"Error during batch translation: {e}", exc_info=True)
raise RuntimeError(f"Error during batch translation: {e}")

View File

@ -0,0 +1,124 @@
import os
import logging
from logging import Logger
import asyncio
from pydub import AudioSegment
import speech_recognition as sr
from speech_recognition.audio import AudioData
# Configure logging
logger: Logger = logging.getLogger(__name__)
async def convert_to_wav(file_path: str) -> str:
"""
Converts an audio file to WAV format if it is not already in WAV format.
Args:
file_path (`str`): The path to the audio file to be converted.
Returns:
`str`: The path to the converted or original WAV file.
Raises:
`FileNotFoundError`: If the file does not exist.
`RuntimeError`: If the conversion fails for any reason.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError(f"File {file_path} does not exist.")
if file_path.lower().endswith('.wav'):
logger.info(f"File {file_path} is already in WAV format.")
return file_path
try:
logger.info(f"Converting {file_path} to WAV format.")
audio = await asyncio.to_thread(AudioSegment.from_file, file_path)
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
await asyncio.to_thread(audio.export, wav_path, format="wav")
logger.info(f"File converted to {wav_path}.")
return wav_path
except Exception as e:
logger.error(f"Failed to convert file to WAV: {e}")
raise RuntimeError(f"Failed to convert file to WAV: {e}")
async def get_audio_duration(file_path: str) -> float:
"""
Retrieves the duration of an audio file in seconds.
Args:
file_path (`str`): The path to the audio file.
Returns:
`float`: The duration of the audio file in seconds.
Raises:
`FileNotFoundError`: If the file does not exist.
`RuntimeError`: If unable to get the file duration.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError(f"File {file_path} does not exist.")
try:
logger.info(f"Getting duration of {file_path}.")
audio = await asyncio.to_thread(AudioSegment.from_file, file_path)
duration: float = len(audio) / 1000 # Duration in seconds
logger.info(f"Duration of {file_path}: {duration} seconds.")
return duration
except Exception as e:
logger.error(f"Failed to get file duration: {e}")
raise RuntimeError(f"Failed to get file duration: {e}")
async def convert_voice_to_text(file_path: str, language='ru') -> str:
"""
Converts speech from an audio file to text using OpenAI speech recognition service.
Args:
file_path (`str`): The path to the audio file to be processed.
language (`str`): The language code for speech recognition (default is 'ru').
Returns:
`str`: The transcribed text if recognition is successful.
Raises:
`FileNotFoundError`: If the file does not exist.
`RuntimeError`: For any errors encountered during processing.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError("File does not exist.")
# Convert the file to WAV format if necessary
try:
wav_path: str = await convert_to_wav(file_path)
except RuntimeError as e:
logger.error(f"Error converting to WAV: {e}")
raise RuntimeError(f"Error converting to WAV: {e}")
recognizer = sr.Recognizer()
try:
duration: float = await get_audio_duration(wav_path)
logger.info(f"Processing file {wav_path} ({duration} sec) for speech recognition.")
with sr.AudioFile(wav_path) as source:
audio_data: AudioData = await asyncio.to_thread(recognizer.record, source)
text = await asyncio.to_thread(
recognizer.recognize_whisper, audio_data, language=language, model='medium'
)
logger.info("Speech recognition successful.")
return text # type: ignore
except sr.UnknownValueError:
logger.warning(f"Speech in {wav_path} could not be recognized.")
raise RuntimeError("Speech could not be recognized.")
except sr.RequestError as e:
logger.error(f"Request error from the recognition service: {e}")
raise RuntimeError(f"Request error from the recognition service: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
raise RuntimeError(f"An unexpected error occurred: {e}")

View File

@ -0,0 +1,62 @@
import logging
import logging.config
from datetime import datetime
def setup_logging(output_to_console=False) -> None:
"""
Configures the logging system.
This function sets up logging with optional output to the console and ensures
log files are rotated daily. It creates a detailed logging format and retains
log files for a week.
Args:
output_to_console (`bool`): If True, log messages will also be printed to the console.
Defaults to False.
"""
# Define the default handlers to use. Always logs to a file.
handlers: list[str] = ['file']
if output_to_console:
# Add console logging if requested
handlers.append('console')
# Generate the log file name with the current date
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
# Configure the logging settings using a dictionary
logging.config.dictConfig({
'version': 1, # Logging configuration version
'disable_existing_loggers': True, # Deny other loggers to remain active
'formatters': {
'detailed': { # Define a detailed logging format
'format': (
'%(asctime)s | %(levelname)-8s | '
'%(filename)s.%(funcName)s, line %(lineno)d: '
'%(message)s'
),
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
},
},
'handlers': {
# Console handler outputs log messages to the console
'console': {
'class': 'logging.StreamHandler', # Standard output stream
'formatter': 'detailed', # Use the detailed formatter
},
# File handler writes log messages to a file, rotating daily
'file': {
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': log_filename, # Log file path
'when': 'midnight', # Rotate log files at midnight
'interval': 1, # Rotate daily
'backupCount': 7, # Keep up to 7 old log files
'formatter': 'detailed', # Use the detailed formatter
},
},
# Define the root logger configuration
'root': {
'handlers': handlers, # Handlers to use (console, file, or both)
'level': 'DEBUG', # Log level (DEBUG logs all levels)
},
})

View File

@ -0,0 +1,71 @@
import os
import logging
from logging import Logger
import asyncio
from moviepy import VideoFileClip
# Configure logging
logger: Logger = logging.getLogger(__name__)
async def get_video_duration(video_path: str) -> float:
"""
Get the duration of a video file in seconds.
Args:
video_path (`str`): The path to the video file.
Returns:
`float`: The duration of the video in seconds.
Raises:
`FileNotFoundError`: If the video file does not exist.
`RuntimeError`: If an error occurs during processing.
"""
if not os.path.exists(video_path):
logger.error(f"Video file {video_path} does not exist.")
raise FileNotFoundError(f"Video file {video_path} does not exist.")
try:
video_clip: VideoFileClip = await asyncio.to_thread(VideoFileClip, video_path)
duration = video_clip.duration
logger.info(f"Duration of video {video_path}: {duration} seconds.")
return duration
except Exception as e:
logger.error(f"Failed to get video duration: {e}", exc_info=True)
raise RuntimeError(f"Failed to get video duration: {e}")
finally:
await asyncio.to_thread(video_clip.close)
async def extract_audio_from_video(video_path: str, output_dir: str) -> str:
"""
Extracts the audio track from a video file and saves it as a WAV file.
Args:
video_path (`str`): The path to the video file.
output_dir (`str`): The directory where the audio file will be saved.
Returns:
`str`: The path to the extracted audio file.
Raises:
`FileNotFoundError`: If the video file does not exist.
`RuntimeError`: If an error occurs during audio extraction.
"""
if not os.path.exists(video_path):
logger.error(f"Video file {video_path} does not exist.")
raise FileNotFoundError(f"Video file {video_path} does not exist.")
try:
logger.info(f"Extracting audio from video: {video_path}")
video_clip: VideoFileClip = await asyncio.to_thread(VideoFileClip, video_path)
audio_path: str = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.wav")
await asyncio.to_thread(video_clip.audio.write_audiofile, audio_path) # type: ignore
logger.info(f"Audio extracted and saved to: {audio_path}")
return audio_path
except Exception as e:
logger.error(f"Failed to extract audio from video: {e}", exc_info=True)
raise RuntimeError(f"Failed to extract audio from video: {e}")
finally:
await asyncio.to_thread(video_clip.close)

View File

@ -0,0 +1,55 @@
import pytest
from unittest.mock import MagicMock, patch
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_gigachat import GigaChat
from src.integrations.gigachat_api_client import GigaChatClient
from src.core.configuration import config
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
@pytest.fixture
def gigachat_client() -> GigaChatClient:
"""Fixture to create a GigaChatClient instance with a mock API token."""
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
def test_initialization(gigachat_client) -> None:
"""Test if the GigaChatClient initializes correctly."""
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
assert gigachat_client.model_name == "GigaChat"
assert isinstance(gigachat_client.store, dict)
assert gigachat_client.llm is not None
def test_create_llm() -> None:
"""Test the _create_llm method for proper LLM creation."""
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
assert mock_llm.credentials == API_GIGACHAT_TOKEN
assert mock_llm.model == "GigaChat-Pro"
def test_get_session_history(gigachat_client) -> None:
"""Test the get_session_history method for creating/retrieving session history."""
session_id = "test_session"
history = gigachat_client.get_session_history(session_id)
assert isinstance(history, InMemoryChatMessageHistory)
assert session_id in gigachat_client.store
assert gigachat_client.store[session_id] is history
def test_set_model(gigachat_client) -> None:
"""Test the set_model method for updating the LLM model."""
new_model = "GigaChat-Pro"
gigachat_client.set_model(new_model)
assert gigachat_client.llm.model == new_model
def test_get_response() -> None:
"""Test the get_response method by verifying the response code."""
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
mock_runnable = MagicMock()
mock_runnable.invoke.return_value.code = 200
MockRunnable.return_value = mock_runnable
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
response_code = mock_runnable.invoke.return_value.code
assert response_code == 200