Compare commits

...

7 Commits

10 changed files with 2495 additions and 1 deletions

7
.gitignore vendored
View File

@ -129,6 +129,9 @@ ENV/
env.bak/
venv.bak/
# Configuration
config.py
# Spyder project settings
.spyderproject
.spyproject
@ -161,6 +164,7 @@ cython_debug/
#.idea/
# ---> VisualStudioCode
.vscode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
@ -174,3 +178,6 @@ cython_debug/
# Built Visual Studio Code Extensions
*.vsix
# Session info
*.session
*.session-journal

33
main.py
View File

@ -0,0 +1,33 @@
from src.integrations.gigachat_api_client import GigaChatClient
from src.bot.telegram_userbot import TelegramUserBot
from src.utils import logging
from src.core.configuration import config
def main() -> None:
"""
Entry point for starting the Telegram user bot.
"""
# Configure logging
logging.setup_logging()
# Load API credentials and configuration
api_id: str = config.API_ID
api_hash: str = config.API_HASH
api_token: str = config.API_GIGACHAT_TOKEN
# Initialize GigaChatClient
gigachat_client: GigaChatClient = GigaChatClient(api_token=api_token)
# Initialize and run the Telegram user bot
bot: TelegramUserBot = TelegramUserBot(
session_name="userbot",
api_id=api_id,
api_hash=api_hash,
gigachat_client=gigachat_client
)
bot.run()
if __name__ == "__main__":
main()

