Compare commits
7 Commits
main
...
feature/ad
Author | SHA1 | Date | |
---|---|---|---|
f5c208ae2d | |||
c7f3e093b4 | |||
b9d6cde8fe | |||
0b9603d9f2 | |||
8bf781b67d | |||
a43873109c | |||
e734b9d827 |
7
.gitignore
vendored
7
.gitignore
vendored
@ -129,6 +129,9 @@ ENV/
|
|||||||
env.bak/
|
env.bak/
|
||||||
venv.bak/
|
venv.bak/
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
config.py
|
||||||
|
|
||||||
# Spyder project settings
|
# Spyder project settings
|
||||||
.spyderproject
|
.spyderproject
|
||||||
.spyproject
|
.spyproject
|
||||||
@ -161,6 +164,7 @@ cython_debug/
|
|||||||
#.idea/
|
#.idea/
|
||||||
|
|
||||||
# ---> VisualStudioCode
|
# ---> VisualStudioCode
|
||||||
|
.vscode
|
||||||
.vscode/*
|
.vscode/*
|
||||||
!.vscode/settings.json
|
!.vscode/settings.json
|
||||||
!.vscode/tasks.json
|
!.vscode/tasks.json
|
||||||
@ -174,3 +178,6 @@ cython_debug/
|
|||||||
# Built Visual Studio Code Extensions
|
# Built Visual Studio Code Extensions
|
||||||
*.vsix
|
*.vsix
|
||||||
|
|
||||||
|
# Session info
|
||||||
|
*.session
|
||||||
|
*.session-journal
|
33
main.py
33
main.py
@ -0,0 +1,33 @@
|
|||||||
|
from src.integrations.gigachat_api_client import GigaChatClient
|
||||||
|
from src.bot.telegram_userbot import TelegramUserBot
|
||||||
|
from src.utils import logging
|
||||||
|
from src.core.configuration import config
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""
|
||||||
|
Entry point for starting the Telegram user bot.
|
||||||
|
"""
|
||||||
|
# Configure logging
|
||||||
|
logging.setup_logging()
|
||||||
|
|
||||||
|
# Load API credentials and configuration
|
||||||
|
api_id: str = config.API_ID
|
||||||
|
api_hash: str = config.API_HASH
|
||||||
|
api_token: str = config.API_GIGACHAT_TOKEN
|
||||||
|
|
||||||
|
# Initialize GigaChatClient
|
||||||
|
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
|
||||||
|
|
||||||
|
# Initialize and run the Telegram user bot
|
||||||
|
bot: TelegramUserBot = TelegramUserBot(
|
||||||
|
session_name="userbot",
|
||||||
|
api_id=api_id,
|
||||||
|
api_hash=api_hash,
|
||||||
|
gigachat_client=gigachat_client
|
||||||
|
)
|
||||||
|
bot.run()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
1882
poetry.lock
generated
Normal file
1882
poetry.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
@ -6,7 +6,23 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
|
|||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
|
||||||
[tool.poetry.dependencies]
|
[tool.poetry.dependencies]
|
||||||
python = "^3.13"
|
python = "^3.12"
|
||||||
|
pyrogram = "^2.0.106"
|
||||||
|
tgcrypto = "^1.2.5"
|
||||||
|
setuptools = "^75.6.0"
|
||||||
|
wheel = "^0.45.1"
|
||||||
|
langchain-gigachat = "^0.3.2"
|
||||||
|
punq = "^0.7.0"
|
||||||
|
pytest = "^8.3.4"
|
||||||
|
speechrecognition = "^3.13.0"
|
||||||
|
typing-extensions = "^4.12.2"
|
||||||
|
pydub = "^0.25.1"
|
||||||
|
numpy = "2.0.2"
|
||||||
|
soundfile = "^0.13.0"
|
||||||
|
torch = "^2.5.1"
|
||||||
|
llvmlite = "0.43.0"
|
||||||
|
numba = "0.60.0"
|
||||||
|
openai-whisper = "^20240930"
|
||||||
|
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
|
190
src/bot/telegram_userbot.py
Normal file
190
src/bot/telegram_userbot.py
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
import logging
|
||||||
|
from logging import Logger
|
||||||
|
from tempfile import NamedTemporaryFile
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pyrogram import filters
|
||||||
|
from pyrogram.client import Client
|
||||||
|
from pyrogram.types import Message
|
||||||
|
from pyrogram.enums import ChatAction
|
||||||
|
|
||||||
|
from src.integrations.gigachat_api_client import GigaChatClient
|
||||||
|
from src.utils import speech_recognition
|
||||||
|
|
||||||
|
|
||||||
|
class TelegramUserBot:
|
||||||
|
"""
|
||||||
|
A Telegram user bot.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
app (Client): The Pyrogram client instance for the bot.
|
||||||
|
gigachat_client (GigaChatClient): The client instance for GigaChat integration.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
|
||||||
|
"""
|
||||||
|
Initializes the Telegram user bot.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_name (str): The session name for the bot.
|
||||||
|
api_id (str): The API ID for the Telegram application.
|
||||||
|
api_hash (str): The API hash for the Telegram application.
|
||||||
|
gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses.
|
||||||
|
"""
|
||||||
|
# Configure logging
|
||||||
|
self.logger: Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
|
||||||
|
self.gigachat_client: GigaChatClient = gigachat_client
|
||||||
|
self.register_handlers()
|
||||||
|
|
||||||
|
def register_handlers(self) -> None:
|
||||||
|
"""
|
||||||
|
Registers the message handlers for the bot.
|
||||||
|
"""
|
||||||
|
self.logger.debug("Registering handlers.")
|
||||||
|
self.app.on_message(filters.command("ai"))(self.handle_ai_command)
|
||||||
|
self.app.on_message(filters.command("voice"))(self.handle_voice_command)
|
||||||
|
|
||||||
|
async def handle_ai_command(self, client: Client, message: Message) -> None:
|
||||||
|
"""
|
||||||
|
Handles messages that invoke the /ai command.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client (Client): The Pyrogram client instance.
|
||||||
|
message (Message): The incoming Telegram message.
|
||||||
|
"""
|
||||||
|
self.logger.info(f"Received /ai command from chat_id={message.chat.id}")
|
||||||
|
|
||||||
|
# Extract the command argument
|
||||||
|
command_arg: Optional[str] = " ".join(message.text.split()[1:])
|
||||||
|
|
||||||
|
if not command_arg and message.reply_to_message and message.reply_to_message.text:
|
||||||
|
# Use the text of the replied message if no argument is provided
|
||||||
|
command_arg = message.reply_to_message.text
|
||||||
|
|
||||||
|
if not command_arg:
|
||||||
|
self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}")
|
||||||
|
await message.reply("Please provide a message after /ai or reply to a message.", quote=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send an initial message indicating processing
|
||||||
|
self.logger.debug(f"Processing request for chat_id={message.chat.id}")
|
||||||
|
processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start typing animation
|
||||||
|
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||||
|
|
||||||
|
# Get a response from GigaChat
|
||||||
|
response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg)
|
||||||
|
self.logger.debug(f"Received response for chat_id={message.chat.id}")
|
||||||
|
|
||||||
|
# Edit the processing message with the generated response
|
||||||
|
await processing_message.edit_text(response)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True)
|
||||||
|
await processing_message.edit_text("An error occurred while processing your request.")
|
||||||
|
finally:
|
||||||
|
# Stop indicating typing action
|
||||||
|
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||||
|
|
||||||
|
def get_language(self, input_text: str) -> str:
|
||||||
|
"""
|
||||||
|
Determines the language for voice-to-text conversion based on the input parameter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
input_text (str): The input parameter indicating the language.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The language code ('en' or 'ru').
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If an invalid language parameter is provided.
|
||||||
|
"""
|
||||||
|
language_params: dict[str, list[str]] = {
|
||||||
|
'en': ['en', 'eng', 'english'],
|
||||||
|
'ru': ['ru', 'rus', 'russian']
|
||||||
|
}
|
||||||
|
|
||||||
|
input_lower: str = input_text.lower()
|
||||||
|
for lang_code, aliases in language_params.items():
|
||||||
|
if input_lower in aliases:
|
||||||
|
return lang_code
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid language parameter. Please use one of the following:\n" +
|
||||||
|
"\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items())
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_voice_command(self, client: Client, message: Message) -> None:
|
||||||
|
"""
|
||||||
|
Handle the /voice command to convert a voice message to text with optional language selection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client (Client): The Pyrogram Client instance.
|
||||||
|
message (Message): The incoming message containing the /voice command.
|
||||||
|
"""
|
||||||
|
self.logger.info(f"Received /voice command from chat_id={message.chat.id}.")
|
||||||
|
|
||||||
|
# Parse the language parameter (default to Russian)
|
||||||
|
command_parts: list[str] = message.text.split()
|
||||||
|
try:
|
||||||
|
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
|
||||||
|
except ValueError as e:
|
||||||
|
await message.reply(str(e), quote=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check if the reply is to a voice message
|
||||||
|
if not (message.reply_to_message and message.reply_to_message.voice):
|
||||||
|
self.logger.warning("The /voice command was not used in reply to a voice message.")
|
||||||
|
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send an initial message indicating processing
|
||||||
|
processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True)
|
||||||
|
|
||||||
|
with NamedTemporaryFile(delete=False) as temp_file:
|
||||||
|
file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name)
|
||||||
|
self.logger.info(f"Voice message downloaded to {file_path}.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Start typing animation
|
||||||
|
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
|
||||||
|
|
||||||
|
# Attempt to convert voice to text with the selected language
|
||||||
|
text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore
|
||||||
|
self.logger.info("Voice message successfully converted to text.")
|
||||||
|
|
||||||
|
# Format the text for sending
|
||||||
|
formatted_text: str = (
|
||||||
|
f"<pre language=\"Conversion Result ({language})\">"
|
||||||
|
f"{text}"
|
||||||
|
"</pre>"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Edit the initial processing message with the converted text
|
||||||
|
await processing_message.edit_text(formatted_text)
|
||||||
|
except FileNotFoundError:
|
||||||
|
self.logger.error("File not found during processing.", exc_info=True)
|
||||||
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||||
|
except RuntimeError:
|
||||||
|
self.logger.error("A runtime error occurred.", exc_info=True)
|
||||||
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||||
|
except Exception:
|
||||||
|
self.logger.error("An unexpected error occurred.", exc_info=True)
|
||||||
|
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
|
||||||
|
finally:
|
||||||
|
# Stop indicating typing action
|
||||||
|
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
"""
|
||||||
|
Starts the bot.
|
||||||
|
"""
|
||||||
|
self.logger.info("Bot is starting.")
|
||||||
|
print("Bot is running.")
|
||||||
|
try:
|
||||||
|
self.app.run()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)
|
27
src/core/configuration/config.py.example
Normal file
27
src/core/configuration/config.py.example
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
"""
|
||||||
|
config.py
|
||||||
|
|
||||||
|
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
|
||||||
|
|
||||||
|
Configuration settings:
|
||||||
|
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
|
||||||
|
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
|
||||||
|
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
|
||||||
|
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
- Make sure to keep these tokens secure and do not share them publicly.
|
||||||
|
- These values should be replaced with actual credentials for the bot and the GigaChat API.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# The API ID for the Telegram application
|
||||||
|
API_ID: str = ''
|
||||||
|
|
||||||
|
# The API hash for the Telegram application
|
||||||
|
API_HASH: str = ''
|
||||||
|
|
||||||
|
# The bot token for authenticating the bot with Telegram
|
||||||
|
BOT_TOKEN: str = ''
|
||||||
|
|
||||||
|
# The token for authenticating with the GigaChat API
|
||||||
|
API_GIGACHAT_TOKEN: str = ''
|
102
src/integrations/gigachat_api_client.py
Normal file
102
src/integrations/gigachat_api_client.py
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
import logging
|
||||||
|
from logging import Logger
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from langchain_core.runnables.history import RunnableWithMessageHistory
|
||||||
|
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||||
|
from langchain_gigachat.chat_models import GigaChat
|
||||||
|
|
||||||
|
|
||||||
|
class GigaChatClient:
|
||||||
|
"""
|
||||||
|
A client class for interacting with the GigaChat API using LangChain components.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
|
||||||
|
"""
|
||||||
|
Initializes the GigaChatManager with API credentials and a default model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
api_token (str): The API token for authenticating with the GigaChat API.
|
||||||
|
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
|
||||||
|
"""
|
||||||
|
# Configure logging
|
||||||
|
self.logger: Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
self.api_token: str = api_token
|
||||||
|
self.model_name: str = model_name
|
||||||
|
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
|
||||||
|
|
||||||
|
self.llm: GigaChat = self._create_llm(model_name)
|
||||||
|
self.store: Dict[str, InMemoryChatMessageHistory] = {}
|
||||||
|
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||||
|
|
||||||
|
def _create_llm(self, model_name: str) -> GigaChat:
|
||||||
|
"""
|
||||||
|
Creates and configures a GigaChat LLM instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_name (str): The GigaChat model to use.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
GigaChat: Configured GigaChat instance.
|
||||||
|
"""
|
||||||
|
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
|
||||||
|
return GigaChat(
|
||||||
|
credentials=self.api_token,
|
||||||
|
scope="GIGACHAT_API_PERS",
|
||||||
|
model=model_name,
|
||||||
|
verify_ssl_certs=False,
|
||||||
|
streaming=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
|
||||||
|
"""
|
||||||
|
Retrieves the chat history for a given session, creating it if it does not exist.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id (str): The unique identifier for the session.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
InMemoryChatMessageHistory: The chat history for the session.
|
||||||
|
"""
|
||||||
|
if session_id not in self.store:
|
||||||
|
self.logger.debug(f"Creating new session history for session_id: {session_id}")
|
||||||
|
self.store[session_id] = InMemoryChatMessageHistory()
|
||||||
|
else:
|
||||||
|
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
|
||||||
|
return self.store[session_id]
|
||||||
|
|
||||||
|
def set_model(self, model_name: str) -> None:
|
||||||
|
"""
|
||||||
|
Updates the LLM to use a different GigaChat model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_name (str): The new GigaChat model to use.
|
||||||
|
"""
|
||||||
|
self.logger.info(f"Switching model to: {model_name}")
|
||||||
|
self.llm = self._create_llm(model_name)
|
||||||
|
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
|
||||||
|
|
||||||
|
def get_response(self, session_id: str, text: str) -> str:
|
||||||
|
"""
|
||||||
|
Get a response to the provided input text for a given session.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id (str): The unique identifier for the session.
|
||||||
|
text (str): The input text for which a response is needed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The response text.
|
||||||
|
"""
|
||||||
|
self.logger.info(f"Generating response for session_id: {session_id}")
|
||||||
|
try:
|
||||||
|
response = self.conversation.invoke(
|
||||||
|
input=text,
|
||||||
|
config={"configurable": {"session_id": session_id}},
|
||||||
|
)
|
||||||
|
self.logger.debug(f"Response for session_id {session_id}")
|
||||||
|
return response.content
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
|
||||||
|
raise
|
62
src/utils/logging.py
Normal file
62
src/utils/logging.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
def setup_logging(output_to_console=False) -> None:
|
||||||
|
"""
|
||||||
|
Configures the logging system.
|
||||||
|
|
||||||
|
This function sets up logging with optional output to the console and ensures
|
||||||
|
log files are rotated daily. It creates a detailed logging format and retains
|
||||||
|
log files for a week.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_to_console (bool): If True, log messages will also be printed to the console.
|
||||||
|
Defaults to False.
|
||||||
|
"""
|
||||||
|
# Define the default handlers to use. Always logs to a file.
|
||||||
|
handlers: list[str] = ['file']
|
||||||
|
if output_to_console:
|
||||||
|
# Add console logging if requested
|
||||||
|
handlers.append('console')
|
||||||
|
|
||||||
|
# Generate the log file name with the current date
|
||||||
|
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
|
||||||
|
|
||||||
|
# Configure the logging settings using a dictionary
|
||||||
|
logging.config.dictConfig({
|
||||||
|
'version': 1, # Logging configuration version
|
||||||
|
'disable_existing_loggers': True, # Deny other loggers to remain active
|
||||||
|
'formatters': {
|
||||||
|
'detailed': { # Define a detailed logging format
|
||||||
|
'format': (
|
||||||
|
'%(asctime)s | %(levelname)-8s | '
|
||||||
|
'%(filename)s.%(funcName)s, line %(lineno)d: '
|
||||||
|
'%(message)s'
|
||||||
|
),
|
||||||
|
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'handlers': {
|
||||||
|
# Console handler outputs log messages to the console
|
||||||
|
'console': {
|
||||||
|
'class': 'logging.StreamHandler', # Standard output stream
|
||||||
|
'formatter': 'detailed', # Use the detailed formatter
|
||||||
|
},
|
||||||
|
# File handler writes log messages to a file, rotating daily
|
||||||
|
'file': {
|
||||||
|
'class': 'logging.handlers.TimedRotatingFileHandler',
|
||||||
|
'filename': log_filename, # Log file path
|
||||||
|
'when': 'midnight', # Rotate log files at midnight
|
||||||
|
'interval': 1, # Rotate daily
|
||||||
|
'backupCount': 7, # Keep up to 7 old log files
|
||||||
|
'formatter': 'detailed', # Use the detailed formatter
|
||||||
|
},
|
||||||
|
},
|
||||||
|
# Define the root logger configuration
|
||||||
|
'root': {
|
||||||
|
'handlers': handlers, # Handlers to use (console, file, or both)
|
||||||
|
'level': 'DEBUG', # Log level (DEBUG logs all levels)
|
||||||
|
},
|
||||||
|
})
|
120
src/utils/speech_recognition.py
Normal file
120
src/utils/speech_recognition.py
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from logging import Logger
|
||||||
|
|
||||||
|
from pydub import AudioSegment
|
||||||
|
import speech_recognition as sr
|
||||||
|
from speech_recognition.audio import AudioData
|
||||||
|
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logger: Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def convert_to_wav(file_path: str) -> str:
|
||||||
|
"""
|
||||||
|
Converts an audio file to WAV format if it is not already in WAV format.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path (str): The path to the audio file to be converted.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The path to the converted or original WAV file.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: If the file does not exist.
|
||||||
|
RuntimeError: If the conversion fails for any reason.
|
||||||
|
"""
|
||||||
|
# Check if the file exists
|
||||||
|
if not os.path.exists(file_path):
|
||||||
|
logger.error(f"File {file_path} does not exist.")
|
||||||
|
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||||
|
|
||||||
|
if file_path.lower().endswith('.wav'):
|
||||||
|
logger.info(f"File {file_path} is already in WAV format.")
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f"Converting {file_path} to WAV format.")
|
||||||
|
audio = AudioSegment.from_file(file_path)
|
||||||
|
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
|
||||||
|
audio.export(wav_path, format="wav")
|
||||||
|
logger.info(f"File converted to {wav_path}.")
|
||||||
|
return wav_path
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to convert file to WAV: {e}")
|
||||||
|
raise RuntimeError(f"Failed to convert file to WAV: {e}")
|
||||||
|
|
||||||
|
def get_audio_duration(file_path: str) -> float:
|
||||||
|
"""
|
||||||
|
Retrieves the duration of an audio file in seconds.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path (str): The path to the audio file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: The duration of the audio file in seconds.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: If the file does not exist.
|
||||||
|
RuntimeError: If unable to get the file duration.
|
||||||
|
"""
|
||||||
|
# Check if the file exists
|
||||||
|
if not os.path.exists(file_path):
|
||||||
|
logger.error(f"File {file_path} does not exist.")
|
||||||
|
raise FileNotFoundError(f"File {file_path} does not exist.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f"Getting duration of {file_path}.")
|
||||||
|
audio = AudioSegment.from_file(file_path)
|
||||||
|
duration: float = len(audio) / 1000 # Duration in seconds
|
||||||
|
logger.info(f"Duration of {file_path}: {duration} seconds.")
|
||||||
|
return duration
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to get file duration: {e}")
|
||||||
|
raise RuntimeError(f"Failed to get file duration: {e}")
|
||||||
|
|
||||||
|
def convert_voice_to_text(file_path: str, language='ru') -> str:
|
||||||
|
"""
|
||||||
|
Converts speech from an audio file to text using OpenAI speech recognition service.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path (str): The path to the audio file to be processed.
|
||||||
|
language (str): The language code for speech recognition (default is 'ru').
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The transcribed text if recognition is successful.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: If the file does not exist.
|
||||||
|
RuntimeError: For any errors encountered during processing.
|
||||||
|
"""
|
||||||
|
# Check if the file exists
|
||||||
|
if not os.path.exists(file_path):
|
||||||
|
logger.error(f"File {file_path} does not exist.")
|
||||||
|
raise FileNotFoundError("File does not exist.")
|
||||||
|
|
||||||
|
# Convert the file to WAV format if necessary
|
||||||
|
try:
|
||||||
|
wav_path: str = convert_to_wav(file_path)
|
||||||
|
except RuntimeError as e:
|
||||||
|
logger.error(f"Error converting to WAV: {e}")
|
||||||
|
raise RuntimeError(f"Error converting to WAV: {e}")
|
||||||
|
|
||||||
|
recognizer = sr.Recognizer()
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f"Processing file {wav_path} ({get_audio_duration(wav_path)} sec) for speech recognition.")
|
||||||
|
with sr.AudioFile(wav_path) as source:
|
||||||
|
audio_data: AudioData = recognizer.record(source)
|
||||||
|
text = recognizer.recognize_whisper(audio_data, language=language, model='medium')
|
||||||
|
logger.info("Speech recognition successful.")
|
||||||
|
return text # type: ignore
|
||||||
|
except sr.UnknownValueError:
|
||||||
|
logger.warning(f"Speech in {wav_path} could not be recognized.")
|
||||||
|
raise RuntimeError("Speech could not be recognized.")
|
||||||
|
except sr.RequestError as e:
|
||||||
|
logger.error(f"Request error from the recognition service: {e}")
|
||||||
|
raise RuntimeError(f"Request error from the recognition service: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
|
raise RuntimeError(f"An unexpected error occurred: {e}")
|
55
tests/integrations/test_gigachat_api_client.py
Normal file
55
tests/integrations/test_gigachat_api_client.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from langchain_core.chat_history import InMemoryChatMessageHistory
|
||||||
|
from langchain_gigachat import GigaChat
|
||||||
|
|
||||||
|
from src.integrations.gigachat_api_client import GigaChatClient
|
||||||
|
from src.core.configuration import config
|
||||||
|
|
||||||
|
|
||||||
|
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def gigachat_client() -> GigaChatClient:
|
||||||
|
"""Fixture to create a GigaChatClient instance with a mock API token."""
|
||||||
|
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||||
|
|
||||||
|
def test_initialization(gigachat_client) -> None:
|
||||||
|
"""Test if the GigaChatClient initializes correctly."""
|
||||||
|
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
|
||||||
|
assert gigachat_client.model_name == "GigaChat"
|
||||||
|
assert isinstance(gigachat_client.store, dict)
|
||||||
|
assert gigachat_client.llm is not None
|
||||||
|
|
||||||
|
def test_create_llm() -> None:
|
||||||
|
"""Test the _create_llm method for proper LLM creation."""
|
||||||
|
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||||
|
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
|
||||||
|
assert mock_llm.credentials == API_GIGACHAT_TOKEN
|
||||||
|
assert mock_llm.model == "GigaChat-Pro"
|
||||||
|
|
||||||
|
def test_get_session_history(gigachat_client) -> None:
|
||||||
|
"""Test the get_session_history method for creating/retrieving session history."""
|
||||||
|
session_id = "test_session"
|
||||||
|
history = gigachat_client.get_session_history(session_id)
|
||||||
|
assert isinstance(history, InMemoryChatMessageHistory)
|
||||||
|
assert session_id in gigachat_client.store
|
||||||
|
assert gigachat_client.store[session_id] is history
|
||||||
|
|
||||||
|
def test_set_model(gigachat_client) -> None:
|
||||||
|
"""Test the set_model method for updating the LLM model."""
|
||||||
|
new_model = "GigaChat-Pro"
|
||||||
|
gigachat_client.set_model(new_model)
|
||||||
|
assert gigachat_client.llm.model == new_model
|
||||||
|
|
||||||
|
def test_get_response() -> None:
|
||||||
|
"""Test the get_response method by verifying the response code."""
|
||||||
|
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
|
||||||
|
mock_runnable = MagicMock()
|
||||||
|
mock_runnable.invoke.return_value.code = 200
|
||||||
|
MockRunnable.return_value = mock_runnable
|
||||||
|
|
||||||
|
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
|
||||||
|
response_code = mock_runnable.invoke.return_value.code
|
||||||
|
|
||||||
|
assert response_code == 200
|
Loading…
Reference in New Issue
Block a user