commit bd0eef8998b962beb79a7b365214882ceaed0692 Author: Marvin Vallecillo Date: Thu Apr 11 02:24:26 2024 -0600 primer commit diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..e2a56dc --- /dev/null +++ b/.drone.yml @@ -0,0 +1,53 @@ +kind: pipeline +type: docker +name: default + +steps: + - name: clean environment + image: appleboy/drone-ssh + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + script: + - cd /home/ubuntu/universe-dwh/bi_dosv_inventory/app || exit + - echo > dummy.txt || exit + - rm -rf * + - name: copy files + image: appleboy/drone-scp + depends_on: + - clean environment + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + command_timeout: 2m + target: /home/ubuntu/universe-dwh/bi_dosv_inventory/app + source: ./ + - name: rm git + image: appleboy/drone-ssh + depends_on: + - copy files + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + script: + - cd /home/ubuntu/universe-dwh/bi_dosv_inventory/app || exit + - rm -rf .git + - hostname diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..592dcee --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +conf.json \ No newline at end of file diff --git a/dosv_inventory_c2db.py b/dosv_inventory_c2db.py new file mode 100644 index 0000000..400ecac --- /dev/null +++ b/dosv_inventory_c2db.py @@ -0,0 +1,137 @@ +import argparse +import json +import os +import sys +from datetime import datetime, timedelta +from myodbc.mysqlodbc import ConnMysql +from pathlib import Path +from utils.aws3 import Aws3 +from utils.date_utl import DateUtil +from utils.read_query import Readsql +from utils.wr_csv import ReadCsv + +path_main = Path(__file__).resolve().parent +parent_main = Path(path_main).resolve().parent +sys.path.append(path_main) +csv_dir = os.path.join(path_main, 'tmp') +sql_dir = os.path.join(path_main, 'sql') + + +def select(qry, **db_credentials): + with ConnMysql(**db_credentials, ret='cursor') as cur: + cur.execute(qry) + result = cur.fetchall() + return result + + +def insert(query, data, **conn): + with ConnMysql(**conn, ret='conn') as conn: + try: + cursor=conn.cursor() + cursor.executemany(query, data) + conn.commit() + except Exception as e: + if conn.is_connected(): + conn.rollback() + raise Exception(e) + + +def main(date_range, branch, conf): + filtered_databases = conf["databases"] if branch == "all" else [db for db in conf["databases"] if db['code'] in branch.split(',')] + for date in date_range: + print(f'Date to process: {date}') + year = date.year + month = date.month + day = date.day + for db_credentials in filtered_databases: + try: + print('') + conexion = { + "host": db_credentials["host"], + "database": db_credentials["database"], + "user": db_credentials["user"], + "password": db_credentials["password"] + } + csv_local_file = f"{conf['process']}_{db_credentials['code']}_{year}_{month}_{day}.csv" + + # Find data for date in db... + query = Readsql(sql_dir, conf["query"]["data_validation"]) + record_exists = select( + query.getQuery().format(date=date, code=db_credentials["code"]), + **conexion + ) + if any(1 in i for i in record_exists): + raise Exception('data already exists!') + + # Find file in bucket s3 + bucket_dir = f'{conf["aws"]["aws_s3_dir_base"]}/{db_credentials["code"]}/{str(year)}/{str(month)}' + s3 = Aws3( + conf["aws"]["aws_s3_bucket_name"], + conf["aws"]["aws_access_key_id"], + conf["aws"]["aws_secret_access_key"], + conf["aws"]["aws_s3_region"] + ) + if s3.check_file_existence(csv_local_file, buket_path=bucket_dir): + s3.download_file( + csv_local_file, + local_file_path=csv_dir, + buket_path=bucket_dir + ) + + # Read downloaded csv file + csv_info = { + 'filedir': csv_dir, + 'filename': csv_local_file, + 'delimiter': ';' + } + file = ReadCsv(**csv_info) + + # Insert into db + query = Readsql(sql_dir, conf["query"]["insert"]) + insert( + query.getQuery(), + file.readcsv(drop_header=True), + **conexion + ) + print(f'\tHost: {db_credentials["host"]}, Status: Ok , Branc: {db_credentials["branch"]}, Code: {db_credentials["code"]}') + + except Exception as e: + print( + f'\tHost: {db_credentials["host"]}, Status: Error, Branc: {db_credentials["branch"]}, Code: {db_credentials["code"]} \n' \ + f'\tmsj Error: {e}' + ) + continue + finally: + if 'file' in locals(): + file.rm_csv() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog='Inventory DischargeSsync Script') + parser.add_argument( + '--start-date', + help='start date for searching', + default=None, + type=lambda s: datetime.strptime(s, '%Y%m%d') + ) + parser.add_argument( + '--end-date', + help='end date for searching', + default=None, + type=lambda s: datetime.strptime(s, '%Y%m%d') + ) + parser.add_argument( + '--branch', + help='branch for loading', + default='all', + ) + sys_args = parser.parse_args(sys.argv[1:]) + conf = json.load(open(parent_main / 'conf.json')) + + if not conf: + raise Exception('conf file was not found!') + + date_range = DateUtil(sys_args.start_date, sys_args.end_date, return_type='date') + date_range = date_range.date_range() + + main(date_range, sys_args.branch, conf) diff --git a/myodbc/__init__py b/myodbc/__init__py new file mode 100644 index 0000000..e69de29 diff --git a/myodbc/__pycache__/mysqlodbc.cpython-311.pyc b/myodbc/__pycache__/mysqlodbc.cpython-311.pyc new file mode 100644 index 0000000..e45f240 Binary files /dev/null and b/myodbc/__pycache__/mysqlodbc.cpython-311.pyc differ diff --git a/myodbc/mysqlodbc.py b/myodbc/mysqlodbc.py new file mode 100644 index 0000000..f179ff9 --- /dev/null +++ b/myodbc/mysqlodbc.py @@ -0,0 +1,16 @@ +import mysql.connector + +class ConnMysql: + def __init__(self, ret="conn", **kwargs): + self.ret = ret + self.conn = mysql.connector.connect(**kwargs, autocommit=False) + if ret == "cursor": + self.cursor = self.conn.cursor() + + def __enter__(self): + return self.cursor if self.ret == "cursor" else self.conn + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.ret == "cursor": + self.cursor.close() + self.conn.__exit__(exc_type, exc_val, exc_tb) diff --git a/sql/dosv_insert_inventory.sql b/sql/dosv_insert_inventory.sql new file mode 100644 index 0000000..7c4faa3 --- /dev/null +++ b/sql/dosv_insert_inventory.sql @@ -0,0 +1,20 @@ +insert into bi.daily_inventory( + created_dt, + tenant_id, + code2, + inventory_code, + inventory_item, + type, + count_unit, + unit_cost, + begining_inventory, + delivered_quantity, + starting_inv, + ending_inventory, + actual_usage_pkg, + actual_usage, + ideal_usage, + actual_vs_ideal, + cost_used +) +values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) \ No newline at end of file diff --git a/sql/valid_data_exists.sql b/sql/valid_data_exists.sql new file mode 100644 index 0000000..64298fb --- /dev/null +++ b/sql/valid_data_exists.sql @@ -0,0 +1,8 @@ +select + 1 value +from + bi.daily_inventory +where + date(created_dt) = '{date}' + and code2 = '{code}' +limit 1 \ No newline at end of file diff --git a/tmp/.gitkeep b/tmp/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/__pycache__/__init__.cpython-311.pyc b/utils/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..e4fdafc Binary files /dev/null and b/utils/__pycache__/__init__.cpython-311.pyc differ diff --git a/utils/__pycache__/aws3.cpython-311.pyc b/utils/__pycache__/aws3.cpython-311.pyc new file mode 100644 index 0000000..5fcc64b Binary files /dev/null and b/utils/__pycache__/aws3.cpython-311.pyc differ diff --git a/utils/__pycache__/date_utl.cpython-311.pyc b/utils/__pycache__/date_utl.cpython-311.pyc new file mode 100644 index 0000000..a6af9fc Binary files /dev/null and b/utils/__pycache__/date_utl.cpython-311.pyc differ diff --git a/utils/__pycache__/read_query.cpython-311.pyc b/utils/__pycache__/read_query.cpython-311.pyc new file mode 100644 index 0000000..38f432e Binary files /dev/null and b/utils/__pycache__/read_query.cpython-311.pyc differ diff --git a/utils/__pycache__/wr_csv.cpython-311.pyc b/utils/__pycache__/wr_csv.cpython-311.pyc new file mode 100644 index 0000000..edf22cd Binary files /dev/null and b/utils/__pycache__/wr_csv.cpython-311.pyc differ diff --git a/utils/aws3.py b/utils/aws3.py new file mode 100644 index 0000000..03759a7 --- /dev/null +++ b/utils/aws3.py @@ -0,0 +1,53 @@ +import boto3 +import os + + +class Aws3: + + def __init__(self, aws_s3_bucket_name, aws_access_key_id, aws_secret_access_key, aws_s3_region): + self.aws_s3_bucket_name = aws_s3_bucket_name + self.aws_access_key_id = aws_access_key_id + self.aws_secret_access_key = aws_secret_access_key + self.aws_s3_region = aws_s3_region + self.client = boto3.client( + 's3', + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key + ) + + def check_file_existence(self, file, buket_path=None): + file_key = f'{buket_path}/{file}' + self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=file_key) + return True + + def create_directory(self, subdirectory): + subdirectory_list = subdirectory.strip('/').split('/') + path = [] + for folder in subdirectory_list: + path.append(folder) + path_to_create = '/'.join(path) + try: + self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create) + except Exception: + self.client.put_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create) + print(f'\tthe directory {path_to_create} was created') + + def load_file(self, file, buket_path=None, local_file_path=None): + try: + # if remote path not exists, upload_file will create it + self.client.upload_file( + Bucket=self.aws_s3_bucket_name, + Filename=os.path.join(local_file_path,file) if local_file_path else file, + Key=f'{buket_path}/{file}' if buket_path else file + ) + print(f"\tfile '{file}' was upload to '{buket_path}'") + except Exception as e: + print(f"\tFail to load file: {str(e)}") + + def download_file(self, file, buket_path=None, local_file_path=None): + self.client.download_file( + Bucket=self.aws_s3_bucket_name, + Filename=os.path.join(local_file_path,file) if local_file_path else file, + Key=f'{buket_path}/{file}' if buket_path else file + ) + print(f"\tfile '{file}' was download to '{local_file_path}'") diff --git a/utils/date_utl.py b/utils/date_utl.py new file mode 100644 index 0000000..4b01ce8 --- /dev/null +++ b/utils/date_utl.py @@ -0,0 +1,45 @@ +from datetime import date, datetime, timedelta + +""" + This class recive two parameters, but only return a list of range of date + for example: + + Entry: + start-date 2024-03-01 00:00:00 + end-date 2024-03-03 00:00:00 + Result: + [datetime.datetime(2024, 3, 1, 0, 0), datetime.datetime(2024, 3, 2, 0, 0), datetime.datetime(2024, 3, 3, 0, 0)] + + if don't entry start-date and end-date default date will be today-1 +""" + +class EntryDateError(Exception): + def __init__(self, msj, code): + super().__init__(msj) + self.code = code + +class DateUtil: + def __init__(self, start_date, end_date, return_type='datetime'): + self.default_Date = datetime.combine((date.today() - timedelta(days=1)), datetime.min.time()) + self.start_date = start_date if start_date else self.default_Date + self.end_date = self.end_date = end_date if end_date else (start_date if start_date else self.default_Date) + self.return_type = return_type + + def date_eval(self): + if self.start_date and self.end_date: + if self.end_date < self.start_date: + raise EntryDateError(f'end-date {self.end_date} can not be less than start-date {self.start_date}', 10001) + + def date_range(self): + lista_de_fechas = [] + current_date = self.start_date + + self.date_eval() + + while current_date <= self.end_date: + lista_de_fechas.append( + current_date.date() if self.return_type == 'date' else current_date + ) + current_date += timedelta(days=1) + + return lista_de_fechas \ No newline at end of file diff --git a/utils/read_query.py b/utils/read_query.py new file mode 100644 index 0000000..bb2b521 --- /dev/null +++ b/utils/read_query.py @@ -0,0 +1,15 @@ +import os + +class Readsql: + def __init__(self, directory, name_query): + self.name_query = name_query + self.query = None + self.directory = directory + + def getQuery(self): + with open(os.path.join(self.directory, self.name_query)) as query_find: + self.query = query_find.read() + return self.query + + def show(self): + print(f'Query: {self.query}') diff --git a/utils/wr_csv.py b/utils/wr_csv.py new file mode 100644 index 0000000..79e1834 --- /dev/null +++ b/utils/wr_csv.py @@ -0,0 +1,25 @@ +import csv +import os + + +class ReadCsv: + def __init__(self, **kwargs): + self.filedir = kwargs.get('filedir') + self.filename = kwargs.get('filename') + self.delimiter = kwargs.get('delimiter',',') + self.lineterminator = kwargs.get('lineterminator','\n') + + def readcsv(self, drop_header=False): + result=[] + with open(os.path.join(self.filedir, self.filename), newline='') as file: + reader = csv.reader(file, delimiter=self.delimiter) + if drop_header: next(reader) + for row in reader: + row = tuple(row) + result.append(row) + return result + + def rm_csv(self): + file_2_delete = os.path.join(self.filedir, self.filename) + if os.path.exists(file_2_delete): + os.remove(file_2_delete)