Compare commits
No commits in common. "feature/refactor-command-handlers" and "main" have entirely different histories.
feature/re
...
main
7
.gitignore
vendored
7
.gitignore
vendored
@ -129,9 +129,6 @@ ENV/
|
|||||||
env.bak/
|
env.bak/
|
||||||
venv.bak/
|
venv.bak/
|
||||||
|
|
||||||
# Configuration
|
|
||||||
config.py
|
|
||||||
|
|
||||||
# Spyder project settings
|
# Spyder project settings
|
||||||
.spyderproject
|
.spyderproject
|
||||||
.spyproject
|
.spyproject
|
||||||
@ -164,7 +161,6 @@ cython_debug/
|
|||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
# ---> VisualStudioCode
|
# ---> VisualStudioCode
|
||||||
.vscode
|
|
||||||
.vscode/*
|
.vscode/*
|
||||||
!.vscode/settings.json
|
!.vscode/settings.json
|
||||||
!.vscode/tasks.json
|
!.vscode/tasks.json
|
||||||
@ -178,6 +174,3 @@ cython_debug/
|
|||||||
# Built Visual Studio Code Extensions
|
# Built Visual Studio Code Extensions
|
||||||
*.vsix
|
*.vsix
|
||||||
|
|
||||||
# Session info
|
|
||||||
*.session
|
|
||||||
*.session-journal
|
|
33
main.py
33
main.py
@ -1,33 +0,0 @@
|
|||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
from src.bot.telegram_userbot import TelegramUserBot
|
|
||||||
from src.utils import logging
|
|
||||||
from src.core.configuration import config
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
"""
|
|
||||||
Entry point for starting the Telegram user bot.
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
logging.setup_logging()
|
|
||||||
|
|
||||||
# Load API credentials and configuration
|
|
||||||
api_id: str = config.API_ID
|
|
||||||
api_hash: str = config.API_HASH
|
|
||||||
api_token: str = config.API_GIGACHAT_TOKEN
|
|
||||||
|
|
||||||
# Initialize GigaChatClient
|
|
||||||
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
|
|
||||||
|
|
||||||
# Initialize and run the Telegram user bot
|
|
||||||
bot: TelegramUserBot = TelegramUserBot(
|
|
||||||
session_name="userbot",
|
|
||||||
api_id=api_id,
|
|
||||||
api_hash=api_hash,
|
|
||||||
gigachat_client=gigachat_client
|
|
||||||
)
|
|
||||||
bot.run()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
2094
poetry.lock
generated
2094
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -6,24 +6,7 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
|
|||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.12"
|
python = "^3.13"
|
||||||
pyrogram = "^2.0.106"
|
|
||||||
tgcrypto = "^1.2.5"
|
|
||||||
setuptools = "^75.6.0"
|
|
||||||
wheel = "^0.45.1"
|
|
||||||
langchain-gigachat = "^0.3.2"
|
|
||||||
punq = "^0.7.0"
|
|
||||||
pytest = "^8.3.4"
|
|
||||||
speechrecognition = "^3.13.0"
|
|
||||||
typing-extensions = "^4.12.2"
|
|
||||||
pydub = "^0.25.1"
|
|
||||||
numpy = "2.0.2"
|
|
||||||
soundfile = "^0.13.0"
|
|
||||||
torch = "^2.5.1"
|
|
||||||
llvmlite = "0.43.0"
|
|
||||||
numba = "0.60.0"
|
|
||||||
openai-whisper = "^20240930"
|
|
||||||
moviepy = "^2.1.1"
|
|
||||||
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
|
@ -1,4 +0,0 @@
|
|||||||
from src.bot.handlers.abstract_command_handler import AbstractCommandHandler
|
|
||||||
from src.bot.handlers.ai_command_handler import AICommandHandler
|
|
||||||
from src.bot.handlers.voice_command_handler import VoiceCommandHandler
|
|
||||||
from src.bot.handlers.video_command_handler import VideoCommandHandler
|
|
@ -1,55 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
from pyrogram.filters import Filter
|
|
||||||
from pyrogram.client import Client
|
|
||||||
from pyrogram.types import Message
|
|
||||||
|
|
||||||
|
|
||||||
class AbstractCommandHandler(ABC):
|
|
||||||
"""
|
|
||||||
Abstract base class for creating command handlers in a Pyrogram bot.
|
|
||||||
|
|
||||||
This class provides a structure for defining custom command handlers.
|
|
||||||
Each subclass must implement the following abstract methods:
|
|
||||||
|
|
||||||
- `get_filters`: Defines the filters used to match messages that
|
|
||||||
the command handler should process.
|
|
||||||
- `handle`: Contains the logic to execute when a message matches
|
|
||||||
the defined filters.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
COMMAND (`str`): The name of the command that this handler handles.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
@abstractmethod
|
|
||||||
def COMMAND(self) -> str:
|
|
||||||
"""
|
|
||||||
The name of the command that this handler handles.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The command name.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def get_filters(self) -> Filter:
|
|
||||||
"""
|
|
||||||
Returns the filters for this command handler.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
`pyrogram.filters.Filter`: A Pyrogram filter or a custom filter
|
|
||||||
determines which messages this handler processes.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
async def handle(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handles the command logic.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
|
||||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
|
||||||
"""
|
|
||||||
pass
|
|
@ -1,94 +0,0 @@
|
|||||||
from logging import Logger
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pyrogram import filters
|
|
||||||
from pyrogram.filters import Filter
|
|
||||||
from pyrogram.client import Client
|
|
||||||
from pyrogram.types import Message
|
|
||||||
from pyrogram.enums import ChatAction
|
|
||||||
|
|
||||||
from src.bot.handlers import AbstractCommandHandler
|
|
||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
|
|
||||||
|
|
||||||
class AICommandHandler(AbstractCommandHandler):
|
|
||||||
"""
|
|
||||||
Command handler for the /ai command in a Pyrogram bot.
|
|
||||||
|
|
||||||
This handler processes text input, sends it to the GigaChat AI model,
|
|
||||||
and returns the generated response.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
COMMAND (`str`): The name of the command that this handler handles.
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, logger: Logger, gigachat_client: GigaChatClient) -> None:
|
|
||||||
"""
|
|
||||||
Initializes the AICommandHandler.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
gigachat_client (`GigaChatClient`): Client for interacting with the GigaChat API.
|
|
||||||
"""
|
|
||||||
self.logger: Logger = logger
|
|
||||||
self.gigachat_client: GigaChatClient = gigachat_client
|
|
||||||
|
|
||||||
@property
|
|
||||||
def COMMAND(self) -> str:
|
|
||||||
"""
|
|
||||||
The name of the command that this handler handles.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The command name.
|
|
||||||
"""
|
|
||||||
return "ai"
|
|
||||||
|
|
||||||
def get_filters(self) -> Filter:
|
|
||||||
"""
|
|
||||||
Returns the filter for the /ai command.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /ai command.
|
|
||||||
"""
|
|
||||||
return filters.command(self.COMMAND)
|
|
||||||
|
|
||||||
async def handle(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handles the /ai command.
|
|
||||||
|
|
||||||
Sends the user's input to the GigaChat AI model and returns the generated response.
|
|
||||||
The command can be used with an inline argument or as a reply to a text message.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
|
||||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
|
||||||
|
|
||||||
# Extract the command argument or use the replied message's text
|
|
||||||
command_argument: Optional[str] = " ".join(message.text.split()[1:])
|
|
||||||
if not command_argument and message.reply_to_message and message.reply_to_message.text:
|
|
||||||
command_argument = message.reply_to_message.text
|
|
||||||
|
|
||||||
if not command_argument:
|
|
||||||
self.logger.warning(f"No argument provided for /{self.COMMAND} command in chat_id={message.chat.id}.")
|
|
||||||
await message.reply(f"Please provide a message after /{self.COMMAND} or reply to a message.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Notify the user that the request is being processed
|
|
||||||
processing_message: Message = await message.reply(
|
|
||||||
f"{self.gigachat_client.model_name} is processing your request...", quote=True
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
response_text: str = self.gigachat_client.get_response(str(message.chat.id), command_argument)
|
|
||||||
self.logger.debug(f"Generated response for chat_id={message.chat.id}")
|
|
||||||
await processing_message.edit_text(response_text)
|
|
||||||
except Exception as error:
|
|
||||||
self.logger.error(f"Error processing /{self.COMMAND} command for chat_id={message.chat.id}: {error}", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing your request.")
|
|
||||||
finally:
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
@ -1,179 +0,0 @@
|
|||||||
import os
|
|
||||||
from logging import Logger
|
|
||||||
from tempfile import NamedTemporaryFile
|
|
||||||
|
|
||||||
from pyrogram import filters
|
|
||||||
from pyrogram.filters import Filter
|
|
||||||
from pyrogram.client import Client
|
|
||||||
from pyrogram.types import Message
|
|
||||||
from pyrogram.enums import ChatAction
|
|
||||||
from pyrogram.types.messages_and_media.video import Video
|
|
||||||
from pyrogram.types.messages_and_media.video_note import VideoNote
|
|
||||||
|
|
||||||
from src.bot.handlers import AbstractCommandHandler
|
|
||||||
from src.utils import video_processing, audio_processing
|
|
||||||
|
|
||||||
|
|
||||||
class VideoCommandHandler(AbstractCommandHandler):
|
|
||||||
"""
|
|
||||||
Command handler for the /video command in a Pyrogram bot.
|
|
||||||
|
|
||||||
This handler processes video or video note messages, extracts audio,
|
|
||||||
converts the audio to text in a specified language, and sends the result back to the user.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
COMMAND (`str`): The name of the command that this handler handles.
|
|
||||||
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
|
||||||
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
|
|
||||||
MAX_DURATION (`int`): Maximum allowed duration of the video in seconds.
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Mapping of language codes to their aliases
|
|
||||||
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
|
||||||
"en": ["en", "eng", "english"],
|
|
||||||
"ru": ["ru", "rus", "russian"],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Default language code for conversion if no language is specified
|
|
||||||
DEFAULT_LANGUAGE: str = "ru"
|
|
||||||
|
|
||||||
# Maximum allowed duration of the video message in seconds
|
|
||||||
MAX_DURATION: int = 300
|
|
||||||
|
|
||||||
def __init__(self, logger: Logger) -> None:
|
|
||||||
"""
|
|
||||||
Initializes the VideoCommandHandler.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
"""
|
|
||||||
self.logger: Logger = logger
|
|
||||||
|
|
||||||
def get_filters(self) -> Filter:
|
|
||||||
"""
|
|
||||||
Returns the filter for the /video command.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /video command.
|
|
||||||
"""
|
|
||||||
return filters.command(self.COMMAND)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def COMMAND(self) -> str:
|
|
||||||
"""
|
|
||||||
The name of the command that this handler handles.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The command name.
|
|
||||||
"""
|
|
||||||
return "video"
|
|
||||||
|
|
||||||
async def handle(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handles the /video command.
|
|
||||||
|
|
||||||
Extracts audio from a video or video note message, converts the audio to text
|
|
||||||
in the specified language and sends the result back to the user. The command
|
|
||||||
must be used in reply to a video or video note message.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
|
||||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
|
||||||
|
|
||||||
# Parse the language parameter or use the default (Russian)
|
|
||||||
command_arguments: list[str] = message.text.split()
|
|
||||||
try:
|
|
||||||
language_code: str = (
|
|
||||||
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
|
|
||||||
)
|
|
||||||
except ValueError as error:
|
|
||||||
await message.reply(str(error), quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Check if the reply is to a video or video note message
|
|
||||||
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
|
|
||||||
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a video or video note message.")
|
|
||||||
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Identify the file type (video or video note)
|
|
||||||
media_type: str = "video" if message.reply_to_message.video else "video_note"
|
|
||||||
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
|
|
||||||
|
|
||||||
# Notify the user that the request is being processed
|
|
||||||
processing_message: Message = await message.reply_to_message.reply(
|
|
||||||
"Processing video message to extract text...", quote=True
|
|
||||||
)
|
|
||||||
|
|
||||||
with NamedTemporaryFile(delete=False) as temp_video_file:
|
|
||||||
video_file_path = await client.download_media(
|
|
||||||
media.file_id,
|
|
||||||
file_name=temp_video_file.name
|
|
||||||
)
|
|
||||||
self.logger.info(f"{media_type} message downloaded to {video_file_path}.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Validate video duration
|
|
||||||
video_duration: float = video_processing.get_video_duration(video_file_path) # type: ignore
|
|
||||||
if video_duration > self.MAX_DURATION:
|
|
||||||
self.logger.warning(f"{media_type} too long: {video_duration} seconds.")
|
|
||||||
await processing_message.edit_text(
|
|
||||||
f"The video or video note is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
|
|
||||||
# Extract audio and convert it to text
|
|
||||||
output_dir = os.path.dirname(video_file_path) # type: ignore
|
|
||||||
audio_file_path: str = video_processing.extract_audio_from_video(video_file_path, output_dir) # type: ignore
|
|
||||||
extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
|
|
||||||
self.logger.info(f"{media_type} message successfully converted to text.")
|
|
||||||
|
|
||||||
response_text: str = (
|
|
||||||
f"<pre language=\"Conversion Result ({language_code})\">"
|
|
||||||
f"{extracted_text}"
|
|
||||||
"</pre>"
|
|
||||||
)
|
|
||||||
await processing_message.edit_text(response_text)
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
self.logger.error("File not found during processing.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
except RuntimeError:
|
|
||||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
except Exception:
|
|
||||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
|
||||||
finally:
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
||||||
if os.path.exists(video_file_path): # type: ignore
|
|
||||||
os.remove(video_file_path) # type: ignore
|
|
||||||
if os.path.exists(audio_file_path): # type: ignore
|
|
||||||
os.remove(audio_file_path) # type: ignore
|
|
||||||
|
|
||||||
def __resolve_language(self, language_input: str) -> str:
|
|
||||||
"""
|
|
||||||
Resolves the language code based on the input text.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
language_input (str): User-provided language parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The resolved language code.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the input does not match any supported language.
|
|
||||||
"""
|
|
||||||
normalized_input: str = language_input.lower()
|
|
||||||
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
|
||||||
if normalized_input in aliases:
|
|
||||||
return language_code
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid language parameter. Please use one of the following:\n" +
|
|
||||||
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
|
|
||||||
)
|
|
@ -1,166 +0,0 @@
|
|||||||
import os
|
|
||||||
from logging import Logger
|
|
||||||
from tempfile import NamedTemporaryFile
|
|
||||||
|
|
||||||
from pyrogram import filters
|
|
||||||
from pyrogram.filters import Filter
|
|
||||||
from pyrogram.client import Client
|
|
||||||
from pyrogram.types import Message
|
|
||||||
from pyrogram.enums import ChatAction
|
|
||||||
|
|
||||||
from src.bot.handlers import AbstractCommandHandler
|
|
||||||
from src.utils import audio_processing
|
|
||||||
|
|
||||||
|
|
||||||
class VoiceCommandHandler(AbstractCommandHandler):
|
|
||||||
"""
|
|
||||||
Command handler for the /voice command in a Pyrogram bot.
|
|
||||||
|
|
||||||
This handler processes voice messages, converts them to text
|
|
||||||
in a specified language, and sends the result back to the user.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
COMMAND (`str`): The name of the command that this handler handles.
|
|
||||||
LANGUAGE_ALIASES (`dict`): Mapping of language codes to their aliases.
|
|
||||||
DEFAULT_LANGUAGE (`str`): Default language code for conversion if no language is specified.
|
|
||||||
MAX_DURATION (`int`): Maximum allowed duration of the voice message in seconds.
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Mapping of language codes to their aliases
|
|
||||||
LANGUAGE_ALIASES: dict[str, list[str]] = {
|
|
||||||
"en": ["en", "eng", "english"],
|
|
||||||
"ru": ["ru", "rus", "russian"],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Default language code for conversion if no language is specified
|
|
||||||
DEFAULT_LANGUAGE: str = "ru"
|
|
||||||
|
|
||||||
# Maximum allowed duration of the voice message in seconds
|
|
||||||
MAX_DURATION: int = 300
|
|
||||||
|
|
||||||
def __init__(self, logger: Logger) -> None:
|
|
||||||
"""
|
|
||||||
Initializes the VoiceCommandHandler.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
logger (`Logger`): Logger instance for logging events.
|
|
||||||
"""
|
|
||||||
self.logger: Logger = logger
|
|
||||||
|
|
||||||
@property
|
|
||||||
def COMMAND(self) -> str:
|
|
||||||
"""
|
|
||||||
The name of the command that this handler handles.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The command name.
|
|
||||||
"""
|
|
||||||
return "voice"
|
|
||||||
|
|
||||||
def get_filters(self) -> Filter:
|
|
||||||
"""
|
|
||||||
Returns the filter for the /voice command.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
`pyrogram.filters.Filter`: A Pyrogram filter matching the /voice command.
|
|
||||||
"""
|
|
||||||
return filters.command(self.COMMAND)
|
|
||||||
|
|
||||||
async def handle(self, client: Client, message: Message) -> None:
|
|
||||||
"""
|
|
||||||
Handles the /voice command.
|
|
||||||
|
|
||||||
Converts a voice message to text in the specified language
|
|
||||||
and sends the result back to the user. The command must be used in reply
|
|
||||||
to a voice message.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client (`pyrogram.client.Client`): The Pyrogram client instance.
|
|
||||||
message (`pyrogram.types.Message`): The incoming message object to process.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Received /{self.COMMAND} command from chat_id={message.chat.id}.")
|
|
||||||
|
|
||||||
# Parse the language parameter or use the default (Russian)
|
|
||||||
command_arguments: list[str] = message.text.split()
|
|
||||||
try:
|
|
||||||
language_code: str = (
|
|
||||||
self.__resolve_language(command_arguments[1]) if len(command_arguments) > 1 else self.DEFAULT_LANGUAGE
|
|
||||||
)
|
|
||||||
except ValueError as error:
|
|
||||||
await message.reply(str(error), quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not (message.reply_to_message and message.reply_to_message.voice):
|
|
||||||
self.logger.warning(f"The /{self.COMMAND} command was not used in reply to a voice message.")
|
|
||||||
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Notify the user that the request is being processed
|
|
||||||
processing_message: Message = await message.reply_to_message.reply(
|
|
||||||
"Converting voice message to text...", quote=True
|
|
||||||
)
|
|
||||||
|
|
||||||
with NamedTemporaryFile(delete=False) as temp_audio_file:
|
|
||||||
audio_file_path = await client.download_media(
|
|
||||||
message.reply_to_message.voice.file_id,
|
|
||||||
file_name=temp_audio_file.name
|
|
||||||
)
|
|
||||||
self.logger.info(f"Voice message downloaded to {audio_file_path}.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Validate voice message duration
|
|
||||||
voice_duration: float = audio_processing.get_audio_duration(audio_file_path) # type: ignore
|
|
||||||
if voice_duration > self.MAX_DURATION:
|
|
||||||
self.logger.warning(f"Voice message too long: {voice_duration} seconds.")
|
|
||||||
await processing_message.edit_text(
|
|
||||||
f"The voice message is too long (over {self.MAX_DURATION // 60} minutes). Please send a shorter one."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
|
||||||
extracted_text: str = audio_processing.convert_voice_to_text(audio_file_path, language=language_code) # type: ignore
|
|
||||||
self.logger.info("Voice message successfully converted to text.")
|
|
||||||
|
|
||||||
response_text: str = (
|
|
||||||
f"<pre language=\"Conversion Result ({language_code})\">"
|
|
||||||
f"{extracted_text}"
|
|
||||||
"</pre>"
|
|
||||||
)
|
|
||||||
await processing_message.edit_text(response_text)
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
self.logger.error("File not found during processing.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
except RuntimeError:
|
|
||||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
except Exception:
|
|
||||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
|
||||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
|
||||||
finally:
|
|
||||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
|
||||||
if os.path.exists(audio_file_path): # type: ignore
|
|
||||||
os.remove(audio_file_path) # type: ignore
|
|
||||||
|
|
||||||
def __resolve_language(self, language_input: str) -> str:
|
|
||||||
"""
|
|
||||||
Resolves the language code based on the input text.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
language_input (str): User-provided language parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The resolved language code.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the input does not match any supported language.
|
|
||||||
"""
|
|
||||||
normalized_input: str = language_input.lower()
|
|
||||||
for language_code, aliases in self.LANGUAGE_ALIASES.items():
|
|
||||||
if normalized_input in aliases:
|
|
||||||
return language_code
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid language parameter. Please use one of the following:\n" +
|
|
||||||
"\n".join(f"{language_code}: {', '.join(aliases)}" for language_code, aliases in self.LANGUAGE_ALIASES.items())
|
|
||||||
)
|
|
@ -1,63 +0,0 @@
|
|||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
|
|
||||||
from pyrogram.client import Client
|
|
||||||
|
|
||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
|
|
||||||
from src.bot.handlers import AbstractCommandHandler
|
|
||||||
from src.bot.handlers import AICommandHandler
|
|
||||||
from src.bot.handlers import VoiceCommandHandler
|
|
||||||
from src.bot.handlers import VideoCommandHandler
|
|
||||||
|
|
||||||
|
|
||||||
class TelegramUserBot:
|
|
||||||
"""
|
|
||||||
A Telegram user bot.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
app (`Client`): The Pyrogram client instance for the bot.
|
|
||||||
gigachat_client (`GigaChatClient`): The client instance for GigaChat integration.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
|
|
||||||
"""
|
|
||||||
Initializes the Telegram user bot.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_name (`str`): The session name for the bot.
|
|
||||||
api_id (`str`): The API ID for the Telegram application.
|
|
||||||
api_hash (`str`): The API hash for the Telegram application.
|
|
||||||
gigachat_client (`GigaChatClient`): An instance of GigaChatClient for handling AI responses.
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
self.logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
|
|
||||||
|
|
||||||
# Configure handlers
|
|
||||||
self.handlers: dict[str, AbstractCommandHandler] = {
|
|
||||||
"ai": AICommandHandler(self.logger, gigachat_client),
|
|
||||||
"voice": VoiceCommandHandler(self.logger),
|
|
||||||
"video": VideoCommandHandler(self.logger),
|
|
||||||
}
|
|
||||||
self.register_handlers()
|
|
||||||
|
|
||||||
def register_handlers(self) -> None:
|
|
||||||
"""
|
|
||||||
Registers the message handlers for the bot.
|
|
||||||
"""
|
|
||||||
self.logger.debug("Registering handlers.")
|
|
||||||
for command, handler in self.handlers.items():
|
|
||||||
self.app.on_message(filters=handler.get_filters())(handler.handle)
|
|
||||||
|
|
||||||
def run(self) -> None:
|
|
||||||
"""
|
|
||||||
Starts the bot.
|
|
||||||
"""
|
|
||||||
self.logger.info("Bot is starting.")
|
|
||||||
print("Bot is running.")
|
|
||||||
try:
|
|
||||||
self.app.run()
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)
|
|
@ -1,27 +0,0 @@
|
|||||||
"""
|
|
||||||
config.py
|
|
||||||
|
|
||||||
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
|
|
||||||
|
|
||||||
Configuration settings:
|
|
||||||
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
|
|
||||||
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
|
|
||||||
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
|
|
||||||
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
|
|
||||||
|
|
||||||
Note:
|
|
||||||
- Make sure to keep these tokens secure and do not share them publicly.
|
|
||||||
- These values should be replaced with actual credentials for the bot and the GigaChat API.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# The API ID for the Telegram application
|
|
||||||
API_ID: str = ''
|
|
||||||
|
|
||||||
# The API hash for the Telegram application
|
|
||||||
API_HASH: str = ''
|
|
||||||
|
|
||||||
# The bot token for authenticating the bot with Telegram
|
|
||||||
BOT_TOKEN: str = ''
|
|
||||||
|
|
||||||
# The token for authenticating with the GigaChat API
|
|
||||||
API_GIGACHAT_TOKEN: str = ''
|
|
@ -1,102 +0,0 @@
|
|||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
from typing import Dict
|
|
||||||
|
|
||||||
from langchain_core.runnables.history import RunnableWithMessageHistory
|
|
||||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
|
||||||
from langchain_gigachat.chat_models import GigaChat
|
|
||||||
|
|
||||||
|
|
||||||
class GigaChatClient:
|
|
||||||
"""
|
|
||||||
A client class for interacting with the GigaChat API using LangChain components.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
|
|
||||||
"""
|
|
||||||
Initializes the GigaChatManager with API credentials and a default model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
api_token (str): The API token for authenticating with the GigaChat API.
|
|
||||||
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
|
|
||||||
"""
|
|
||||||
# Configure logging
|
|
||||||
self.logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
self.api_token: str = api_token
|
|
||||||
self.model_name: str = model_name
|
|
||||||
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
|
|
||||||
|
|
||||||
self.llm: GigaChat = self._create_llm(model_name)
|
|
||||||
self.store: Dict[str, InMemoryChatMessageHistory] = {}
|
|
||||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
|
||||||
|
|
||||||
def _create_llm(self, model_name: str) -> GigaChat:
|
|
||||||
"""
|
|
||||||
Creates and configures a GigaChat LLM instance.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
model_name (str): The GigaChat model to use.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
GigaChat: Configured GigaChat instance.
|
|
||||||
"""
|
|
||||||
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
|
|
||||||
return GigaChat(
|
|
||||||
credentials=self.api_token,
|
|
||||||
scope="GIGACHAT_API_PERS",
|
|
||||||
model=model_name,
|
|
||||||
verify_ssl_certs=False,
|
|
||||||
streaming=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
|
|
||||||
"""
|
|
||||||
Retrieves the chat history for a given session, creating it if it does not exist.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_id (str): The unique identifier for the session.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
InMemoryChatMessageHistory: The chat history for the session.
|
|
||||||
"""
|
|
||||||
if session_id not in self.store:
|
|
||||||
self.logger.debug(f"Creating new session history for session_id: {session_id}")
|
|
||||||
self.store[session_id] = InMemoryChatMessageHistory()
|
|
||||||
else:
|
|
||||||
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
|
|
||||||
return self.store[session_id]
|
|
||||||
|
|
||||||
def set_model(self, model_name: str) -> None:
|
|
||||||
"""
|
|
||||||
Updates the LLM to use a different GigaChat model.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
model_name (str): The new GigaChat model to use.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Switching model to: {model_name}")
|
|
||||||
self.llm = self._create_llm(model_name)
|
|
||||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
|
||||||
|
|
||||||
def get_response(self, session_id: str, text: str) -> str:
|
|
||||||
"""
|
|
||||||
Get a response to the provided input text for a given session.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session_id (str): The unique identifier for the session.
|
|
||||||
text (str): The input text for which a response is needed.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The response text.
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Generating response for session_id: {session_id}")
|
|
||||||
try:
|
|
||||||
response = self.conversation.invoke(
|
|
||||||
input=text,
|
|
||||||
config={"configurable": {"session_id": session_id}},
|
|
||||||
)
|
|
||||||
self.logger.debug(f"Response for session_id {session_id}")
|
|
||||||
return response.content
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
|
|
||||||
raise
|
|
@ -1,120 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
|
|
||||||
from pydub import AudioSegment
|
|
||||||
import speech_recognition as sr
|
|
||||||
from speech_recognition.audio import AudioData
|
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def convert_to_wav(file_path: str) -> str:
|
|
||||||
"""
|
|
||||||
Converts an audio file to WAV format if it is not already in WAV format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file to be converted.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The path to the converted or original WAV file.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: If the conversion fails for any reason.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
|
||||||
|
|
||||||
if file_path.lower().endswith('.wav'):
|
|
||||||
logger.info(f"File {file_path} is already in WAV format.")
|
|
||||||
return file_path
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Converting {file_path} to WAV format.")
|
|
||||||
audio = AudioSegment.from_file(file_path)
|
|
||||||
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
|
|
||||||
audio.export(wav_path, format="wav")
|
|
||||||
logger.info(f"File converted to {wav_path}.")
|
|
||||||
return wav_path
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to convert file to WAV: {e}")
|
|
||||||
raise RuntimeError(f"Failed to convert file to WAV: {e}")
|
|
||||||
|
|
||||||
def get_audio_duration(file_path: str) -> float:
|
|
||||||
"""
|
|
||||||
Retrieves the duration of an audio file in seconds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: The duration of the audio file in seconds.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: If unable to get the file duration.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Getting duration of {file_path}.")
|
|
||||||
audio = AudioSegment.from_file(file_path)
|
|
||||||
duration: float = len(audio) / 1000 # Duration in seconds
|
|
||||||
logger.info(f"Duration of {file_path}: {duration} seconds.")
|
|
||||||
return duration
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get file duration: {e}")
|
|
||||||
raise RuntimeError(f"Failed to get file duration: {e}")
|
|
||||||
|
|
||||||
def convert_voice_to_text(file_path: str, language='ru') -> str:
|
|
||||||
"""
|
|
||||||
Converts speech from an audio file to text using OpenAI speech recognition service.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
file_path (str): The path to the audio file to be processed.
|
|
||||||
language (str): The language code for speech recognition (default is 'ru').
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The transcribed text if recognition is successful.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the file does not exist.
|
|
||||||
RuntimeError: For any errors encountered during processing.
|
|
||||||
"""
|
|
||||||
# Check if the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"File {file_path} does not exist.")
|
|
||||||
raise FileNotFoundError("File does not exist.")
|
|
||||||
|
|
||||||
# Convert the file to WAV format if necessary
|
|
||||||
try:
|
|
||||||
wav_path: str = convert_to_wav(file_path)
|
|
||||||
except RuntimeError as e:
|
|
||||||
logger.error(f"Error converting to WAV: {e}")
|
|
||||||
raise RuntimeError(f"Error converting to WAV: {e}")
|
|
||||||
|
|
||||||
recognizer = sr.Recognizer()
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Processing file {wav_path} ({get_audio_duration(wav_path)} sec) for speech recognition.")
|
|
||||||
with sr.AudioFile(wav_path) as source:
|
|
||||||
audio_data: AudioData = recognizer.record(source)
|
|
||||||
text = recognizer.recognize_whisper(audio_data, language=language, model='medium')
|
|
||||||
logger.info("Speech recognition successful.")
|
|
||||||
return text # type: ignore
|
|
||||||
except sr.UnknownValueError:
|
|
||||||
logger.warning(f"Speech in {wav_path} could not be recognized.")
|
|
||||||
raise RuntimeError("Speech could not be recognized.")
|
|
||||||
except sr.RequestError as e:
|
|
||||||
logger.error(f"Request error from the recognition service: {e}")
|
|
||||||
raise RuntimeError(f"Request error from the recognition service: {e}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"An unexpected error occurred: {e}")
|
|
||||||
raise RuntimeError(f"An unexpected error occurred: {e}")
|
|
@ -1,62 +0,0 @@
|
|||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(output_to_console=False) -> None:
|
|
||||||
"""
|
|
||||||
Configures the logging system.
|
|
||||||
|
|
||||||
This function sets up logging with optional output to the console and ensures
|
|
||||||
log files are rotated daily. It creates a detailed logging format and retains
|
|
||||||
log files for a week.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
output_to_console (bool): If True, log messages will also be printed to the console.
|
|
||||||
Defaults to False.
|
|
||||||
"""
|
|
||||||
# Define the default handlers to use. Always logs to a file.
|
|
||||||
handlers: list[str] = ['file']
|
|
||||||
if output_to_console:
|
|
||||||
# Add console logging if requested
|
|
||||||
handlers.append('console')
|
|
||||||
|
|
||||||
# Generate the log file name with the current date
|
|
||||||
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
|
|
||||||
|
|
||||||
# Configure the logging settings using a dictionary
|
|
||||||
logging.config.dictConfig({
|
|
||||||
'version': 1, # Logging configuration version
|
|
||||||
'disable_existing_loggers': True, # Deny other loggers to remain active
|
|
||||||
'formatters': {
|
|
||||||
'detailed': { # Define a detailed logging format
|
|
||||||
'format': (
|
|
||||||
'%(asctime)s | %(levelname)-8s | '
|
|
||||||
'%(filename)s.%(funcName)s, line %(lineno)d: '
|
|
||||||
'%(message)s'
|
|
||||||
),
|
|
||||||
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'handlers': {
|
|
||||||
# Console handler outputs log messages to the console
|
|
||||||
'console': {
|
|
||||||
'class': 'logging.StreamHandler', # Standard output stream
|
|
||||||
'formatter': 'detailed', # Use the detailed formatter
|
|
||||||
},
|
|
||||||
# File handler writes log messages to a file, rotating daily
|
|
||||||
'file': {
|
|
||||||
'class': 'logging.handlers.TimedRotatingFileHandler',
|
|
||||||
'filename': log_filename, # Log file path
|
|
||||||
'when': 'midnight', # Rotate log files at midnight
|
|
||||||
'interval': 1, # Rotate daily
|
|
||||||
'backupCount': 7, # Keep up to 7 old log files
|
|
||||||
'formatter': 'detailed', # Use the detailed formatter
|
|
||||||
},
|
|
||||||
},
|
|
||||||
# Define the root logger configuration
|
|
||||||
'root': {
|
|
||||||
'handlers': handlers, # Handlers to use (console, file, or both)
|
|
||||||
'level': 'DEBUG', # Log level (DEBUG logs all levels)
|
|
||||||
},
|
|
||||||
})
|
|
@ -1,71 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
from logging import Logger
|
|
||||||
|
|
||||||
from moviepy import VideoFileClip
|
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logger: Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def get_video_duration(video_path: str) -> float:
|
|
||||||
"""
|
|
||||||
Get the duration of a video file in seconds.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
video_path (str): The path to the video file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: The duration of the video in seconds.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the video file does not exist.
|
|
||||||
RuntimeError: If an error occurs during processing.
|
|
||||||
"""
|
|
||||||
if not os.path.exists(video_path):
|
|
||||||
logger.error(f"Video file {video_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
video_clip = VideoFileClip(video_path)
|
|
||||||
duration = video_clip.duration
|
|
||||||
logger.info(f"Duration of video {video_path}: {duration} seconds.")
|
|
||||||
return duration
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get video duration: {e}", exc_info=True)
|
|
||||||
raise RuntimeError(f"Failed to get video duration: {e}")
|
|
||||||
finally:
|
|
||||||
video_clip.close()
|
|
||||||
|
|
||||||
|
|
||||||
def extract_audio_from_video(video_path: str, output_dir: str) -> str:
|
|
||||||
"""
|
|
||||||
Extracts the audio track from a video file and saves it as a WAV file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
video_path (str): The path to the video file.
|
|
||||||
output_dir (str): The directory where the audio file will be saved.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The path to the extracted audio file.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
FileNotFoundError: If the video file does not exist.
|
|
||||||
RuntimeError: If an error occurs during audio extraction.
|
|
||||||
"""
|
|
||||||
if not os.path.exists(video_path):
|
|
||||||
logger.error(f"Video file {video_path} does not exist.")
|
|
||||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
logger.info(f"Extracting audio from video: {video_path}")
|
|
||||||
video_clip = VideoFileClip(video_path)
|
|
||||||
audio_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.wav")
|
|
||||||
video_clip.audio.write_audiofile(audio_path) # type: ignore
|
|
||||||
logger.info(f"Audio extracted and saved to: {audio_path}")
|
|
||||||
return audio_path
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to extract audio from video: {e}", exc_info=True)
|
|
||||||
raise RuntimeError(f"Failed to extract audio from video: {e}")
|
|
||||||
finally:
|
|
||||||
video_clip.close()
|
|
@ -1,55 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
|
||||||
from langchain_gigachat import GigaChat
|
|
||||||
|
|
||||||
from src.integrations.gigachat_api_client import GigaChatClient
|
|
||||||
from src.core.configuration import config
|
|
||||||
|
|
||||||
|
|
||||||
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def gigachat_client() -> GigaChatClient:
|
|
||||||
"""Fixture to create a GigaChatClient instance with a mock API token."""
|
|
||||||
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
|
|
||||||
def test_initialization(gigachat_client) -> None:
|
|
||||||
"""Test if the GigaChatClient initializes correctly."""
|
|
||||||
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
|
|
||||||
assert gigachat_client.model_name == "GigaChat"
|
|
||||||
assert isinstance(gigachat_client.store, dict)
|
|
||||||
assert gigachat_client.llm is not None
|
|
||||||
|
|
||||||
def test_create_llm() -> None:
|
|
||||||
"""Test the _create_llm method for proper LLM creation."""
|
|
||||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
|
|
||||||
assert mock_llm.credentials == API_GIGACHAT_TOKEN
|
|
||||||
assert mock_llm.model == "GigaChat-Pro"
|
|
||||||
|
|
||||||
def test_get_session_history(gigachat_client) -> None:
|
|
||||||
"""Test the get_session_history method for creating/retrieving session history."""
|
|
||||||
session_id = "test_session"
|
|
||||||
history = gigachat_client.get_session_history(session_id)
|
|
||||||
assert isinstance(history, InMemoryChatMessageHistory)
|
|
||||||
assert session_id in gigachat_client.store
|
|
||||||
assert gigachat_client.store[session_id] is history
|
|
||||||
|
|
||||||
def test_set_model(gigachat_client) -> None:
|
|
||||||
"""Test the set_model method for updating the LLM model."""
|
|
||||||
new_model = "GigaChat-Pro"
|
|
||||||
gigachat_client.set_model(new_model)
|
|
||||||
assert gigachat_client.llm.model == new_model
|
|
||||||
|
|
||||||
def test_get_response() -> None:
|
|
||||||
"""Test the get_response method by verifying the response code."""
|
|
||||||
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
|
|
||||||
mock_runnable = MagicMock()
|
|
||||||
mock_runnable.invoke.return_value.code = 200
|
|
||||||
MockRunnable.return_value = mock_runnable
|
|
||||||
|
|
||||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
|
||||||
response_code = mock_runnable.invoke.return_value.code
|
|
||||||
|
|
||||||
assert response_code == 200
|
|
Loading…
Reference in New Issue
Block a user