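"""Inventory discharge sync script.

For each date in the requested range and each configured branch database:
check whether the day's data is already loaded, download the matching CSV
from S3, and bulk-insert its rows into the branch's MySQL database.
"""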
import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path

from myodbc.mysqlodbc import ConnMysql
from utils.aws3 import Aws3
from utils.date_utl import DateUtil
from utils.read_query import Readsql
from utils.wr_csv import ReadCsv

path_main = Path(__file__).resolve().parent
parent_main = path_main.parent
sys.path.append(str(path_main))  # sys.path entries must be str, not Path

csv_dir = os.path.join(path_main, 'tmp')
sql_dir = os.path.join(path_main, 'sql')


def select(qry, **db_credentials):
    """Run a read query and return all fetched rows."""
    with ConnMysql(**db_credentials, ret='cursor') as cur:
        cur.execute(qry)
        result = cur.fetchall()
    return result


def insert(query, data, **db_credentials):
    """Bulk-insert rows inside a transaction, rolling back on failure."""
    with ConnMysql(**db_credentials, ret='conn') as conn:
        try:
            cursor = conn.cursor()
            cursor.executemany(query, data)
            conn.commit()
        except Exception:
            if conn.is_connected():
                conn.rollback()
            raise  # keep the original traceback


def main(date_range, branch, conf):
    # "all" loads every configured database; otherwise load only the
    # databases whose code appears in the comma-separated branch list.
    if branch == 'all':
        filtered_databases = conf["databases"]
    else:
        filtered_databases = [db for db in conf["databases"] if db['code'] in branch.split(',')]

    for date in date_range:
        print(f'Date to process: {date}')
        year = date.year
        month = date.month
        day = date.day

        for db_credentials in filtered_databases:
            file = None  # so the finally block never touches a stale CSV handle
            try:
                print()
                conexion = {
                    "host": db_credentials["host"],
                    "database": db_credentials["database"],
                    "user": db_credentials["user"],
                    "password": db_credentials["password"]
                }
                csv_local_file = f"{conf['process']}_{db_credentials['code']}_{year}_{month}_{day}.csv"

                # Skip this database if the date's data is already loaded.
                query = Readsql(sql_dir, conf["query"]["data_validation"])
                record_exists = select(
                    query.getQuery().format(date=date, code=db_credentials["code"]),
                    **conexion
                )
                if any(1 in row for row in record_exists):
                    raise Exception('data already exists!')

                # Look for the day's csv file in the s3 bucket.
                bucket_dir = f'{conf["aws"]["aws_s3_dir_base"]}/{db_credentials["code"]}/{year}/{month}'
                s3 = Aws3(
                    conf["aws"]["aws_s3_bucket_name"],
                    conf["aws"]["aws_access_key_id"],
                    conf["aws"]["aws_secret_access_key"],
                    conf["aws"]["aws_s3_region"]
                )
                # 'buket_path' (sic) is the keyword the Aws3 helper defines.
                if s3.check_file_existence(csv_local_file, buket_path=bucket_dir):
                    s3.download_file(
                        csv_local_file,
                        local_file_path=csv_dir,
                        buket_path=bucket_dir
                    )
                else:
                    raise Exception('csv file not found in s3!')

                # Read the downloaded csv file.
                csv_info = {
                    'filedir': csv_dir,
                    'filename': csv_local_file,
                    'delimiter': ';'
                }
                file = ReadCsv(**csv_info)

                # Insert the rows into the branch database.
                query = Readsql(sql_dir, conf["query"]["insert"])
                insert(
                    query.getQuery(),
                    file.readcsv(drop_header=True),
                    **conexion
                )
                print(f'\tHost: {db_credentials["host"]}, Status: Ok, Branch: {db_credentials["branch"]}, Code: {db_credentials["code"]}')

            except Exception as e:
                print(
                    f'\tHost: {db_credentials["host"]}, Status: Error, Branch: {db_credentials["branch"]}, Code: {db_credentials["code"]}\n'
                    f'\tError msg: {e}'
                )
                continue
            finally:
                if file is not None:
                    file.rm_csv()  # always remove the temporary csv


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='Inventory Discharge Sync Script')
    parser.add_argument(
        '--start-date',
        help='start date for searching (YYYYMMDD)',
        default=None,
        type=lambda s: datetime.strptime(s, '%Y%m%d')
    )
    parser.add_argument(
        '--end-date',
        help='end date for searching (YYYYMMDD)',
        default=None,
        type=lambda s: datetime.strptime(s, '%Y%m%d')
    )
    parser.add_argument(
        '--branch',
        help='comma-separated branch codes to load, or "all"',
        default='all',
    )
    sys_args = parser.parse_args()

    with open(parent_main / 'conf.json') as conf_file:
        conf = json.load(conf_file)
    if not conf:
        raise Exception('conf file is empty!')

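    # Expected shape of conf.json, based on the keys this script reads
    # (all values below are placeholders):
    # {
    #     "process": "...",
    #     "query": {"data_validation": "<file>.sql", "insert": "<file>.sql"},
    #     "aws": {
    #         "aws_s3_bucket_name": "...", "aws_access_key_id": "...",
    #         "aws_secret_access_key": "...", "aws_s3_region": "...",
    #         "aws_s3_dir_base": "..."
    #     },
    #     "databases": [
    #         {"host": "...", "database": "...", "user": "...",
    #          "password": "...", "code": "...", "branch": "..."}
    #     ]
    # }
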
    # Expand --start-date/--end-date into the dates to process.
    date_range = DateUtil(sys_args.start_date, sys_args.end_date, return_type='date')
    date_range = date_range.date_range()

    main(date_range, sys_args.branch, conf)
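
# Example invocation (the script filename is assumed; dates use YYYYMMDD):
#   python inventory_discharge_sync.py --start-date 20240101 --end-date 20240107 --branch all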