# bi_dosv_inventory_c2db/utils/aws3.py
import os

import boto3
from botocore.exceptions import ClientError


class Aws3:
    def __init__(self, aws_s3_bucket_name, aws_access_key_id, aws_secret_access_key, aws_s3_region):
        self.aws_s3_bucket_name = aws_s3_bucket_name
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.aws_s3_region = aws_s3_region
        self.client = boto3.client(
            's3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.aws_s3_region
        )
    def check_file_existence(self, file, bucket_path=None):
        # head_object raises a ClientError (404) when the key does not exist
        file_key = f'{bucket_path}/{file}' if bucket_path else file
        try:
            self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=file_key)
            return True
        except ClientError:
            return False
    def create_directory(self, subdirectory):
        # S3 has no real directories; create a zero-byte placeholder key
        # (with a trailing slash) for each level that does not exist yet
        subdirectory_list = subdirectory.strip('/').split('/')
        path = []
        for folder in subdirectory_list:
            path.append(folder)
            path_to_create = '/'.join(path) + '/'
            try:
                self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create)
            except ClientError:
                self.client.put_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create)
                print(f'\tthe directory {path_to_create} was created')
    def load_file(self, file, bucket_path=None, local_file_path=None):
        try:
            # if the remote path does not exist, upload_file will create it
            self.client.upload_file(
                Bucket=self.aws_s3_bucket_name,
                Filename=os.path.join(local_file_path, file) if local_file_path else file,
                Key=f'{bucket_path}/{file}' if bucket_path else file
            )
            print(f"\tfile '{file}' was uploaded to '{bucket_path}'")
        except Exception as e:
            print(f"\tfailed to upload file: {str(e)}")
    def download_file(self, file, bucket_path=None, local_file_path=None):
        self.client.download_file(
            Bucket=self.aws_s3_bucket_name,
            Filename=os.path.join(local_file_path, file) if local_file_path else file,
            Key=f'{bucket_path}/{file}' if bucket_path else file
        )
        print(f"\tfile '{file}' was downloaded to '{local_file_path}'")