bi_dosv_inventory_c2db/dosv_inventory_c2db.py
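"""Load per-branch inventory CSV files from S3 into MySQL.

For every date in the requested range and every configured branch database,
the script checks whether data for that date already exists, downloads the
matching CSV from the configured S3 bucket into a local tmp directory, and
bulk-inserts its rows into the branch database.
"""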
import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path

from myodbc.mysqlodbc import ConnMysql
from utils.aws3 import Aws3
from utils.date_utl import DateUtil
from utils.read_query import Readsql
from utils.wr_csv import ReadCsv

path_main = Path(__file__).resolve().parent
parent_main = path_main.parent
sys.path.append(str(path_main))

csv_dir = os.path.join(path_main, 'tmp')
sql_dir = os.path.join(path_main, 'sql')


def select(qry, **db_credentials):
    """Run a SELECT statement and return every fetched row."""
    with ConnMysql(**db_credentials, ret='cursor') as cur:
        cur.execute(qry)
        result = cur.fetchall()
    return result


def insert(query, data, **conn):
    """Bulk-insert rows with executemany, rolling back and re-raising on failure."""
    with ConnMysql(**conn, ret='conn') as connection:
        try:
            cursor = connection.cursor()
            cursor.executemany(query, data)
            connection.commit()
        except Exception:
            if connection.is_connected():
                connection.rollback()
            raise


def main(date_range, branch, conf):
    """Process each date and branch: validate, pull the CSV from S3 and load it."""
    if branch == "all":
        filtered_databases = conf["databases"]
    else:
        filtered_databases = [db for db in conf["databases"] if db['code'] in branch.split(',')]
    for date in date_range:
        print(f'Date to process: {date}')
        year = date.year
        month = date.month
        day = date.day
        for db_credentials in filtered_databases:
            try:
                print('')
                conexion = {
                    "host": db_credentials["host"],
                    "database": db_credentials["database"],
                    "user": db_credentials["user"],
                    "password": db_credentials["password"]
                }
                csv_local_file = f"{conf['process']}_{db_credentials['code']}_{year}_{month}_{day}.csv"
                # Check whether data for this date already exists in the db
                query = Readsql(sql_dir, conf["query"]["data_validation"])
                record_exists = select(
                    query.getQuery().format(date=date, code=db_credentials["code"]),
                    **conexion
                )
                if any(1 in i for i in record_exists):
                    raise Exception('data already exists!')
                # Find the file in the S3 bucket and download it if present
                bucket_dir = f'{conf["aws"]["aws_s3_dir_base"]}/{db_credentials["code"]}/{year}/{month}'
                s3 = Aws3(
                    conf["aws"]["aws_s3_bucket_name"],
                    conf["aws"]["aws_access_key_id"],
                    conf["aws"]["aws_secret_access_key"],
                    conf["aws"]["aws_s3_region"]
                )
                if s3.check_file_existence(csv_local_file, buket_path=bucket_dir):
                    s3.download_file(
                        csv_local_file,
                        local_file_path=csv_dir,
                        buket_path=bucket_dir
                    )
                # Read the downloaded csv file
                csv_info = {
                    'filedir': csv_dir,
                    'filename': csv_local_file,
                    'delimiter': ';'
                }
                file = ReadCsv(**csv_info)
                # Insert into the db
                query = Readsql(sql_dir, conf["query"]["insert"])
                insert(
                    query.getQuery(),
                    file.readcsv(drop_header=True),
                    **conexion
                )
                print(f'\tHost: {db_credentials["host"]}, Status: Ok, Branch: {db_credentials["branch"]}, Code: {db_credentials["code"]}')
            except Exception as e:
                print(
                    f'\tHost: {db_credentials["host"]}, Status: Error, Branch: {db_credentials["branch"]}, Code: {db_credentials["code"]}\n'
                    f'\tError message: {e}'
                )
                continue
            finally:
                # Remove the local csv copy if one was created
                if 'file' in locals():
                    file.rm_csv()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='Inventory Discharge Sync Script')
    parser.add_argument(
        '--start-date',
        help='start date for searching (YYYYMMDD)',
        default=None,
        type=lambda s: datetime.strptime(s, '%Y%m%d')
    )
    parser.add_argument(
        '--end-date',
        help='end date for searching (YYYYMMDD)',
        default=None,
        type=lambda s: datetime.strptime(s, '%Y%m%d')
    )
    parser.add_argument(
        '--branch',
        help='comma-separated branch codes to load, or "all"',
        default='all',
    )
    sys_args = parser.parse_args(sys.argv[1:])

    with open(parent_main / 'conf.json') as conf_file:
        conf = json.load(conf_file)
    if not conf:
        raise Exception('conf file is empty!')

    date_range = DateUtil(sys_args.start_date, sys_args.end_date, return_type='date')
    date_range = date_range.date_range()
    main(date_range, sys_args.branch, conf)
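
# Example invocation (a sketch, not part of the original script): the dates and
# the branch codes "S01,S02" below are placeholders, and what DateUtil does when
# no dates are passed depends on utils.date_utl, which is not shown here.
#
#   python dosv_inventory_c2db.py --start-date 20240401 --end-date 20240403 --branch S01,S02
#   python dosv_inventory_c2db.py --start-date 20240411 --end-date 20240411   # all branches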