Compare commits
8 Commits
main
...
feature/ad
Author | SHA1 | Date | |
---|---|---|---|
7c513c6395 | |||
f5c208ae2d | |||
c7f3e093b4 | |||
b9d6cde8fe | |||
0b9603d9f2 | |||
8bf781b67d | |||
a43873109c | |||
e734b9d827 |
7
.gitignore
vendored
7
.gitignore
vendored
@ -129,6 +129,9 @@ ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Configuration
|
||||
config.py
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
@ -161,6 +164,7 @@ cython_debug/
|
||||
#.idea/
|
||||
|
||||
# ---> VisualStudioCode
|
||||
.vscode
|
||||
.vscode/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/tasks.json
|
||||
@ -174,3 +178,6 @@ cython_debug/
|
||||
# Built Visual Studio Code Extensions
|
||||
*.vsix
|
||||
|
||||
# Session info
|
||||
*.session
|
||||
*.session-journal
|
33
main.py
33
main.py
@ -0,0 +1,33 @@
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.bot.telegram_userbot import TelegramUserBot
|
||||
from src.utils import logging
|
||||
from src.core.configuration import config
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Entry point for starting the Telegram user bot.
|
||||
"""
|
||||
# Configure logging
|
||||
logging.setup_logging()
|
||||
|
||||
# Load API credentials and configuration
|
||||
api_id: str = config.API_ID
|
||||
api_hash: str = config.API_HASH
|
||||
api_token: str = config.API_GIGACHAT_TOKEN
|
||||
|
||||
# Initialize GigaChatClient
|
||||
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
|
||||
|
||||
# Initialize and run the Telegram user bot
|
||||
bot: TelegramUserBot = TelegramUserBot(
|
||||
session_name="userbot",
|
||||
api_id=api_id,
|
||||
api_hash=api_hash,
|
||||
gigachat_client=gigachat_client
|
||||
)
|
||||
bot.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
2094
poetry.lock
generated
Normal file
2094
poetry.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
@ -6,7 +6,24 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.13"
|
||||
python = "^3.12"
|
||||
pyrogram = "^2.0.106"
|
||||
tgcrypto = "^1.2.5"
|
||||
setuptools = "^75.6.0"
|
||||
wheel = "^0.45.1"
|
||||
langchain-gigachat = "^0.3.2"
|
||||
punq = "^0.7.0"
|
||||
pytest = "^8.3.4"
|
||||
speechrecognition = "^3.13.0"
|
||||
typing-extensions = "^4.12.2"
|
||||
pydub = "^0.25.1"
|
||||
numpy = "2.0.2"
|
||||
soundfile = "^0.13.0"
|
||||
torch = "^2.5.1"
|
||||
llvmlite = "0.43.0"
|
||||
numba = "0.60.0"
|
||||
openai-whisper = "^20240930"
|
||||
moviepy = "^2.1.1"
|
||||
|
||||
|
||||
[build-system]
|
||||
|
289
src/bot/telegram_userbot.py
Normal file
289
src/bot/telegram_userbot.py
Normal file
@ -0,0 +1,289 @@
|
||||
import os
|
||||
import logging
|
||||
from logging import Logger
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Optional
|
||||
|
||||
from pyrogram import filters
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.enums import ChatAction
|
||||
from pyrogram.types.messages_and_media.video import Video
|
||||
from pyrogram.types.messages_and_media.video_note import VideoNote
|
||||
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.utils import speech_recognition, video_processing
|
||||
|
||||
|
||||
class TelegramUserBot:
|
||||
"""
|
||||
A Telegram user bot.
|
||||
|
||||
Attributes:
|
||||
app (Client): The Pyrogram client instance for the bot.
|
||||
gigachat_client (GigaChatClient): The client instance for GigaChat integration.
|
||||
"""
|
||||
|
||||
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
|
||||
"""
|
||||
Initializes the Telegram user bot.
|
||||
|
||||
Args:
|
||||
session_name (str): The session name for the bot.
|
||||
api_id (str): The API ID for the Telegram application.
|
||||
api_hash (str): The API hash for the Telegram application.
|
||||
gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses.
|
||||
"""
|
||||
# Configure logging
|
||||
self.logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
|
||||
self.gigachat_client: GigaChatClient = gigachat_client
|
||||
self.register_handlers()
|
||||
|
||||
def register_handlers(self) -> None:
|
||||
"""
|
||||
Registers the message handlers for the bot.
|
||||
"""
|
||||
self.logger.debug("Registering handlers.")
|
||||
self.app.on_message(filters.command("ai"))(self.handle_ai_command)
|
||||
self.app.on_message(filters.command("voice"))(self.handle_voice_command)
|
||||
self.app.on_message(filters.command("video"))(self.handle_video_command)
|
||||
|
||||
async def handle_ai_command(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handles messages that invoke the /ai command.
|
||||
|
||||
Args:
|
||||
client (Client): The Pyrogram client instance.
|
||||
message (Message): The incoming Telegram message.
|
||||
"""
|
||||
self.logger.info(f"Received /ai command from chat_id={message.chat.id}")
|
||||
|
||||
# Extract the command argument
|
||||
command_arg: Optional[str] = " ".join(message.text.split()[1:])
|
||||
|
||||
if not command_arg and message.reply_to_message and message.reply_to_message.text:
|
||||
# Use the text of the replied message if no argument is provided
|
||||
command_arg = message.reply_to_message.text
|
||||
|
||||
if not command_arg:
|
||||
self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}")
|
||||
await message.reply("Please provide a message after /ai or reply to a message.", quote=True)
|
||||
return
|
||||
|
||||
# Send an initial message indicating processing
|
||||
self.logger.debug(f"Processing request for chat_id={message.chat.id}")
|
||||
processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True)
|
||||
|
||||
try:
|
||||
# Start typing animation
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
|
||||
# Get a response from GigaChat
|
||||
response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg)
|
||||
self.logger.debug(f"Received response for chat_id={message.chat.id}")
|
||||
|
||||
# Edit the processing message with the generated response
|
||||
await processing_message.edit_text(response)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing your request.")
|
||||
finally:
|
||||
# Stop indicating typing action
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
|
||||
def get_language(self, input_text: str) -> str:
|
||||
"""
|
||||
Determines the language for voice-to-text conversion based on the input parameter.
|
||||
|
||||
Args:
|
||||
input_text (str): The input parameter indicating the language.
|
||||
|
||||
Returns:
|
||||
str: The language code ('en' or 'ru').
|
||||
|
||||
Raises:
|
||||
ValueError: If an invalid language parameter is provided.
|
||||
"""
|
||||
language_params: dict[str, list[str]] = {
|
||||
'en': ['en', 'eng', 'english'],
|
||||
'ru': ['ru', 'rus', 'russian']
|
||||
}
|
||||
|
||||
input_lower: str = input_text.lower()
|
||||
for lang_code, aliases in language_params.items():
|
||||
if input_lower in aliases:
|
||||
return lang_code
|
||||
raise ValueError(
|
||||
"Invalid language parameter. Please use one of the following:\n" +
|
||||
"\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items())
|
||||
)
|
||||
|
||||
|
||||
async def handle_voice_command(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handle the /voice command to convert a voice message to text with optional language selection.
|
||||
|
||||
Args:
|
||||
client (Client): The Pyrogram Client instance.
|
||||
message (Message): The incoming message containing the /voice command.
|
||||
"""
|
||||
self.logger.info(f"Received /voice command from chat_id={message.chat.id}.")
|
||||
|
||||
# Parse the language parameter (default to Russian)
|
||||
command_parts: list[str] = message.text.split()
|
||||
try:
|
||||
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
|
||||
except ValueError as e:
|
||||
await message.reply(str(e), quote=True)
|
||||
return
|
||||
|
||||
# Check if the reply is to a voice message
|
||||
if not (message.reply_to_message and message.reply_to_message.voice):
|
||||
self.logger.warning("The /voice command was not used in reply to a voice message.")
|
||||
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
||||
return
|
||||
|
||||
# Send an initial message indicating processing
|
||||
processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True)
|
||||
|
||||
with NamedTemporaryFile(delete=False) as temp_file:
|
||||
file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name)
|
||||
self.logger.info(f"Voice message downloaded to {file_path}.")
|
||||
|
||||
try:
|
||||
# Check voice message duration
|
||||
duration: float = speech_recognition.get_audio_duration(file_path) # type: ignore
|
||||
if duration > 300:
|
||||
self.logger.warning(f"Voice message too long: {duration} seconds.")
|
||||
await processing_message.edit_text("The voice message is too long (over 5 minutes). Please send a shorter one.")
|
||||
return
|
||||
|
||||
# Start typing animation
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
|
||||
# Attempt to convert voice to text with the selected language
|
||||
text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore
|
||||
self.logger.info("Voice message successfully converted to text.")
|
||||
|
||||
# Format the text for sending
|
||||
formatted_text: str = (
|
||||
f"<pre language=\"Conversion Result ({language})\">"
|
||||
f"{text}"
|
||||
"</pre>"
|
||||
)
|
||||
|
||||
# Edit the initial processing message with the converted text
|
||||
await processing_message.edit_text(formatted_text)
|
||||
except FileNotFoundError:
|
||||
self.logger.error("File not found during processing.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
except RuntimeError:
|
||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
except Exception:
|
||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||
finally:
|
||||
# Stop indicating typing action
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
|
||||
# Clean up temporary files
|
||||
if os.path.exists(file_path): # type: ignore
|
||||
os.remove(file_path) # type: ignore
|
||||
|
||||
async def handle_video_command(self, client: Client, message: Message) -> None:
|
||||
"""
|
||||
Handle the /video command to convert a video or video note message to text with optional language selection.
|
||||
|
||||
Args:
|
||||
client (Client): The Pyrogram Client instance.
|
||||
message (Message): The incoming message containing the /video command.
|
||||
"""
|
||||
self.logger.info(f"Received /video command from chat_id={message.chat.id}.")
|
||||
|
||||
# Parse the language parameter (default to Russian)
|
||||
command_parts: list[str] = message.text.split()
|
||||
try:
|
||||
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
|
||||
except ValueError as e:
|
||||
await message.reply(str(e), quote=True)
|
||||
return
|
||||
|
||||
# Check if the reply is to a video or video note message
|
||||
if not (message.reply_to_message and (message.reply_to_message.video or message.reply_to_message.video_note)):
|
||||
self.logger.warning("The /video command was not used in reply to a video or video note message.")
|
||||
await message.reply("Please reply to a video or video note message with the /video command.", quote=True)
|
||||
return
|
||||
|
||||
# Identify the file type (video or video note)
|
||||
media_type: str = "video" if message.reply_to_message.video else "video_note"
|
||||
media: Video | VideoNote = message.reply_to_message.video if media_type == "video" else message.reply_to_message.video_note
|
||||
|
||||
# Send an initial message indicating processing
|
||||
processing_message: Message = await message.reply_to_message.reply("Processing video message to extract text...", quote=True)
|
||||
|
||||
with NamedTemporaryFile(delete=False) as temp_video_file:
|
||||
video_path = await client.download_media(
|
||||
media.file_id,
|
||||
file_name=temp_video_file.name
|
||||
)
|
||||
self.logger.info(f"{media_type} message downloaded to {video_path}.")
|
||||
|
||||
try:
|
||||
# Check video duration
|
||||
duration: float = video_processing.get_video_duration(video_path) # type: ignore
|
||||
if duration > 300:
|
||||
self.logger.warning(f"{media_type} too long: {duration} seconds.")
|
||||
await processing_message.edit_text("The video or video note is too long (over 5 minutes). Please send a shorter one.")
|
||||
return
|
||||
|
||||
# Extract audio from video
|
||||
output_dir = os.path.dirname(video_path) # type: ignore
|
||||
audio_path: str = video_processing.extract_audio_from_video(video_path, output_dir) # type: ignore
|
||||
|
||||
# Convert extracted audio to text
|
||||
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||
text: str = speech_recognition.convert_voice_to_text(audio_path, language=language) # type: ignore
|
||||
self.logger.info(f"{media_type} message successfully converted to text.")
|
||||
|
||||
# Format the text for sending
|
||||
formatted_text: str = (
|
||||
f"<pre language=\"Conversion Result ({language})\">"
|
||||
f"{text}"
|
||||
"</pre>"
|
||||
)
|
||||
|
||||
# Edit the initial processing message with the converted text
|
||||
await processing_message.edit_text(formatted_text)
|
||||
|
||||
except FileNotFoundError:
|
||||
self.logger.error("File not found during processing.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
except RuntimeError:
|
||||
self.logger.error("A runtime error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
except Exception:
|
||||
self.logger.error("An unexpected error occurred.", exc_info=True)
|
||||
await processing_message.edit_text("An error occurred while processing the video message. Please try again later.")
|
||||
finally:
|
||||
# Stop indicating typing action
|
||||
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||
|
||||
# Clean up temporary files
|
||||
if os.path.exists(video_path): # type: ignore
|
||||
os.remove(video_path) # type: ignore
|
||||
if os.path.exists(audio_path):
|
||||
os.remove(audio_path)
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Starts the bot.
|
||||
"""
|
||||
self.logger.info("Bot is starting.")
|
||||
print("Bot is running.")
|
||||
try:
|
||||
self.app.run()
|
||||
except Exception as e:
|
||||
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)
|
27
src/core/configuration/config.py.example
Normal file
27
src/core/configuration/config.py.example
Normal file
@ -0,0 +1,27 @@
|
||||
"""
|
||||
config.py
|
||||
|
||||
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
|
||||
|
||||
Configuration settings:
|
||||
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
|
||||
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
|
||||
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
|
||||
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
|
||||
|
||||
Note:
|
||||
- Make sure to keep these tokens secure and do not share them publicly.
|
||||
- These values should be replaced with actual credentials for the bot and the GigaChat API.
|
||||
"""
|
||||
|
||||
# The API ID for the Telegram application
|
||||
API_ID: str = ''
|
||||
|
||||
# The API hash for the Telegram application
|
||||
API_HASH: str = ''
|
||||
|
||||
# The bot token for authenticating the bot with Telegram
|
||||
BOT_TOKEN: str = ''
|
||||
|
||||
# The token for authenticating with the GigaChat API
|
||||
API_GIGACHAT_TOKEN: str = ''
|
102
src/integrations/gigachat_api_client.py
Normal file
102
src/integrations/gigachat_api_client.py
Normal file
@ -0,0 +1,102 @@
|
||||
import logging
|
||||
from logging import Logger
|
||||
from typing import Dict
|
||||
|
||||
from langchain_core.runnables.history import RunnableWithMessageHistory
|
||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||
from langchain_gigachat.chat_models import GigaChat
|
||||
|
||||
|
||||
class GigaChatClient:
|
||||
"""
|
||||
A client class for interacting with the GigaChat API using LangChain components.
|
||||
"""
|
||||
|
||||
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
|
||||
"""
|
||||
Initializes the GigaChatManager with API credentials and a default model.
|
||||
|
||||
Args:
|
||||
api_token (str): The API token for authenticating with the GigaChat API.
|
||||
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
|
||||
"""
|
||||
# Configure logging
|
||||
self.logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
self.api_token: str = api_token
|
||||
self.model_name: str = model_name
|
||||
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
|
||||
|
||||
self.llm: GigaChat = self._create_llm(model_name)
|
||||
self.store: Dict[str, InMemoryChatMessageHistory] = {}
|
||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||
|
||||
def _create_llm(self, model_name: str) -> GigaChat:
|
||||
"""
|
||||
Creates and configures a GigaChat LLM instance.
|
||||
|
||||
Args:
|
||||
model_name (str): The GigaChat model to use.
|
||||
|
||||
Returns:
|
||||
GigaChat: Configured GigaChat instance.
|
||||
"""
|
||||
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
|
||||
return GigaChat(
|
||||
credentials=self.api_token,
|
||||
scope="GIGACHAT_API_PERS",
|
||||
model=model_name,
|
||||
verify_ssl_certs=False,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
|
||||
"""
|
||||
Retrieves the chat history for a given session, creating it if it does not exist.
|
||||
|
||||
Args:
|
||||
session_id (str): The unique identifier for the session.
|
||||
|
||||
Returns:
|
||||
InMemoryChatMessageHistory: The chat history for the session.
|
||||
"""
|
||||
if session_id not in self.store:
|
||||
self.logger.debug(f"Creating new session history for session_id: {session_id}")
|
||||
self.store[session_id] = InMemoryChatMessageHistory()
|
||||
else:
|
||||
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
|
||||
return self.store[session_id]
|
||||
|
||||
def set_model(self, model_name: str) -> None:
|
||||
"""
|
||||
Updates the LLM to use a different GigaChat model.
|
||||
|
||||
Args:
|
||||
model_name (str): The new GigaChat model to use.
|
||||
"""
|
||||
self.logger.info(f"Switching model to: {model_name}")
|
||||
self.llm = self._create_llm(model_name)
|
||||
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||
|
||||
def get_response(self, session_id: str, text: str) -> str:
|
||||
"""
|
||||
Get a response to the provided input text for a given session.
|
||||
|
||||
Args:
|
||||
session_id (str): The unique identifier for the session.
|
||||
text (str): The input text for which a response is needed.
|
||||
|
||||
Returns:
|
||||
str: The response text.
|
||||
"""
|
||||
self.logger.info(f"Generating response for session_id: {session_id}")
|
||||
try:
|
||||
response = self.conversation.invoke(
|
||||
input=text,
|
||||
config={"configurable": {"session_id": session_id}},
|
||||
)
|
||||
self.logger.debug(f"Response for session_id {session_id}")
|
||||
return response.content
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
|
||||
raise
|
62
src/utils/logging.py
Normal file
62
src/utils/logging.py
Normal file
@ -0,0 +1,62 @@
|
||||
import logging
|
||||
import logging.config
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def setup_logging(output_to_console=False) -> None:
|
||||
"""
|
||||
Configures the logging system.
|
||||
|
||||
This function sets up logging with optional output to the console and ensures
|
||||
log files are rotated daily. It creates a detailed logging format and retains
|
||||
log files for a week.
|
||||
|
||||
Args:
|
||||
output_to_console (bool): If True, log messages will also be printed to the console.
|
||||
Defaults to False.
|
||||
"""
|
||||
# Define the default handlers to use. Always logs to a file.
|
||||
handlers: list[str] = ['file']
|
||||
if output_to_console:
|
||||
# Add console logging if requested
|
||||
handlers.append('console')
|
||||
|
||||
# Generate the log file name with the current date
|
||||
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
|
||||
|
||||
# Configure the logging settings using a dictionary
|
||||
logging.config.dictConfig({
|
||||
'version': 1, # Logging configuration version
|
||||
'disable_existing_loggers': True, # Deny other loggers to remain active
|
||||
'formatters': {
|
||||
'detailed': { # Define a detailed logging format
|
||||
'format': (
|
||||
'%(asctime)s | %(levelname)-8s | '
|
||||
'%(filename)s.%(funcName)s, line %(lineno)d: '
|
||||
'%(message)s'
|
||||
),
|
||||
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
# Console handler outputs log messages to the console
|
||||
'console': {
|
||||
'class': 'logging.StreamHandler', # Standard output stream
|
||||
'formatter': 'detailed', # Use the detailed formatter
|
||||
},
|
||||
# File handler writes log messages to a file, rotating daily
|
||||
'file': {
|
||||
'class': 'logging.handlers.TimedRotatingFileHandler',
|
||||
'filename': log_filename, # Log file path
|
||||
'when': 'midnight', # Rotate log files at midnight
|
||||
'interval': 1, # Rotate daily
|
||||
'backupCount': 7, # Keep up to 7 old log files
|
||||
'formatter': 'detailed', # Use the detailed formatter
|
||||
},
|
||||
},
|
||||
# Define the root logger configuration
|
||||
'root': {
|
||||
'handlers': handlers, # Handlers to use (console, file, or both)
|
||||
'level': 'DEBUG', # Log level (DEBUG logs all levels)
|
||||
},
|
||||
})
|
120
src/utils/speech_recognition.py
Normal file
120
src/utils/speech_recognition.py
Normal file
@ -0,0 +1,120 @@
|
||||
import os
|
||||
import logging
|
||||
from logging import Logger
|
||||
|
||||
from pydub import AudioSegment
|
||||
import speech_recognition as sr
|
||||
from speech_recognition.audio import AudioData
|
||||
|
||||
|
||||
# Configure logging
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
def convert_to_wav(file_path: str) -> str:
|
||||
"""
|
||||
Converts an audio file to WAV format if it is not already in WAV format.
|
||||
|
||||
Args:
|
||||
file_path (str): The path to the audio file to be converted.
|
||||
|
||||
Returns:
|
||||
str: The path to the converted or original WAV file.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file does not exist.
|
||||
RuntimeError: If the conversion fails for any reason.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||
|
||||
if file_path.lower().endswith('.wav'):
|
||||
logger.info(f"File {file_path} is already in WAV format.")
|
||||
return file_path
|
||||
|
||||
try:
|
||||
logger.info(f"Converting {file_path} to WAV format.")
|
||||
audio = AudioSegment.from_file(file_path)
|
||||
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
|
||||
audio.export(wav_path, format="wav")
|
||||
logger.info(f"File converted to {wav_path}.")
|
||||
return wav_path
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to convert file to WAV: {e}")
|
||||
raise RuntimeError(f"Failed to convert file to WAV: {e}")
|
||||
|
||||
def get_audio_duration(file_path: str) -> float:
|
||||
"""
|
||||
Retrieves the duration of an audio file in seconds.
|
||||
|
||||
Args:
|
||||
file_path (str): The path to the audio file.
|
||||
|
||||
Returns:
|
||||
float: The duration of the audio file in seconds.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file does not exist.
|
||||
RuntimeError: If unable to get the file duration.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||
|
||||
try:
|
||||
logger.info(f"Getting duration of {file_path}.")
|
||||
audio = AudioSegment.from_file(file_path)
|
||||
duration: float = len(audio) / 1000 # Duration in seconds
|
||||
logger.info(f"Duration of {file_path}: {duration} seconds.")
|
||||
return duration
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get file duration: {e}")
|
||||
raise RuntimeError(f"Failed to get file duration: {e}")
|
||||
|
||||
def convert_voice_to_text(file_path: str, language='ru') -> str:
|
||||
"""
|
||||
Converts speech from an audio file to text using OpenAI speech recognition service.
|
||||
|
||||
Args:
|
||||
file_path (str): The path to the audio file to be processed.
|
||||
language (str): The language code for speech recognition (default is 'ru').
|
||||
|
||||
Returns:
|
||||
str: The transcribed text if recognition is successful.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file does not exist.
|
||||
RuntimeError: For any errors encountered during processing.
|
||||
"""
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"File {file_path} does not exist.")
|
||||
raise FileNotFoundError("File does not exist.")
|
||||
|
||||
# Convert the file to WAV format if necessary
|
||||
try:
|
||||
wav_path: str = convert_to_wav(file_path)
|
||||
except RuntimeError as e:
|
||||
logger.error(f"Error converting to WAV: {e}")
|
||||
raise RuntimeError(f"Error converting to WAV: {e}")
|
||||
|
||||
recognizer = sr.Recognizer()
|
||||
|
||||
try:
|
||||
logger.info(f"Processing file {wav_path} ({get_audio_duration(wav_path)} sec) for speech recognition.")
|
||||
with sr.AudioFile(wav_path) as source:
|
||||
audio_data: AudioData = recognizer.record(source)
|
||||
text = recognizer.recognize_whisper(audio_data, language=language, model='medium')
|
||||
logger.info("Speech recognition successful.")
|
||||
return text # type: ignore
|
||||
except sr.UnknownValueError:
|
||||
logger.warning(f"Speech in {wav_path} could not be recognized.")
|
||||
raise RuntimeError("Speech could not be recognized.")
|
||||
except sr.RequestError as e:
|
||||
logger.error(f"Request error from the recognition service: {e}")
|
||||
raise RuntimeError(f"Request error from the recognition service: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
raise RuntimeError(f"An unexpected error occurred: {e}")
|
71
src/utils/video_processing.py
Normal file
71
src/utils/video_processing.py
Normal file
@ -0,0 +1,71 @@
|
||||
import os
|
||||
import logging
|
||||
from logging import Logger
|
||||
|
||||
from moviepy import VideoFileClip
|
||||
|
||||
|
||||
# Configure logging
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
def get_video_duration(video_path: str) -> float:
|
||||
"""
|
||||
Get the duration of a video file in seconds.
|
||||
|
||||
Args:
|
||||
video_path (str): The path to the video file.
|
||||
|
||||
Returns:
|
||||
float: The duration of the video in seconds.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the video file does not exist.
|
||||
RuntimeError: If an error occurs during processing.
|
||||
"""
|
||||
if not os.path.exists(video_path):
|
||||
logger.error(f"Video file {video_path} does not exist.")
|
||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
||||
|
||||
try:
|
||||
video_clip = VideoFileClip(video_path)
|
||||
duration = video_clip.duration
|
||||
logger.info(f"Duration of video {video_path}: {duration} seconds.")
|
||||
return duration
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get video duration: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to get video duration: {e}")
|
||||
finally:
|
||||
video_clip.close()
|
||||
|
||||
|
||||
def extract_audio_from_video(video_path: str, output_dir: str) -> str:
|
||||
"""
|
||||
Extracts the audio track from a video file and saves it as a WAV file.
|
||||
|
||||
Args:
|
||||
video_path (str): The path to the video file.
|
||||
output_dir (str): The directory where the audio file will be saved.
|
||||
|
||||
Returns:
|
||||
str: The path to the extracted audio file.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the video file does not exist.
|
||||
RuntimeError: If an error occurs during audio extraction.
|
||||
"""
|
||||
if not os.path.exists(video_path):
|
||||
logger.error(f"Video file {video_path} does not exist.")
|
||||
raise FileNotFoundError(f"Video file {video_path} does not exist.")
|
||||
|
||||
try:
|
||||
logger.info(f"Extracting audio from video: {video_path}")
|
||||
video_clip = VideoFileClip(video_path)
|
||||
audio_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(video_path))[0]}.wav")
|
||||
video_clip.audio.write_audiofile(audio_path) # type: ignore
|
||||
logger.info(f"Audio extracted and saved to: {audio_path}")
|
||||
return audio_path
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract audio from video: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to extract audio from video: {e}")
|
||||
finally:
|
||||
video_clip.close()
|
55
tests/integrations/test_gigachat_api_client.py
Normal file
55
tests/integrations/test_gigachat_api_client.py
Normal file
@ -0,0 +1,55 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||
from langchain_gigachat import GigaChat
|
||||
|
||||
from src.integrations.gigachat_api_client import GigaChatClient
|
||||
from src.core.configuration import config
|
||||
|
||||
|
||||
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
|
||||
|
||||
@pytest.fixture
|
||||
def gigachat_client() -> GigaChatClient:
|
||||
"""Fixture to create a GigaChatClient instance with a mock API token."""
|
||||
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
|
||||
def test_initialization(gigachat_client) -> None:
|
||||
"""Test if the GigaChatClient initializes correctly."""
|
||||
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
|
||||
assert gigachat_client.model_name == "GigaChat"
|
||||
assert isinstance(gigachat_client.store, dict)
|
||||
assert gigachat_client.llm is not None
|
||||
|
||||
def test_create_llm() -> None:
|
||||
"""Test the _create_llm method for proper LLM creation."""
|
||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
|
||||
assert mock_llm.credentials == API_GIGACHAT_TOKEN
|
||||
assert mock_llm.model == "GigaChat-Pro"
|
||||
|
||||
def test_get_session_history(gigachat_client) -> None:
|
||||
"""Test the get_session_history method for creating/retrieving session history."""
|
||||
session_id = "test_session"
|
||||
history = gigachat_client.get_session_history(session_id)
|
||||
assert isinstance(history, InMemoryChatMessageHistory)
|
||||
assert session_id in gigachat_client.store
|
||||
assert gigachat_client.store[session_id] is history
|
||||
|
||||
def test_set_model(gigachat_client) -> None:
|
||||
"""Test the set_model method for updating the LLM model."""
|
||||
new_model = "GigaChat-Pro"
|
||||
gigachat_client.set_model(new_model)
|
||||
assert gigachat_client.llm.model == new_model
|
||||
|
||||
def test_get_response() -> None:
|
||||
"""Test the get_response method by verifying the response code."""
|
||||
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
|
||||
mock_runnable = MagicMock()
|
||||
mock_runnable.invoke.return_value.code = 200
|
||||
MockRunnable.return_value = mock_runnable
|
||||
|
||||
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||
response_code = mock_runnable.invoke.return_value.code
|
||||
|
||||
assert response_code == 200
|
Loading…
Reference in New Issue
Block a user