import argparse
import json
import logging
from datetime import date, datetime
from pathlib import Path

import pyodbc

from models.docr.sale import SaleModel
from utils.timezone import set_current

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


def read_sql(file_name: str) -> str:
    """Return the contents of a SQL file stored under sql/docr/ next to this script."""
    with open(Path(__file__).resolve().parent / 'sql' / 'docr' / file_name) as sqlf:
        return sqlf.read()


def main(args, conf):
    for database in conf['databases']:
        logger.debug(f"attempting to process branch {database['code']} {database['branch']}")
        if args.branch != 'all' and args.branch != database['code']:
            logger.debug(
                f"skipping branch {database['code']} {database['branch']}: "
                f"it is not the branch specified and 'all' was not requested"
            )
            continue
        try:
            # Load the SQL statements
            prepare_sales = read_sql(
                'new_sales.sql' if args.orders == 'all' else 'new_sales_specific.sql'
            )
            reprocess_sales = read_sql(
                'reprocess_sales.sql' if args.orders == 'all' else 'reprocess_sales_specific.sql'
            )
            process_sales = read_sql('queue_table.sql')

            # The DSN template is %-formatted with the per-branch database settings
            with pyodbc.connect(conf['dsn'] % database) as conn:
                with conn.cursor() as cur:
                    # Insert the sales to be processed into the queue table
                    if args.orders == 'all':
                        logger.debug(f"all sales with date {args.start_date}")
                        cur.execute(prepare_sales, args.start_date)
                    else:
                        logger.debug(f"specific sales with date {args.start_date} sales {args.orders}")
                        cur.execute(prepare_sales, args.start_date, args.orders)
                    conn.commit()
                    logger.debug(f"number of records inserted: {cur.rowcount}")

                    # Reprocess: reset the status code of all or specific rows in the hive_sales table
                    if args.reprocess:
                        if args.orders == 'all':
                            cur.execute(reprocess_sales, args.start_date)
                        else:
                            cur.execute(reprocess_sales, args.start_date, args.orders)
                        conn.commit()

                    # Fetch the queued sales
                    cur.execute(process_sales)
                    sales = cur.fetchall()
                    logger.debug(f"sales to process {len(sales)}")

                    # Process the sales in batches of conf['max_threads'] worker threads,
                    # waiting for each batch to finish before starting the next one
                    max_threads = int(conf['max_threads'])
                    for i in range(0, len(sales), max_threads):
                        batch = []
                        for s in sales[i:i + max_threads]:
                            sale = SaleModel(s, conn=conf['dsn'] % database)
                            sale.endpoint_url = conf['url']
                            sale.token = conf['tokens']['DOCR']
                            sale.start()
                            batch.append(sale)
                        logger.debug("waiting for the current batch of threads to finish")
                        for thread in batch:
                            thread.join()
        except Exception:
            logger.exception(
                f"error when processing branch {database['code']}-{database['branch']}"
            )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='Sales SYNC Script')
    parser.add_argument(
        '--start-date',
        help='start date for searching, in YYYYMMDD format',
        default=date.today().strftime('%Y%m%d'),
        type=lambda s: datetime.strptime(s, '%Y%m%d'),
    )
    parser.add_argument(
        '--branch',
        help='branch code to load, or "all" for every configured branch',
        default='all',
    )
    parser.add_argument(
        '--orders',
        help='orders to fetch: "all" or specific orders',
        default='all',
        type=str,
    )
    parser.add_argument(
        '--reprocess',
        help='if set, the (all or specific) sale(s) will be reprocessed',
        action='store_true',
    )
    sys_args = parser.parse_args()

    with open(Path(__file__).resolve().parent / 'conf.json') as f:
        configuration = json.load(f)
    set_current(configuration['timezone'])

    logger.debug(f"starting process {datetime.now()}")
    main(sys_args, configuration)
    logger.debug(f"finished process {datetime.now()}")
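
# ---------------------------------------------------------------------------
# Usage notes (a minimal sketch: the script filename, branch codes, order ids,
# and every configuration value below are illustrative placeholders, not real
# settings from this project):
#
#   python sales_sync.py                                    # today, all branches, all orders
#   python sales_sync.py --start-date 20240101 --branch 001 --orders all
#   python sales_sync.py --start-date 20240101 --branch 001 --orders 12345 --reprocess
#
# conf.json lives next to this script and, judging from the keys read above,
# is shaped roughly like this. The "dsn" template is %-formatted with each
# entry of "databases", so any %(...)s placeholder used in "dsn" must exist as
# a key of those entries; only "code" and "branch" are read by the script
# itself, while "server" and "database" here are hypothetical:
#
#   {
#     "timezone": "<timezone name expected by utils.timezone.set_current>",
#     "url": "https://docr.example.invalid/api/sales",
#     "tokens": { "DOCR": "<api token>" },
#     "max_threads": 4,
#     "dsn": "DRIVER={ODBC Driver 17 for SQL Server};SERVER=%(server)s;DATABASE=%(database)s;UID=<user>;PWD=<password>",
#     "databases": [
#       { "code": "001", "branch": "Main Branch", "server": "db01.local", "database": "sales001" }
#     ]
#   }
# ---------------------------------------------------------------------------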