# hive_sales_sync_costarica/docr.py

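"""Sales SYNC script for DOCR branches.

For each branch database configured in conf.json, stage new sales for the
given start date (all of them, or a specific set of orders), optionally
flag the selected sales for reprocessing, then push every queued sale to
the remote endpoint on SaleModel worker threads, in batches of
conf['max_threads'].
"""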
import argparse
import json
import logging
import pyodbc
import sys
from datetime import date, datetime
from models.docr.sale import SaleModel
from pathlib import Path
from utils.timezone import set_current

# Module-level logger: everything at DEBUG level goes straight to the console
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


def read_sql(file_name: str) -> str:
    """Read a SQL statement from the sql/docr directory next to this file."""
    with open(Path(__file__).resolve().parent / 'sql' / 'docr' / file_name) as sqlf:
        return sqlf.read()


def main(args, conf):
    for database in conf['databases']:
        logger.debug(f"attempting to process branch {database['code']} {database['branch']}")
        if args.branch != 'all' and args.branch != database['code']:
            logger.debug(
                f"branch {database['code']} {database['branch']} does not match the requested branch"
            )
            continue
        try:
            # Load the SQL statements (the *_specific variants filter by order)
            prepare_sales = read_sql(
                'new_sales.sql' if args.orders == 'all' else 'new_sales_specific.sql'
            )
            reprocess_sales = read_sql(
                'reprocess_sales.sql' if args.orders == 'all' else 'reprocess_sales_specific.sql'
            )
            process_sales = read_sql('queue_table.sql')
            with pyodbc.connect(conf['dsn'] % database) as conn:
                with conn.cursor() as cur:
                    # Insert the sales into the queue table
                    if args.orders == 'all':
                        logger.debug(f"all sales with date {args.start_date}")
                        cur.execute(prepare_sales, args.start_date)
                    else:
                        logger.debug(f"specific sales with date {args.start_date} sales {args.orders}")
                        cur.execute(prepare_sales, args.start_date, args.orders)
                    conn.commit()
                    logger.debug(f"number of records inserted: {cur.rowcount}")
                    # Reprocess: set the status code of all or the specified
                    # sales in the hive_sales table
                    if args.reprocess:
                        if args.orders == 'all':
                            cur.execute(reprocess_sales, args.start_date)
                        else:
                            cur.execute(reprocess_sales, args.start_date, args.orders)
                        conn.commit()
                    # Fetch the queued sales
                    cur.execute(process_sales)
                    sales = cur.fetchall()
                    logger.debug(f"sales to process {len(sales)}")
                    # Send the sales in batches of conf['max_threads'] worker threads
                    max_threads = int(conf['max_threads'])
                    for i in range(0, len(sales), max_threads):
                        batch = []
                        for s in sales[i:i + max_threads]:
                            # Each SaleModel is a worker thread (start/join interface)
                            sale = SaleModel(s, conn=conf['dsn'] % database)
                            sale.endpoint_url = conf['url']
                            sale.token = conf['tokens']['DOCR']
                            sale.start()
                            batch.append(sale)
                        # Join the current batch before starting the next one;
                        # without this, every sale would run concurrently and
                        # max_threads would impose no limit.
                        logger.debug("waiting for batch threads to finish")
                        for thread in batch:
                            thread.join()
        except Exception as e:
            logger.error(
                f"error when processing branch {database['code']}-{database['branch']} => {e}"
            )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='Sales SYNC Script')
    parser.add_argument(
        '--start-date',
        help='start date (YYYYMMDD) to search sales from',
        default=date.today().strftime('%Y%m%d'),
        type=lambda s: datetime.strptime(s, '%Y%m%d'),
    )
    parser.add_argument(
        '--branch',
        help="code of the branch to load, or 'all'",
        default='all',
    )
    parser.add_argument(
        '--orders',
        help="which orders to fetch, or 'all'",
        default='all',
        type=str,
    )
    parser.add_argument(
        '--reprocess',
        help='reprocess the selected (all or specific) sale(s)',
        # type=bool treats any non-empty string as True (even "--reprocess False"
        # would enable it), so expose this as a proper flag instead
        action='store_true',
    )
    sys_args = parser.parse_args(sys.argv[1:])
    with open(Path(__file__).resolve().parent / 'conf.json') as f:
        configuration = json.load(f)
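    # Expected shape of conf.json, inferred from the keys this script reads.
    # All values below are illustrative placeholders; the %(...)s fields in
    # "dsn" must match the keys of each entry in "databases":
    #   {
    #     "timezone": "America/Costa_Rica",
    #     "dsn": "DRIVER={ODBC Driver 17 for SQL Server};SERVER=%(server)s;DATABASE=%(db)s;...",
    #     "databases": [{"code": "B01", "branch": "Branch One", "server": "...", "db": "..."}],
    #     "max_threads": 4,
    #     "url": "https://example.invalid/sales",
    #     "tokens": {"DOCR": "..."}
    #   }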
    set_current(configuration['timezone'])
    logger.debug(f"starting process {datetime.now()}")
    main(sys_args, configuration)
    logger.debug(f"finished process {datetime.now()}")