Compare commits
No commits in common. "feature/async-video-and-audio-processing" and "main" have entirely different histories.
feature/as
...
main
7
.gitignore
vendored
7
.gitignore
vendored
@ -129,9 +129,6 @@ ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Configuration
|
||||
config.py
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
@ -164,7 +161,6 @@ cython_debug/
|
||||
#.idea/
|
||||
|
||||
# ---> VisualStudioCode
|
||||
.vscode
|
||||
.vscode/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/tasks.json
|
||||
@ -178,6 +174,3 @@ cython_debug/
|
||||
# Built Visual Studio Code Extensions
|
||||
*.vsix
|
||||
|
||||
# Session info
|
||||
*.session
|
||||
*.session-journal
|
40
main.py
40
main.py
@ -1,40 +0,0 @@
|
||||
import logging
|
||||
from logging import Logger
|
||||
|
||||
from src.utils import logging_configuration
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.integrations.google_translate_client import GoogleTranslateClient
|
||||
from src.bot.telegram_userbot import TelegramUserBot
|
||||
from src.core.configuration import config
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Entry point for starting the Telegram user bot.
|
||||
"""
|
||||
# Configure logging
|
||||
logging_configuration.setup_logging()
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
# Load API credentials and configuration
|
||||
api_id: str = config.API_ID
|
||||
api_hash: str = config.API_HASH
|
||||
api_token: str = config.API_GIGACHAT_TOKEN
|
||||
|
||||
# Initialize services
|
||||
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
|
||||
translate_client: GoogleTranslateClient = GoogleTranslateClient(logger)
|
||||
|
||||
# Initialize and run the Telegram user bot
|
||||
bot: TelegramUserBot = TelegramUserBot(
|
||||
session_name="userbot",
|
||||
api_id=api_id,
|
||||
api_hash=api_hash,
|
||||
gigachat_client=gigachat_client,
|
||||
translate_client=translate_client
|
||||
)
|
||||
bot.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
2149
poetry.lock
generated
2149
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -6,25 +6,7 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.12"
|
||||
pyrogram = "^2.0.106"
|
||||
tgcrypto = "^1.2.5"
|
||||
setuptools = "^75.6.0"
|
||||
wheel = "^0.45.1"
|
||||
langchain-gigachat = "^0.3.2"
|
||||
punq = "^0.7.0"
|
||||
pytest = "^8.3.4"
|
||||
speechrecognition = "^3.13.0"
|
||||
typing-extensions = "^4.12.2"
|
||||
pydub = "^0.25.1"
|
||||
numpy = "2.0.2"
|
||||
soundfile = "^0.13.0"
|
||||
torch = "^2.5.1"
|
||||
llvmlite = "0.43.0"
|
||||
numba = "0.60.0"
|
||||
openai-whisper = "^20240930"
|
||||
moviepy = "^2.1.1"
|
||||
googletrans = "^4.0.2"
|
||||
python = "^3.13"
|
||||
|
||||
|
||||
[build-system]
|
||||
|
@ -1,5 +0,0 @@
|
||||
from src.bot.handlers.abstract_command_handler import AbstractCommandHandler
|
||||
from src.bot.handlers.ai_command_handler import AICommandHandler
|
||||
from src.bot.handlers.voice_command_handler import VoiceCommandHandler
|
||||
from src.bot.handlers.video_command_handler import VideoCommandHandler
|
||||
from src.bot.handlers.translate_command_handler import TranslateCommandHandler
|
@ -1,55 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pyrogram.filters import Filter
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
|
||||
|
||||
class AbstractCommandHandler(ABC):
|
||||
"""
|
||||
Abstract base class for creating command handlers in a Pyrogram bot.
|
||||
|
||||
This class provides a structure for defining custom command handlers.
|
||||
Each subclass must implement the following abstract methods:
|
||||
|
||||
- `get_filters`: Defines the filters used to match messages that
|
||||
the command handler should process.
|
||||
- `handle`: Contains the logic to execute when a message matches
|
||||
the defined filters.
|
||||
|
||||
Attributes:
|
||||
COMMAND (`str`): The name of the command that this handler handles.
|
||||
"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def COMMAND(self) -> str:
|
||||
"""
|
||||
The name of the command that this handler handles.
|
||||
|
||||
Returns:
|
||||
str: The command name.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_filters(self) -> Filter:
|
||||
"""
|
||||
Returns the filters for this command handler.
|
||||
|
||||
Returns:
|
||||
`pyrogram.filters.Filter`: A Pyrogram filter or a custom filter
|
||||
determines which messages this handler processes.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def handle(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles the command logic.
|
||||
|
||||
Args:
|
||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
||||
"""
|
||||
pass
|
@ -1,94 +0,0 @@
|
||||
from logging import Logger
|
||||
from typing import Optional
|
||||
|
||||
from pyrogram import filters
|
||||
from pyrogram.filters import Filter
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.enums import ChatAction
|
||||
|
||||
from src.bot.handlers import AbstractCommandHandler
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
|
||||
|
||||
class AICommandHandler(AbstractCommandHandler):
|
||||
"""
|
||||
Command handler for the /ai command in a Pyrogram bot.
|
||||
|
||||
This handler processes text input, sends it to the GigaChat AI model,
|
||||
and returns the generated response.
|
||||
|
||||
Attributes:
|
||||
COMMAND (`str`): The name of the command that this handler handles.
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
|
||||
"""
|
||||
|
||||
def __init__(self, logger: Logger, gigachat_client: GigaChatClient) -> None:
|
||||
"""
|
||||
Initializes the AICommandHandler.
|
||||
|
||||
Args:
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
|
||||
"""
|
||||
self.logger: Logger = logger
|
||||
self.gigachat_client: GigaChatClient = gigachat_client
|
||||
|
||||
@property
|
||||
def COMMAND(self) -> str:
|
||||
"""
|
||||
The name of the command that this handler handles.
|
||||
|
||||
Returns:
|
||||
str: The command name.
|
||||
"""
|
||||
return "ai"
|
||||
|
||||
def get_filters(self) -> Filter:
|
||||
"""
|
||||
Returns the filter for the /ai command.
|
||||
|
||||
Returns:
|
||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /ai command.
|
||||
"""
|
||||
return filters.command(self.COMMAND)
|
||||
|
||||
async def handle(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles the /ai command.
|
||||
|
||||
Sends the user's input to the GigaChat AI model and returns the generated response.
|
||||
The command can be used with an inline argument or as a reply to a text message.
|
||||
|
||||
Args:
|
||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
||||
"""
|
||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
||||
|
||||
# Extract the command argument or use the replied message's text
|
||||
command_argument: Optional[str] = " ".join(message.text.split()[1:])
|
||||
if not command_argument and message.reply_to_message and message.reply_to_message.text:
|
||||
command_argument = message.reply_to_message.text
|
||||
|
||||
if not command_argument:
|
||||
self.logger.warning(f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}.")
|
||||
await message.reply(f"Please provide a message after /{self.COMMAND} or reply to a message.", quote=True)
|
||||
return
|
||||
|
||||
# Notify the user that the request is being processed
|
||||
processing_message: Message = await message.reply(
|
||||
f"{self.gigachat_client.model_name} is processing your request...", quote=True
|
||||
)
|
||||
|
||||
try:
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
response_text: str = self.gigachat_client.get_response(str(message.chat.id), command_argument)
|
||||
self.logger.debug(f"Generated response for chat_id={message.chat.id}")
|
||||
await processing_message.edit_text(response_text)
|
||||
except Exception as error:
|
||||
self.logger.error(f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing your request.")
|
||||
finally:
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
@ -1,217 +0,0 @@
|
||||
import re
|
||||
from re import Match
|
||||
from logging import Logger
|
||||
|
||||
from pyrogram import filters
|
||||
from pyrogram.filters import Filter
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.enums import ChatAction
|
||||
from googletrans.models import Translated
|
||||
|
||||
from src.bot.handlers import AbstractCommandHandler
|
||||
from src.integrations.google_translate_client import GoogleTranslateClient
|
||||
|
||||
|
||||
class TranslateCommandHandler(AbstractCommandHandler):
|
||||
"""
|
||||
Command handler for the /translate command in a Pyrogram bot.
|
||||
|
||||
This handler translates text from one language to another using Google Translate.
|
||||
|
||||
Attributes:
|
||||
COMMAND (`str`): The name of the command that this handler handles.
|
||||
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
||||
DEFAULT_SOURCE_LANGUAGE (`str`): Default source language for translation ("auto" for auto-detection).
|
||||
DEFAULT_DESTINATION_LANGUAGE (`str`): Default destination language code for translation.
|
||||
logger (`Logger`): Logger instance for logging.
|
||||
translate_client (`GoogleTranslateClient`): Client for interacting with Google Translate.
|
||||
"""
|
||||
|
||||
# Mapping of language codes to their aliases
|
||||
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
||||
"ru": ["ru", "rus", "russian"],
|
||||
"en": ["en", "eng", "english"],
|
||||
"es": ["es", "spa", "spanish"],
|
||||
"de": ["de", "ger", "german"],
|
||||
"fr": ["fr", "fra", "french"],
|
||||
"pt": ["pt", "por", "portuguese"],
|
||||
"it": ["it", "ita", "italian"],
|
||||
"zh": ["zh", "chi", "chinese"],
|
||||
"ja": ["ja", "jpn", "japanese"],
|
||||
"ko": ["ko", "kor", "korean"],
|
||||
"ar": ["ar", "ara", "arabic"],
|
||||
"tr": ["tr", "tur", "turkish"],
|
||||
"hi": ["hi", "hin", "hindi"],
|
||||
"vi": ["vi", "vie", "vietnamese"],
|
||||
"sv": ["sv", "swe", "swedish"],
|
||||
"no": ["no", "nor", "norwegian"],
|
||||
"da": ["da", "dan", "danish"],
|
||||
"fi": ["fi", "fin", "finnish"],
|
||||
"cs": ["cs", "cze", "czech"],
|
||||
"sk": ["sk", "slo", "slovak"],
|
||||
"ro": ["ro", "rum", "romanian"],
|
||||
"bg": ["bg", "bul", "bulgarian"],
|
||||
"uk": ["uk", "ukr", "ukrainian"],
|
||||
"be": ["be", "bel", "belarusian"],
|
||||
"et": ["et", "est", "estonian"],
|
||||
"lv": ["lv", "lav", "latvian"],
|
||||
"lt": ["lt", "lit", "lithuanian"],
|
||||
"tt": ["tt", "tat", "tatar"],
|
||||
"cv": ["cv", "chv", "chuvash"],
|
||||
}
|
||||
|
||||
# Default source language for translation ("auto" for auto-detection)
|
||||
DEFAULT_SOURCE_LANGUAGE: str = "auto"
|
||||
|
||||
# Default destination language code for translation
|
||||
DEFAULT_DESTINATION_LANGUAGE: str = "ru"
|
||||
|
||||
def __init__(self, logger: Logger, translate_client: GoogleTranslateClient) -> None:
|
||||
"""
|
||||
Initializes the TranslateCommandHandler.
|
||||
|
||||
Args:
|
||||
logger (`logging.Logger`): Logger instance for logging events.
|
||||
translate_client (`src.integrations.GoogleTranslateClient`): Client for interacting with Google Translate.
|
||||
"""
|
||||
self.logger: Logger = logger
|
||||
self.translate_client: GoogleTranslateClient = translate_client
|
||||
self.logger.info("GoogleTranslateClient initialized successfully.")
|
||||
|
||||
@property
|
||||
def COMMAND(self) -> str:
|
||||
"""
|
||||
The name of the command that this handler handles.
|
||||
"""
|
||||
return "translate"
|
||||
|
||||
def get_filters(self) -> Filter:
|
||||
"""
|
||||
Returns the filter for the /translate command.
|
||||
|
||||
Returns:
|
||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /translate command.
|
||||
"""
|
||||
return filters.command(self.COMMAND)
|
||||
|
||||
async def handle(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles the /translate command.
|
||||
|
||||
Translates a given text or text from a replied-to message from one language to another.
|
||||
|
||||
Args:
|
||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
||||
"""
|
||||
self.logger.info(
|
||||
f"Received /{self.COMMAND} command from chat_id={message.chat.id}."
|
||||
)
|
||||
|
||||
# Default values
|
||||
source_language: str = self.DEFAULT_SOURCE_LANGUAGE
|
||||
destination_language: str = self.DEFAULT_DESTINATION_LANGUAGE
|
||||
text: str | None = None
|
||||
|
||||
# Parse optional arguments using regex
|
||||
match_src: Match[str] | None = re.search(r"(?:src=|source=)(\w+)", message.text)
|
||||
match_dest: Match[str] | None = re.search(r"(?:dest=|destination=)(\w+)", message.text)
|
||||
|
||||
if match_src:
|
||||
source_language = match_src.group(1)
|
||||
|
||||
if match_dest:
|
||||
destination_language = match_dest.group(1)
|
||||
|
||||
# Extract text (everything after the last optional parameter)
|
||||
text_parts: str = re.sub(
|
||||
rf"(?:/{self.COMMAND}|src=\w+|source=\w+|dest=\w+|destination=\w+)", "", message.text
|
||||
).strip()
|
||||
|
||||
if text_parts:
|
||||
text = text_parts
|
||||
|
||||
self.logger.debug(
|
||||
f"Parsed parameters - source_language: {source_language}, destination_language: {destination_language}, text length: {len(text) if text else 0}"
|
||||
)
|
||||
|
||||
# Resolve language aliases
|
||||
try:
|
||||
if source_language != self.DEFAULT_SOURCE_LANGUAGE:
|
||||
source_language = self.__resolve_language(source_language)
|
||||
destination_language = self.__resolve_language(destination_language)
|
||||
except ValueError:
|
||||
await message.reply("Invalid language parameter provided.", quote=True)
|
||||
self.logger.error("Invalid language parameter provided.", exc_info=True)
|
||||
return
|
||||
|
||||
# Use replied message text if no text is provided
|
||||
if not text and message.reply_to_message and message.reply_to_message.text:
|
||||
text = message.reply_to_message.text
|
||||
|
||||
if not text:
|
||||
await message.reply(
|
||||
f"Please provide a message after /{self.COMMAND} or reply to a message.",
|
||||
quote=True
|
||||
)
|
||||
self.logger.warning(
|
||||
f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}."
|
||||
)
|
||||
return
|
||||
|
||||
# Notify the user that the translation is in progress
|
||||
processing_message: Message = await message.reply(
|
||||
"Translating text...", quote=True
|
||||
)
|
||||
|
||||
try:
|
||||
# Perform translation
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
translation_result: Translated = await self.translate_client.translate_text(
|
||||
text=text,
|
||||
src_lang=source_language,
|
||||
dest_lang=destination_language
|
||||
)
|
||||
self.logger.debug(f"Translating text for chat_id={message.chat.id}")
|
||||
|
||||
# Formatted response text
|
||||
caption: str = f"Translated from {translation_result.src} to {translation_result.dest}"
|
||||
response_text: str = (
|
||||
f"<pre language=\"{caption}\">"
|
||||
f"{translation_result.text}"
|
||||
"</pre>"
|
||||
)
|
||||
|
||||
await processing_message.edit_text(response_text)
|
||||
|
||||
except Exception as error:
|
||||
self.logger.error(
|
||||
f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}",
|
||||
exc_info=True
|
||||
)
|
||||
await processing_message.edit_text(
|
||||
"An error occurred during the translation process. Please try again later."
|
||||
)
|
||||
finally:
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
|
||||
def __resolve_language(self, language_input: str) -> str:
|
||||
"""
|
||||
Resolves the language code based on the input text.
|
||||
|
||||
Args:
|
||||
language_input (`str`): User-provided language parameter.
|
||||
|
||||
Returns:
|
||||
`str`: The resolved language code.
|
||||
|
||||
Raises:
|
||||
`ValueError`: If the input does not match any supported language.
|
||||
"""
|
||||
normalized_input: str = language_input.lower()
|
||||
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
||||
if normalized_input in aliases:
|
||||
return language_code
|
||||
self.logger.warning(f"Invalid language parameter provided: {language_input}")
|
||||
raise ValueError(f"Invalid language parameter provided: {language_input}")
|
@ -1,179 +0,0 @@
|
||||
import os
|
||||
from logging import Logger
|
||||
import tempfile
|
||||
|
||||
from pyrogram import filters
|
||||
from pyrogram.filters import Filter
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.enums import ChatAction
|
||||
from pyrogram.types.messages_and_media.video import Video
|
||||
from pyrogram.types.messages_and_media.video_note import VideoNote
|
||||
|
||||
from src.bot.handlers import AbstractCommandHandler
|
||||
from src.utils import video_processing, audio_processing
|
||||
|
||||
|
||||
class VideoCommandHandler(AbstractCommandHandler):
|
||||
"""
|
||||
Command handler for the /video command in a Pyrogram bot.
|
||||
|
||||
This handler processes video or video note messages, extracts audio,
|
||||
converts the audio to text in a specified language, and sends the result back to the user.
|
||||
|
||||
Attributes:
|
||||
COMMAND (`str`): The name of the command that this handler handles.
|
||||
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
||||
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
|
||||
MAX_DURATION (`int`): Maximum allowed duration of the video in seconds.
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
"""
|
||||
|
||||
# Mapping of language codes to their aliases
|
||||
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
||||
"en": ["en", "eng", "english"],
|
||||
"ru": ["ru", "rus", "russian"],
|
||||
}
|
||||
|
||||
# Default language code for conversion if no language is specified
|
||||
DEFAULT_LANGUAGE: str = "ru"
|
||||
|
||||
# Maximum allowed duration of the video message in seconds
|
||||
MAX_DURATION: int = 300
|
||||
|
||||
def __init__(self, logger: Logger) -> None:
|
||||
"""
|
||||
Initializes the VideoCommandHandler.
|
||||
|
||||
Args:
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
"""
|
||||
self.logger: Logger = logger
|
||||
|
||||
def get_filters(self) -> Filter:
|
||||
"""
|
||||
Returns the filter for the /video command.
|
||||
|
||||
Returns:
|
||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /video command.
|
||||
"""
|
||||
return filters.command(self.COMMAND)
|
||||
|
||||
@property
|
||||
def COMMAND(self) -> str:
|
||||
"""
|
||||
The name of the command that this handler handles.
|
||||
|
||||
Returns:
|
||||
str: The command name.
|
||||
"""
|
||||
return "video"
|
||||
|
||||
async def handle(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles the /video command.
|
||||
|
||||
Extracts audio from a video or video note message, converts the audio to text
|
||||
in the specified language and sends the result back to the user. The command
|
||||
must be used in reply to a video or video note message.
|
||||
|
||||
Args:
|
||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
||||
"""
|
||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
||||
|
||||
# Parse the language parameter or use the default (Russian)
|
||||
command_arguments: list[str] = message.text.split()
|
||||
try:
|
||||
language_code: str = (
|
||||
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
|
||||
)
|
||||
except ValueError as error:
|
||||
await message.reply(str(error), quote=True)
|
||||
return
|
||||
|
||||
# Check if the reply is to a video or video note message
|
||||
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
|
||||
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a video or video note message.")
|
||||
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
|
||||
return
|
||||
|
||||
# Identify the file type (video or video note)
|
||||
media_type: str = "video" if message.reply_to_message.video else "video_note"
|
||||
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
|
||||
|
||||
# Notify the user that the request is being processed
|
||||
processing_message: Message = await message.reply_to_message.reply(
|
||||
"Processing video message to extract text...", quote=True
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False) as temp_video_file:
|
||||
video_file_path = await client.download_media(
|
||||
media.file_id,
|
||||
file_name=temp_video_file.name
|
||||
)
|
||||
self.logger.info(f"{media_type} message downloaded to {video_file_path}.")
|
||||
|
||||
try:
|
||||
# Validate video duration
|
||||
video_duration: float = await video_processing.get_video_duration(video_file_path) # type: ignore
|
||||
if video_duration > self.MAX_DURATION:
|
||||
self.logger.warning(f"{media_type} too long: {video_duration} seconds.")
|
||||
await processing_message.edit_text(
|
||||
f"The video or video note is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
|
||||
)
|
||||
return
|
||||
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
|
||||
# Extract audio and convert it to text
|
||||
output_dir = os.path.dirname(video_file_path) # type: ignore
|
||||
audio_file_path: str = await video_processing.extract_audio_from_video(video_file_path, output_dir) # type: ignore
|
||||
extracted_text: str = await audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
|
||||
self.logger.info(f"{media_type} message successfully converted to text.")
|
||||
|
||||
response_text: str = (
|
||||
f"<pre language=\"Conversion Result ({language_code})\">"
|
||||
f"{extracted_text}"
|
||||
"</pre>"
|
||||
)
|
||||
await processing_message.edit_text(response_text)
|
||||
|
||||
except FileNotFoundError:
|
||||
self.logger.error("File not found during processing.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
except RuntimeError:
|
||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
except Exception:
|
||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
finally:
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
if os.path.exists(video_file_path): # type: ignore
|
||||
os.remove(video_file_path) # type: ignore
|
||||
if os.path.exists(audio_file_path): # type: ignore
|
||||
os.remove(audio_file_path) # type: ignore
|
||||
|
||||
def __resolve_language(self, language_input: str) -> str:
|
||||
"""
|
||||
Resolves the language code based on the input text.
|
||||
|
||||
Args:
|
||||
language_input (str): User-provided language parameter.
|
||||
|
||||
Returns:
|
||||
str: The resolved language code.
|
||||
|
||||
Raises:
|
||||
ValueError: If the input does not match any supported language.
|
||||
"""
|
||||
normalized_input: str = language_input.lower()
|
||||
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
||||
if normalized_input in aliases:
|
||||
return language_code
|
||||
raise ValueError(
|
||||
"Invalid language parameter. Please use one of the following:\n" +
|
||||
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
|
||||
)
|
@ -1,166 +0,0 @@
|
||||
import os
|
||||
from logging import Logger
|
||||
import tempfile
|
||||
|
||||
from pyrogram import filters
|
||||
from pyrogram.filters import Filter
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.enums import ChatAction
|
||||
|
||||
from src.bot.handlers import AbstractCommandHandler
|
||||
from src.utils import audio_processing
|
||||
|
||||
|
||||
class VoiceCommandHandler(AbstractCommandHandler):
|
||||
"""
|
||||
Command handler for the /voice command in a Pyrogram bot.
|
||||
|
||||
This handler processes voice messages, converts them to text
|
||||
in a specified language, and sends the result back to the user.
|
||||
|
||||
Attributes:
|
||||
COMMAND (`str`): The name of the command that this handler handles.
|
||||
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
||||
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
|
||||
MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds.
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
"""
|
||||
|
||||
# Mapping of language codes to their aliases
|
||||
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
||||
"en": ["en", "eng", "english"],
|
||||
"ru": ["ru", "rus", "russian"],
|
||||
}
|
||||
|
||||
# Default language code for conversion if no language is specified
|
||||
DEFAULT_LANGUAGE: str = "ru"
|
||||
|
||||
# Maximum allowed duration of the voice message in seconds
|
||||
MAX_DURATION: int = 300
|
||||
|
||||
def __init__(self, logger: Logger) -> None:
|
||||
"""
|
||||
Initializes the VoiceCommandHandler.
|
||||
|
||||
Args:
|
||||
logger (`Logger`): Logger instance for logging events.
|
||||
"""
|
||||
self.logger: Logger = logger
|
||||
|
||||
@property
|
||||
def COMMAND(self) -> str:
|
||||
"""
|
||||
The name of the command that this handler handles.
|
||||
|
||||
Returns:
|
||||
str: The command name.
|
||||
"""
|
||||
return "voice"
|
||||
|
||||
def get_filters(self) -> Filter:
|
||||
"""
|
||||
Returns the filter for the /voice command.
|
||||
|
||||
Returns:
|
||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command.
|
||||
"""
|
||||
return filters.command(self.COMMAND)
|
||||
|
||||
async def handle(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles the /voice command.
|
||||
|
||||
Converts a voice message to text in the specified language
|
||||
and sends the result back to the user. The command must be used in reply
|
||||
to a voice message.
|
||||
|
||||
Args:
|
||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
||||
"""
|
||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
||||
|
||||
# Parse the language parameter or use the default (Russian)
|
||||
command_arguments: list[str] = message.text.split()
|
||||
try:
|
||||
language_code: str = (
|
||||
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
|
||||
)
|
||||
except ValueError as error:
|
||||
await message.reply(str(error), quote=True)
|
||||
return
|
||||
|
||||
if not (message.reply_to_message and message.reply_to_message.voice):
|
||||
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.")
|
||||
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
||||
return
|
||||
|
||||
# Notify the user that the request is being processed
|
||||
processing_message: Message = await message.reply_to_message.reply(
|
||||
"Converting voice message to text...", quote=True
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False) as temp_audio_file:
|
||||
audio_file_path = await client.download_media(
|
||||
message.reply_to_message.voice.file_id,
|
||||
file_name=temp_audio_file.name
|
||||
)
|
||||
self.logger.info(f"Voice message downloaded to {audio_file_path}.")
|
||||
|
||||
try:
|
||||
# Validate voice message duration
|
||||
voice_duration: float = await audio_processing.get_audio_duration(audio_file_path)
|
||||
if voice_duration > self.MAX_DURATION:
|
||||
self.logger.warning(f"Voice message too long: {voice_duration} seconds.")
|
||||
await processing_message.edit_text(
|
||||
f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
|
||||
)
|
||||
return
|
||||
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
extracted_text: str = await audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
|
||||
self.logger.info("Voice message successfully converted to text.")
|
||||
|
||||
response_text: str = (
|
||||
f"<pre language=\"Conversion Result ({language_code})\">"
|
||||
f"{extracted_text}"
|
||||
"</pre>"
|
||||
)
|
||||
await processing_message.edit_text(response_text)
|
||||
|
||||
except FileNotFoundError:
|
||||
self.logger.error("File not found during processing.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
except RuntimeError:
|
||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
except Exception:
|
||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
finally:
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
if os.path.exists(audio_file_path): # type: ignore
|
||||
os.remove(audio_file_path) # type: ignore
|
||||
|
||||
def __resolve_language(self, language_input: str) -> str:
|
||||
"""
|
||||
Resolves the language code based on the input text.
|
||||
|
||||
Args:
|
||||
language_input (str): User-provided language parameter.
|
||||
|
||||
Returns:
|
||||
str: The resolved language code.
|
||||
|
||||
Raises:
|
||||
ValueError: If the input does not match any supported language.
|
||||
"""
|
||||
normalized_input: str = language_input.lower()
|
||||
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
||||
if normalized_input in aliases:
|
||||
return language_code
|
||||
raise ValueError(
|
||||
"Invalid language parameter. Please use one of the following:\n" +
|
||||
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
|
||||
)
|
@ -1,65 +0,0 @@
|
||||
import logging
|
||||
from logging import Logger
|
||||
|
||||
from pyrogram.client import Client
|
||||
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.integrations.google_translate_client import GoogleTranslateClient
|
||||
from src.bot.handlers import AbstractCommandHandler
|
||||
from src.bot.handlers import AICommandHandler
|
||||
from src.bot.handlers import VoiceCommandHandler
|
||||
from src.bot.handlers import VideoCommandHandler
|
||||
from src.bot.handlers import TranslateCommandHandler
|
||||
|
||||
|
||||
class TelegramUserBot:
|
||||
"""
|
||||
A Telegram user bot.
|
||||
|
||||
Attributes:
|
||||
app (`Client`): The Pyrogram client instance for the bot.
|
||||
gigachat_client (`GigaChatClient`): The client instance for GigaChat integration.
|
||||
"""
|
||||
|
||||
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient, translate_client: GoogleTranslateClient) -> None:
|
||||
"""
|
||||
Initializes the Telegram user bot.
|
||||
|
||||
Args:
|
||||
session_name (`str`): The session name for the bot.
|
||||
api_id (`str`): The API ID for the Telegram application.
|
||||
api_hash (`str`): The API hash for the Telegram application.
|
||||
gigachat_client (`GigaChatClient`): An instance of GigaChatClient for handling AI responses.
|
||||
"""
|
||||
# Configure logging
|
||||
self.logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
|
||||
|
||||
# Configure handlers
|
||||
self.handlers: dict[str, AbstractCommandHandler] = {
|
||||
"ai": AICommandHandler(self.logger, gigachat_client),
|
||||
"voice": VoiceCommandHandler(self.logger),
|
||||
"video": VideoCommandHandler(self.logger),
|
||||
"translate": TranslateCommandHandler(self.logger, translate_client),
|
||||
}
|
||||
self.register_handlers()
|
||||
|
||||
def register_handlers(self) -> None:
|
||||
"""
|
||||
Registers the message handlers for the bot.
|
||||
"""
|
||||
self.logger.debug("Registering handlers.")
|
||||
for command, handler in self.handlers.items():
|
||||
self.app.on_message(filters=handler.get_filters())(handler.handle)
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Starts the bot.
|
||||
"""
|
||||
self.logger.info("Bot is starting.")
|
||||
print("Bot is running.")
|
||||
try:
|
||||
self.app.run()
|
||||
except Exception as e:
|
||||
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)
|
@ -1,27 +0,0 @@
|
||||
"""
|
||||
config.py
|
||||
|
||||
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
|
||||
|
||||
Configuration settings:
|
||||
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
|
||||
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
|
||||
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
|
||||
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
|
||||
|
||||
Note:
|
||||
- Make sure to keep these tokens secure and do not share them publicly.
|
||||
- These values should be replaced with actual credentials for the bot and the GigaChat API.
|
||||
"""
|
||||
|
||||
# The API ID for the Telegram application
|
||||
API_ID: str = ''
|
||||
|
||||
# The API hash for the Telegram application
|
||||
API_HASH: str = ''
|
||||
|
||||
# The bot token for authenticating the bot with Telegram
|
||||
BOT_TOKEN: str = ''
|
||||
|
||||
# The token for authenticating with the GigaChat API
|
||||
API_GIGACHAT_TOKEN: str = ''
|
@ -1,102 +0,0 @@
|
||||
import logging
|
||||
from logging import Logger
|
||||
from typing import Dict
|
||||
|
||||
from langchain_core.runnables.history import RunnableWithMessageHistory
|
||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||
from langchain_gigachat.chat_models import GigaChat
|
||||
|
||||
|
||||
class GigaChatClient:
|
||||
"""
|
||||
A client class for interacting with the GigaChat API using LangChain components.
|
||||
"""
|
||||
|
||||
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
|
||||
"""
|
||||
Initializes the GigaChatManager with API credentials and a default model.
|
||||
|
||||
Args:
|
||||
api_token (str): The API token for authenticating with the GigaChat API.
|
||||
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
|
||||
"""
|
||||
# Configure logging
|
||||
self.logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
self.api_token: str = api_token
|
||||
self.model_name: str = model_name
|
||||
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
|
||||
|
||||
self.llm: GigaChat = self._create_llm(model_name)
|
||||
self.store: Dict[str, InMemoryChatMessageHistory] = {}
|
||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||
|
||||
def _create_llm(self, model_name: str) -> GigaChat:
|
||||
"""
|
||||
Creates and configures a GigaChat LLM instance.
|
||||
|
||||
Args:
|
||||
model_name (str): The GigaChat model to use.
|
||||
|
||||
Returns:
|
||||
GigaChat: Configured GigaChat instance.
|
||||
"""
|
||||
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
|
||||
return GigaChat(
|
||||
credentials=self.api_token,
|
||||
scope="GIGACHAT_API_PERS",
|
||||
model=model_name,
|
||||
verify_ssl_certs=False,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
|
||||
"""
|
||||
Retrieves the chat history for a given session, creating it if it does not exist.
|
||||
|
||||
Args:
|
||||
session_id (str): The unique identifier for the session.
|
||||
|
||||
Returns:
|
||||
InMemoryChatMessageHistory: The chat history for the session.
|
||||
"""
|
||||
if session_id not in self.store:
|
||||
self.logger.debug(f"Creating new session history for session_id: {session_id}")
|
||||
self.store[session_id] = InMemoryChatMessageHistory()
|
||||
else:
|
||||
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
|
||||
return self.store[session_id]
|
||||
|
||||
def set_model(self, model_name: str) -> None:
|
||||
"""
|
||||
Updates the LLM to use a different GigaChat model.
|
||||
|
||||
Args:
|
||||
model_name (str): The new GigaChat model to use.
|
||||
"""
|
||||
self.logger.info(f"Switching model to: {model_name}")
|
||||
self.llm = self._create_llm(model_name)
|
||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||
|
||||
def get_response(self, session_id: str, text: str) -> str:
|
||||
"""
|
||||
Get a response to the provided input text for a given session.
|
||||
|
||||
Args:
|
||||
session_id (str): The unique identifier for the session.
|
||||
text (str): The input text for which a response is needed.
|
||||
|
||||
Returns:
|
||||
str: The response text.
|
||||
"""
|
||||
self.logger.info(f"Generating response for session_id: {session_id}")
|
||||
try:
|
||||
response = self.conversation.invoke(
|
||||
input=text,
|
||||
config={"configurable": {"session_id": session_id}},
|
||||
)
|
||||
self.logger.debug(f"Response for session_id {session_id}")
|
||||
return response.content
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
|
||||
raise
|
@ -1,106 +0,0 @@
|
||||
from logging import Logger
|
||||
|
||||
from googletrans import Translator, LANGUAGES
|
||||
from googletrans.models import Translated, Detected
|
||||
|
||||
|
||||
class GoogleTranslateClient:
|
||||
"""
|
||||
A client for interacting with Google Translate.
|
||||
"""
|
||||
|
||||
def __init__(self, logger: Logger) -> None:
|
||||
"""
|
||||
Initializes the client for interacting with Google Translate.
|
||||
|
||||
Args:
|
||||
logger (`logging.Logger`): Logger instance for logging.
|
||||
"""
|
||||
self.logger: Logger = logger
|
||||
self.translator: Translator = Translator()
|
||||
self.logger.info("Google Translate client initialized successfully.")
|
||||
|
||||
async def get_available_languages(self) -> dict[str, str]:
|
||||
"""
|
||||
Retrieves a list of available languages supported by Google Translate.
|
||||
|
||||
Returns:
|
||||
`dict[str, str]`: A dictionary where keys are language codes and values are language names.
|
||||
"""
|
||||
self.logger.info("Retrieving available languages.")
|
||||
return LANGUAGES
|
||||
|
||||
async def detect_language(self, text: str) -> Detected:
|
||||
"""
|
||||
Detects the language of a given text.
|
||||
|
||||
Args:
|
||||
text (`str`): The text for language detection.
|
||||
|
||||
Returns:
|
||||
`googletrans.models.Detected`: The detection object containing the detected language and confidence.
|
||||
"""
|
||||
try:
|
||||
self.logger.info("Detecting language for text.")
|
||||
detection: Detected = await self.translator.detect(text)
|
||||
self.logger.debug("Detection language completed successfully.")
|
||||
return detection
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error during language detection: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Error during language detection: {e}")
|
||||
|
||||
async def translate_text(
|
||||
self,
|
||||
text: str,
|
||||
dest_lang: str = "ru",
|
||||
src_lang: str = "auto"
|
||||
) -> Translated:
|
||||
"""
|
||||
Translates a given text to the target language.
|
||||
|
||||
Args:
|
||||
text (`str`): The text to be translated.
|
||||
dest_lang (`str`): The target language code (e.g., 'ru' for Russian). Defaults to 'ru'.
|
||||
src_lang (`str`): The source language code. Defaults to 'auto' for automatic detection.
|
||||
|
||||
Returns:
|
||||
`googletrans.models.Translated`: The translation object containing the translated text and metadata.
|
||||
"""
|
||||
try:
|
||||
self.logger.info(f"Translating text from {src_lang} to {dest_lang}.")
|
||||
translation: Translated = await self.translator.translate(
|
||||
text, dest_lang, src_lang
|
||||
)
|
||||
self.logger.info("Translation completed successfully.")
|
||||
return translation
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error during translation: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Error during translation: {e}")
|
||||
|
||||
async def translate_batch(
|
||||
self,
|
||||
texts: list[str],
|
||||
dest_lang: str = "ru",
|
||||
src_lang: str = "auto"
|
||||
) -> list[Translated]:
|
||||
"""
|
||||
Translates a list of texts to the target language.
|
||||
|
||||
Args:
|
||||
texts (`list[str]`): A list of texts to be translated.
|
||||
dest_lang (`str`): The target language code (e.g., 'ru' for Russian). Defaults to 'ru'.
|
||||
src_lang (`str`): The source language code. Defaults to 'auto' for automatic detection.
|
||||
|
||||
Returns:
|
||||
`list[googletrans.models.Translated]`: A list of translation objects containing the translated texts and metadata.
|
||||
"""
|
||||
try:
|
||||
self.logger.info(f"Translating batch of {len(texts)} texts from {src_lang} to {dest_lang}.")
|
||||
translations: list[Translated] = await self.translator.translate(
|
||||
texts, dest_lang, src_lang
|
||||
)
|
||||
self.logger.info("Batch translation completed successfully.")
|
||||
return translations
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error during batch translation: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Error during batch translation: {e}")
|
@ -1,124 +0,0 @@
|
||||
import os
|
||||
import logging
|
||||
from logging import Logger
|
||||
import asyncio
|
||||
|
||||
from pydub import AudioSegment
|
||||
import speech_recognition as sr
|
||||
from speech_recognition.audio import AudioData
|
||||
|
||||
|
||||
# Configure logging
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
async def convert_to_wav(file_path: str) -> str:
|
||||
"""
|
||||
Converts an audio file to WAV format if it is not already in WAV format.
|
||||
|
||||
Args:
|
||||
file_path (`str`): The path to the audio file to be converted.
|
||||
|
||||
Returns:
|
||||
`str`: The path to the converted or original WAV file.
|
||||
|
||||
Raises:
|
||||
`FileNotFoundError`: If the file does not exist.
|
||||
`RuntimeError`: If the conversion fails for any reason.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||
|
||||
if file_path.lower().endswith('.wav'):
|
||||
logger.info(f"File {file_path} is already in WAV format.")
|
||||
return file_path
|
||||
|
||||
try:
|
||||
logger.info(f"Converting {file_path} to WAV format.")
|
||||
audio = await asyncio.to_thread(AudioSegment.from_file, file_path)
|
||||
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
|
||||
await asyncio.to_thread(audio.export, wav_path, format="wav")
|
||||
logger.info(f"File converted to {wav_path}.")
|
||||
return wav_path
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to convert file to WAV: {e}")
|
||||
raise RuntimeError(f"Failed to convert file to WAV: {e}")
|
||||
|
||||
async def get_audio_duration(file_path: str) -> float:
|
||||
"""
|
||||
Retrieves the duration of an audio file in seconds.
|
||||
|
||||
Args:
|
||||
file_path (`str`): The path to the audio file.
|
||||
|
||||
Returns:
|
||||
`float`: The duration of the audio file in seconds.
|
||||
|
||||
Raises:
|
||||
`FileNotFoundError`: If the file does not exist.
|
||||
`RuntimeError`: If unable to get the file duration.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||
|
||||
try:
|
||||
logger.info(f"Getting duration of {file_path}.")
|
||||
audio = await asyncio.to_thread(AudioSegment.from_file, file_path)
|
||||
duration: float = len(audio) / 1000 # Duration in seconds
|
||||
logger.info(f"Duration of {file_path}: {duration} seconds.")
|
||||
return duration
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get file duration: {e}")
|
||||
raise RuntimeError(f"Failed to get file duration: {e}")
|
||||
|
||||
async def convert_voice_to_text(file_path: str, language='ru') -> str:
|
||||
"""
|
||||
Converts speech from an audio file to text using OpenAI speech recognition service.
|
||||
|
||||
Args:
|
||||
file_path (`str`): The path to the audio file to be processed.
|
||||
language (`str`): The language code for speech recognition (default is 'ru').
|
||||
|
||||
Returns:
|
||||
`str`: The transcribed text if recognition is successful.
|
||||
|
||||
Raises:
|
||||
`FileNotFoundError`: If the file does not exist.
|
||||
`RuntimeError`: For any errors encountered during processing.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError("File does not exist.")
|
||||
|
||||
# Convert the file to WAV format if necessary
|
||||
try:
|
||||
wav_path: str = await convert_to_wav(file_path)
|
||||
except RuntimeError as e:
|
||||
logger.error(f"Error converting to WAV: {e}")
|
||||
raise RuntimeError(f"Error converting to WAV: {e}")
|
||||
|
||||
recognizer = sr.Recognizer()
|
||||
|
||||
try:
|
||||
duration: float = await get_audio_duration(wav_path)
|
||||
logger.info(f"Processing file {wav_path} ({duration} sec) for speech recognition.")
|
||||
with sr.AudioFile(wav_path) as source:
|
||||
audio_data: AudioData = await asyncio.to_thread(recognizer.record, source)
|
||||
text = await asyncio.to_thread(
|
||||
recognizer.recognize_whisper, audio_data, language=language, model='medium'
|
||||
)
|
||||
logger.info("Speech recognition successful.")
|
||||
return text # type: ignore
|
||||
except sr.UnknownValueError:
|
||||
logger.warning(f"Speech in {wav_path} could not be recognized.")
|
||||
raise RuntimeError("Speech could not be recognized.")
|
||||
except sr.RequestError as e:
|
||||
logger.error(f"Request error from the recognition service: {e}")
|
||||
raise RuntimeError(f"Request error from the recognition service: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
raise RuntimeError(f"An unexpected error occurred: {e}")
|
@ -1,62 +0,0 @@
|
||||
import logging
|
||||
import logging.config
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def setup_logging(output_to_console=False) -> None:
|
||||
"""
|
||||
Configures the logging system.
|
||||
|
||||
This function sets up logging with optional output to the console and ensures
|
||||
log files are rotated daily. It creates a detailed logging format and retains
|
||||
log files for a week.
|
||||
|
||||
Args:
|
||||
output_to_console (`bool`): If True, log messages will also be printed to the console.
|
||||
Defaults to False.
|
||||
"""
|
||||
# Define the default handlers to use. Always logs to a file.
|
||||
handlers: list[str] = ['file']
|
||||
if output_to_console:
|
||||
# Add console logging if requested
|
||||
handlers.append('console')
|
||||
|
||||
# Generate the log file name with the current date
|
||||
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
|
||||
|
||||
# Configure the logging settings using a dictionary
|
||||
logging.config.dictConfig({
|
||||
'version': 1, # Logging configuration version
|
||||
'disable_existing_loggers': True, # Deny other loggers to remain active
|
||||
'formatters': {
|
||||
'detailed': { # Define a detailed logging format
|
||||
'format': (
|
||||
'%(asctime)s | %(levelname)-8s | '
|
||||
'%(filename)s.%(funcName)s, line %(lineno)d: '
|
||||
'%(message)s'
|
||||
),
|
||||
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
# Console handler outputs log messages to the console
|
||||
'console': {
|
||||
'class': 'logging.StreamHandler', # Standard output stream
|
||||
'formatter': 'detailed', # Use the detailed formatter
|
||||
},
|
||||
# File handler writes log messages to a file, rotating daily
|
||||
'file': {
|
||||
'class': 'logging.handlers.TimedRotatingFileHandler',
|
||||
'filename': log_filename, # Log file path
|
||||
'when': 'midnight', # Rotate log files at midnight
|
||||
'interval': 1, # Rotate daily
|
||||
'backupCount': 7, # Keep up to 7 old log files
|
||||
'formatter': 'detailed', # Use the detailed formatter
|
||||
},
|
||||
},
|
||||
# Define the root logger configuration
|
||||
'root': {
|
||||
'handlers': handlers, # Handlers to use (console, file, or both)
|
||||
'level': 'DEBUG', # Log level (DEBUG logs all levels)
|
||||
},
|
||||
})
|
@ -1,71 +0,0 @@
|
||||
import os
|
||||
import logging
|
||||
from logging import Logger
|
||||
import asyncio
|
||||
|
||||
from moviepy import VideoFileClip
|
||||
|
||||
|
||||
# Configure logging
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_video_duration(video_path: str) -> float:
|
||||
"""
|
||||
Get the duration of a video file in seconds.
|
||||
|
||||
Args:
|
||||
video_path (`str`): The path to the video file.
|
||||
|
||||
Returns:
|
||||
`float`: The duration of the video in seconds.
|
||||
|
||||
Raises:
|
||||
`FileNotFoundError`: If the video file does not exist.
|
||||
`RuntimeError`: If an error occurs during processing.
|
||||
"""
|
||||
if not os.path.exists(video_path):
|
||||
logger.error(f"Video file {video_path} does not exist.")
|
||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
||||
|
||||
try:
|
||||
video_clip: VideoFileClip = await asyncio.to_thread(VideoFileClip, video_path)
|
||||
duration = video_clip.duration
|
||||
logger.info(f"Duration of video {video_path}: {duration} seconds.")
|
||||
return duration
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get video duration: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to get video duration: {e}")
|
||||
finally:
|
||||
await asyncio.to_thread(video_clip.close)
|
||||
|
||||
async def extract_audio_from_video(video_path: str, output_dir: str) -> str:
|
||||
"""
|
||||
Extracts the audio track from a video file and saves it as a WAV file.
|
||||
|
||||
Args:
|
||||
video_path (`str`): The path to the video file.
|
||||
output_dir (`str`): The directory where the audio file will be saved.
|
||||
|
||||
Returns:
|
||||
`str`: The path to the extracted audio file.
|
||||
|
||||
Raises:
|
||||
`FileNotFoundError`: If the video file does not exist.
|
||||
`RuntimeError`: If an error occurs during audio extraction.
|
||||
"""
|
||||
if not os.path.exists(video_path):
|
||||
logger.error(f"Video file {video_path} does not exist.")
|
||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
||||
|
||||
try:
|
||||
logger.info(f"Extracting audio from video: {video_path}")
|
||||
video_clip: VideoFileClip = await asyncio.to_thread(VideoFileClip, video_path)
|
||||
audio_path: str = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.wav")
|
||||
await asyncio.to_thread(video_clip.audio.write_audiofile, audio_path) # type: ignore
|
||||
logger.info(f"Audio extracted and saved to: {audio_path}")
|
||||
return audio_path
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract audio from video: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to extract audio from video: {e}")
|
||||
finally:
|
||||
await asyncio.to_thread(video_clip.close)
|
@ -1,55 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||
from langchain_gigachat import GigaChat
|
||||
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.core.configuration import config
|
||||
|
||||
|
||||
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
|
||||
|
||||
@pytest.fixture
|
||||
def gigachat_client() -> GigaChatClient:
|
||||
"""Fixture to create a GigaChatClient instance with a mock API token."""
|
||||
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
|
||||
def test_initialization(gigachat_client) -> None:
|
||||
"""Test if the GigaChatClient initializes correctly."""
|
||||
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
|
||||
assert gigachat_client.model_name == "GigaChat"
|
||||
assert isinstance(gigachat_client.store, dict)
|
||||
assert gigachat_client.llm is not None
|
||||
|
||||
def test_create_llm() -> None:
|
||||
"""Test the _create_llm method for proper LLM creation."""
|
||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
|
||||
assert mock_llm.credentials == API_GIGACHAT_TOKEN
|
||||
assert mock_llm.model == "GigaChat-Pro"
|
||||
|
||||
def test_get_session_history(gigachat_client) -> None:
|
||||
"""Test the get_session_history method for creating/retrieving session history."""
|
||||
session_id = "test_session"
|
||||
history = gigachat_client.get_session_history(session_id)
|
||||
assert isinstance(history, InMemoryChatMessageHistory)
|
||||
assert session_id in gigachat_client.store
|
||||
assert gigachat_client.store[session_id] is history
|
||||
|
||||
def test_set_model(gigachat_client) -> None:
|
||||
"""Test the set_model method for updating the LLM model."""
|
||||
new_model = "GigaChat-Pro"
|
||||
gigachat_client.set_model(new_model)
|
||||
assert gigachat_client.llm.model == new_model
|
||||
|
||||
def test_get_response() -> None:
|
||||
"""Test the get_response method by verifying the response code."""
|
||||
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
|
||||
mock_runnable = MagicMock()
|
||||
mock_runnable.invoke.return_value.code = 200
|
||||
MockRunnable.return_value = mock_runnable
|
||||
|
||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
response_code = mock_runnable.invoke.return_value.code
|
||||
|
||||
assert response_code == 200
|
Loading…
Reference in New Issue
Block a user