1882
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,23 @@ authors = ["Factorino73 <masenkin73@xmail.ru>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.13"
python = "^3.12"
pyrogram = "^2.0.106"
tgcrypto = "^1.2.5"
setuptools = "^75.6.0"
wheel = "^0.45.1"
langchain-gigachat = "^0.3.2"
punq = "^0.7.0"
pytest = "^8.3.4"
speechrecognition = "^3.13.0"
typing-extensions = "^4.12.2"
pydub = "^0.25.1"
numpy = "2.0.2"
soundfile = "^0.13.0"
torch = "^2.5.1"
llvmlite = "0.43.0"
numba = "0.60.0"
openai-whisper = "^20240930"
[build-system]

190
src/bot/telegram_userbot.py Normal file
View File

@ -0,0 +1,190 @@
import logging
from logging import Logger
from tempfile import NamedTemporaryFile
from typing import Optional
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import Message
from pyrogram.enums import ChatAction
from src.integrations.gigachat_api_client import GigaChatClient
from src.utils import speech_recognition
class TelegramUserBot:
"""
A Telegram user bot.
Attributes:
app (Client): The Pyrogram client instance for the bot.
gigachat_client (GigaChatClient): The client instance for GigaChat integration.
"""
def __init__(self, session_name: str, api_id: str, api_hash: str, gigachat_client: GigaChatClient) -> None:
"""
Initializes the Telegram user bot.
Args:
session_name (str): The session name for the bot.
api_id (str): The API ID for the Telegram application.
api_hash (str): The API hash for the Telegram application.
gigachat_client (GigaChatClient): An instance of GigaChatClient for handling AI responses.
"""
# Configure logging
self.logger: Logger = logging.getLogger(__name__)
self.app: Client = Client(session_name, api_id=api_id, api_hash=api_hash)
self.gigachat_client: GigaChatClient = gigachat_client
self.register_handlers()
def register_handlers(self) -> None:
"""
Registers the message handlers for the bot.
"""
self.logger.debug("Registering handlers.")
self.app.on_message(filters.command("ai"))(self.handle_ai_command)
self.app.on_message(filters.command("voice"))(self.handle_voice_command)
async def handle_ai_command(self, client: Client, message: Message) -> None:
"""
Handles messages that invoke the /ai command.
Args:
client (Client): The Pyrogram client instance.
message (Message): The incoming Telegram message.
"""
self.logger.info(f"Received /ai command from chat_id={message.chat.id}")
# Extract the command argument
command_arg: Optional[str] = " ".join(message.text.split()[1:])
if not command_arg and message.reply_to_message and message.reply_to_message.text:
# Use the text of the replied message if no argument is provided
command_arg = message.reply_to_message.text
if not command_arg:
self.logger.warning(f"No argument or replied message provided for /ai command by chat_id={message.chat.id}")
await message.reply("Please provide a message after /ai or reply to a message.", quote=True)
return
# Send an initial message indicating processing
self.logger.debug(f"Processing request for chat_id={message.chat.id}")
processing_message: Message = await message.reply(f"{self.gigachat_client.model_name} is processing your request...", quote=True)
try:
# Start typing animation
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Get a response from GigaChat
response: str = self.gigachat_client.get_response(str(message.chat.id), command_arg)
self.logger.debug(f"Received response for chat_id={message.chat.id}")
# Edit the processing message with the generated response
await processing_message.edit_text(response)
except Exception as e:
self.logger.error(f"Error processing /ai command for chat_id={message.chat.id}: {e}", exc_info=True)
await processing_message.edit_text("An error occurred while processing your request.")
finally:
# Stop indicating typing action
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
def get_language(self, input_text: str) -> str:
"""
Determines the language for voice-to-text conversion based on the input parameter.
Args:
input_text (str): The input parameter indicating the language.
Returns:
str: The language code ('en' or 'ru').
Raises:
ValueError: If an invalid language parameter is provided.
"""
language_params: dict[str, list[str]] = {
'en': ['en', 'eng', 'english'],
'ru': ['ru', 'rus', 'russian']
}
input_lower: str = input_text.lower()
for lang_code, aliases in language_params.items():
if input_lower in aliases:
return lang_code
raise ValueError(
"Invalid language parameter. Please use one of the following:\n" +
"\n".join(f"{lang_code}: {', '.join(aliases)}" for lang_code, aliases in language_params.items())
)
async def handle_voice_command(self, client: Client, message: Message) -> None:
"""
Handle the /voice command to convert a voice message to text with optional language selection.
Args:
client (Client): The Pyrogram Client instance.
message (Message): The incoming message containing the /voice command.
"""
self.logger.info(f"Received /voice command from chat_id={message.chat.id}.")
# Parse the language parameter (default to Russian)
command_parts: list[str] = message.text.split()
try:
language: str = self.get_language(command_parts[1]) if len(command_parts) > 1 else 'ru'
except ValueError as e:
await message.reply(str(e), quote=True)
return
# Check if the reply is to a voice message
if not (message.reply_to_message and message.reply_to_message.voice):
self.logger.warning("The /voice command was not used in reply to a voice message.")
await message.reply("Please reply to a voice message with the /voice command.", quote=True)
return
# Send an initial message indicating processing
processing_message: Message = await message.reply_to_message.reply("Converting voice message to text...", quote=True)
with NamedTemporaryFile(delete=False) as temp_file:
file_path = await client.download_media(message.reply_to_message.voice.file_id, file_name=temp_file.name)
self.logger.info(f"Voice message downloaded to {file_path}.")
try:
# Start typing animation
await client.send_chat_action(message.chat.id, ChatAction.TYPING)
# Attempt to convert voice to text with the selected language
text: str = speech_recognition.convert_voice_to_text(file_path, language=language) # type: ignore
self.logger.info("Voice message successfully converted to text.")
# Format the text for sending
formatted_text: str = (
f"<pre language=\"Conversion Result ({language})\">"
f"{text}"
"</pre>"
)
# Edit the initial processing message with the converted text
await processing_message.edit_text(formatted_text)
except FileNotFoundError:
self.logger.error("File not found during processing.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except RuntimeError:
self.logger.error("A runtime error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
except Exception:
self.logger.error("An unexpected error occurred.", exc_info=True)
await processing_message.edit_text("An error occurred while processing the voice message. Please try again later.")
finally:
# Stop indicating typing action
await client.send_chat_action(message.chat.id, ChatAction.CANCEL)
def run(self) -> None:
"""
Starts the bot.
"""
self.logger.info("Bot is starting.")
print("Bot is running.")
try:
self.app.run()
except Exception as e:
self.logger.critical(f"Failed to start the bot: {e}", exc_info=True)

View File

@ -0,0 +1,27 @@
"""
config.py
This file contains the configuration settings required for the Telegram bot and GigaChat integration.
Configuration settings:
API_ID (str): The unique identifier for your Telegram application. It is required to initialize the Pyrogram client.
API_HASH (str): The hash associated with your Telegram application. It is required to initialize the Pyrogram client.
BOT_TOKEN (str): The token for the Telegram bot. It is used for authenticating the bot with the Telegram API.
API_GIGACHAT_TOKEN (str): The token for authenticating with the GigaChat API. It is used for communication with the GigaChat service.
Note:
- Make sure to keep these tokens secure and do not share them publicly.
- These values should be replaced with actual credentials for the bot and the GigaChat API.
"""
# The API ID for the Telegram application
API_ID: str = ''
# The API hash for the Telegram application
API_HASH: str = ''
# The bot token for authenticating the bot with Telegram
BOT_TOKEN: str = ''
# The token for authenticating with the GigaChat API
API_GIGACHAT_TOKEN: str = ''

View File

@ -0,0 +1,102 @@
import logging
from logging import Logger
from typing import Dict
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_gigachat.chat_models import GigaChat
class GigaChatClient:
"""
A client class for interacting with the GigaChat API using LangChain components.
"""
def __init__(self, api_token: str, model_name: str = "GigaChat") -> None:
"""
Initializes the GigaChatManager with API credentials and a default model.
Args:
api_token (str): The API token for authenticating with the GigaChat API.
model_name (str): The GigaChat model to use. Defaults to "GigaChat".
"""
# Configure logging
self.logger: Logger = logging.getLogger(__name__)
self.api_token: str = api_token
self.model_name: str = model_name
self.logger.info(f"Initialize GigaChat client Using model: {self.model_name}")
self.llm: GigaChat = self._create_llm(model_name)
self.store: Dict[str, InMemoryChatMessageHistory] = {}
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
def _create_llm(self, model_name: str) -> GigaChat:
"""
Creates and configures a GigaChat LLM instance.
Args:
model_name (str): The GigaChat model to use.
Returns:
GigaChat: Configured GigaChat instance.
"""
self.logger.debug(f"Creating GigaChat LLM with model: {model_name}")
return GigaChat(
credentials=self.api_token,
scope="GIGACHAT_API_PERS",
model=model_name,
verify_ssl_certs=False,
streaming=False,
)
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
"""
Retrieves the chat history for a given session, creating it if it does not exist.
Args:
session_id (str): The unique identifier for the session.
Returns:
InMemoryChatMessageHistory: The chat history for the session.
"""
if session_id not in self.store:
self.logger.debug(f"Creating new session history for session_id: {session_id}")
self.store[session_id] = InMemoryChatMessageHistory()
else:
self.logger.debug(f"Retrieving existing session history for session_id: {session_id}")
return self.store[session_id]
def set_model(self, model_name: str) -> None:
"""
Updates the LLM to use a different GigaChat model.
Args:
model_name (str): The new GigaChat model to use.
"""
self.logger.info(f"Switching model to: {model_name}")
self.llm = self._create_llm(model_name)
self.conversation = RunnableWithMessageHistory(self.llm, self.get_session_history)
def get_response(self, session_id: str, text: str) -> str:
"""
Get a response to the provided input text for a given session.
Args:
session_id (str): The unique identifier for the session.
text (str): The input text for which a response is needed.
Returns:
str: The response text.
"""
self.logger.info(f"Generating response for session_id: {session_id}")
try:
response = self.conversation.invoke(
input=text,
config={"configurable": {"session_id": session_id}},
)
self.logger.debug(f"Response for session_id {session_id}")
return response.content
except Exception as e:
self.logger.error(f"Error while getting response for session_id: {session_id}. Error: {e}", exc_info=True)
raise

62
src/utils/logging.py Normal file
View File

@ -0,0 +1,62 @@
import logging
import logging.config
from datetime import datetime
def setup_logging(output_to_console=False) -> None:
"""
Configures the logging system.
This function sets up logging with optional output to the console and ensures
log files are rotated daily. It creates a detailed logging format and retains
log files for a week.
Args:
output_to_console (bool): If True, log messages will also be printed to the console.
Defaults to False.
"""
# Define the default handlers to use. Always logs to a file.
handlers: list[str] = ['file']
if output_to_console:
# Add console logging if requested
handlers.append('console')
# Generate the log file name with the current date
log_filename: str = f'logs/log-{datetime.now().strftime("%Y-%m-%d")}.log'
# Configure the logging settings using a dictionary
logging.config.dictConfig({
'version': 1, # Logging configuration version
'disable_existing_loggers': True, # Deny other loggers to remain active
'formatters': {
'detailed': { # Define a detailed logging format
'format': (
'%(asctime)s | %(levelname)-8s | '
'%(filename)s.%(funcName)s, line %(lineno)d: '
'%(message)s'
),
'datefmt': '%Y-%m-%d %H:%M:%S' # Timestamp format
},
},
'handlers': {
# Console handler outputs log messages to the console
'console': {
'class': 'logging.StreamHandler', # Standard output stream
'formatter': 'detailed', # Use the detailed formatter
},
# File handler writes log messages to a file, rotating daily
'file': {
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': log_filename, # Log file path
'when': 'midnight', # Rotate log files at midnight
'interval': 1, # Rotate daily
'backupCount': 7, # Keep up to 7 old log files
'formatter': 'detailed', # Use the detailed formatter
},
},
# Define the root logger configuration
'root': {
'handlers': handlers, # Handlers to use (console, file, or both)
'level': 'DEBUG', # Log level (DEBUG logs all levels)
},
})

View File

@ -0,0 +1,120 @@
import os
import logging
from logging import Logger
from pydub import AudioSegment
import speech_recognition as sr
from speech_recognition.audio import AudioData
# Configure logging
logger: Logger = logging.getLogger(__name__)
def convert_to_wav(file_path: str) -> str:
"""
Converts an audio file to WAV format if it is not already in WAV format.
Args:
file_path (str): The path to the audio file to be converted.
Returns:
str: The path to the converted or original WAV file.
Raises:
FileNotFoundError: If the file does not exist.
RuntimeError: If the conversion fails for any reason.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError(f"File {file_path} does not exist.")
if file_path.lower().endswith('.wav'):
logger.info(f"File {file_path} is already in WAV format.")
return file_path
try:
logger.info(f"Converting {file_path} to WAV format.")
audio = AudioSegment.from_file(file_path)
wav_path: str = f"{os.path.splitext(file_path)[0]}.wav"
audio.export(wav_path, format="wav")
logger.info(f"File converted to {wav_path}.")
return wav_path
except Exception as e:
logger.error(f"Failed to convert file to WAV: {e}")
raise RuntimeError(f"Failed to convert file to WAV: {e}")
def get_audio_duration(file_path: str) -> float:
"""
Retrieves the duration of an audio file in seconds.
Args:
file_path (str): The path to the audio file.
Returns:
float: The duration of the audio file in seconds.
Raises:
FileNotFoundError: If the file does not exist.
RuntimeError: If unable to get the file duration.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError(f"File {file_path} does not exist.")
try:
logger.info(f"Getting duration of {file_path}.")
audio = AudioSegment.from_file(file_path)
duration: float = len(audio) / 1000 # Duration in seconds
logger.info(f"Duration of {file_path}: {duration} seconds.")
return duration
except Exception as e:
logger.error(f"Failed to get file duration: {e}")
raise RuntimeError(f"Failed to get file duration: {e}")
def convert_voice_to_text(file_path: str, language='ru') -> str:
"""
Converts speech from an audio file to text using OpenAI speech recognition service.
Args:
file_path (str): The path to the audio file to be processed.
language (str): The language code for speech recognition (default is 'ru').
Returns:
str: The transcribed text if recognition is successful.
Raises:
FileNotFoundError: If the file does not exist.
RuntimeError: For any errors encountered during processing.
"""
# Check if the file exists
if not os.path.exists(file_path):
logger.error(f"File {file_path} does not exist.")
raise FileNotFoundError("File does not exist.")
# Convert the file to WAV format if necessary
try:
wav_path: str = convert_to_wav(file_path)
except RuntimeError as e:
logger.error(f"Error converting to WAV: {e}")
raise RuntimeError(f"Error converting to WAV: {e}")
recognizer = sr.Recognizer()
try:
logger.info(f"Processing file {wav_path} ({get_audio_duration(wav_path)} sec) for speech recognition.")
with sr.AudioFile(wav_path) as source:
audio_data: AudioData = recognizer.record(source)
text = recognizer.recognize_whisper(audio_data, language=language, model='medium')
logger.info("Speech recognition successful.")
return text # type: ignore
except sr.UnknownValueError:
logger.warning(f"Speech in {wav_path} could not be recognized.")
raise RuntimeError("Speech could not be recognized.")
except sr.RequestError as e:
logger.error(f"Request error from the recognition service: {e}")
raise RuntimeError(f"Request error from the recognition service: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
raise RuntimeError(f"An unexpected error occurred: {e}")

View File

@ -0,0 +1,55 @@
import pytest
from unittest.mock import MagicMock, patch
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_gigachat import GigaChat
from src.integrations.gigachat_api_client import GigaChatClient
from src.core.configuration import config
API_GIGACHAT_TOKEN: str = config.API_GIGACHAT_TOKEN
@pytest.fixture
def gigachat_client() -> GigaChatClient:
"""Fixture to create a GigaChatClient instance with a mock API token."""
return GigaChatClient(api_token=API_GIGACHAT_TOKEN)
def test_initialization(gigachat_client) -> None:
"""Test if the GigaChatClient initializes correctly."""
assert gigachat_client.api_token == API_GIGACHAT_TOKEN
assert gigachat_client.model_name == "GigaChat"
assert isinstance(gigachat_client.store, dict)
assert gigachat_client.llm is not None
def test_create_llm() -> None:
"""Test the _create_llm method for proper LLM creation."""
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
mock_llm: GigaChat = client._create_llm("GigaChat-Pro")
assert mock_llm.credentials == API_GIGACHAT_TOKEN
assert mock_llm.model == "GigaChat-Pro"
def test_get_session_history(gigachat_client) -> None:
"""Test the get_session_history method for creating/retrieving session history."""
session_id = "test_session"
history = gigachat_client.get_session_history(session_id)
assert isinstance(history, InMemoryChatMessageHistory)
assert session_id in gigachat_client.store
assert gigachat_client.store[session_id] is history
def test_set_model(gigachat_client) -> None:
"""Test the set_model method for updating the LLM model."""
new_model = "GigaChat-Pro"
gigachat_client.set_model(new_model)
assert gigachat_client.llm.model == new_model
def test_get_response() -> None:
"""Test the get_response method by verifying the response code."""
with patch("langchain_core.runnables.history.RunnableWithMessageHistory") as MockRunnable:
mock_runnable = MagicMock()
mock_runnable.invoke.return_value.code = 200
MockRunnable.return_value = mock_runnable
client = GigaChatClient(api_token=API_GIGACHAT_TOKEN)
response_code = mock_runnable.invoke.return_value.code
assert response_code == 200