PIbd-42_SSPR/app/services/minio_service.py

51 lines
1.4 KiB
Python
Raw Normal View History

2024-10-22 00:12:37 +04:00
from minio import Minio
from minio.error import S3Error
import json
# Read the MinIO credentials from 'credentials.json' (path is relative to the
# current working directory). The JSON object must contain the keys
# 'accessKey' and 'secretKey'.
with open('credentials.json', mode='r', encoding='utf-8') as file:
    data = json.load(file)

# Keyword options for the client, gathered in one place for readability.
_client_options = {
    'access_key': data['accessKey'],
    'secret_key': data['secretKey'],
    'secure': False,
}

# Module-wide client shared by all helper functions below; it targets the
# loopback endpoint 127.0.0.1:9000 with secure=False.
minio_client = Minio("127.0.0.1:9000", **_client_options)
def upload_to_minio(bucket_name, file_path, object_name):
    """Upload a local file to a MinIO bucket, creating the bucket if missing.

    Errors are reported on stdout and are not propagated to the caller.
    The return value tells the caller whether the upload actually happened
    (previously the function returned None on both success and failure).

    Args:
        bucket_name: Name of the destination bucket.
        file_path: Path of the local file to upload.
        object_name: Name the object gets inside the bucket.

    Returns:
        True if the file was uploaded, False if any error occurred.
    """
    try:
        # Create the bucket on first use so callers need no separate setup.
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
        minio_client.fput_object(bucket_name, object_name, file_path)
        print(f"'{file_path}' is successfully uploaded as '{object_name}' to bucket '{bucket_name}'.")
    except S3Error as e:
        print("S3 Error:", e)
        return False
    except Exception as e:
        # Best-effort by design: report the problem and signal failure
        # through the return value instead of raising.
        print("Error:", e)
        return False
    return True
def check_file_exists_minio(bucket_name, object_name):
    """Report whether an object can be stat'ed in the given bucket.

    Args:
        bucket_name: Name of the bucket to look in.
        object_name: Name of the object to look for.

    Returns:
        True if ``stat_object`` succeeds; False if it raises any exception
        (the error is printed to stdout in that case).
    """
    found = False
    try:
        minio_client.stat_object(bucket_name, object_name)
    except S3Error as s3_err:
        print("S3 Error:", s3_err)
    except Exception as err:
        print("Error:", err)
    else:
        # stat_object returned without raising, so the object is reachable.
        found = True
    return found
def get_file_from_minio(bucket_name, object_name, file_path):
    """Download an object from a MinIO bucket to a local path.

    Args:
        bucket_name: Name of the bucket holding the object.
        object_name: Name of the object to download.
        file_path: Local path the object is written to.

    Returns:
        ``file_path``, once the download has completed.

    Raises:
        S3Error: Re-raised after being printed to stdout.
        Exception: Any other error is likewise printed and re-raised.
    """
    try:
        minio_client.fget_object(bucket_name, object_name, file_path)
        print(f"'{object_name}' is successfully downloaded to '{file_path}'.")
    except Exception as err:
        # Use the same two labels as separate handlers would, then propagate.
        label = "S3 Error:" if isinstance(err, S3Error) else "Error:"
        print(label, err)
        raise
    return file_path