From bd0eef8998b962beb79a7b365214882ceaed0692 Mon Sep 17 00:00:00 2001 From: Marvin Vallecillo Date: Thu, 11 Apr 2024 02:24:26 -0600 Subject: [PATCH] primer commit --- .drone.yml | 53 +++++++ .gitignore | 1 + dosv_inventory_c2db.py | 137 +++++++++++++++++++ myodbc/__init__py | 0 myodbc/__pycache__/mysqlodbc.cpython-311.pyc | Bin 0 -> 1581 bytes myodbc/mysqlodbc.py | 16 +++ sql/dosv_insert_inventory.sql | 20 +++ sql/valid_data_exists.sql | 8 ++ tmp/.gitkeep | 0 utils/__init__.py | 0 utils/__pycache__/__init__.cpython-311.pyc | Bin 0 -> 237 bytes utils/__pycache__/aws3.cpython-311.pyc | Bin 0 -> 3719 bytes utils/__pycache__/date_utl.cpython-311.pyc | Bin 0 -> 2839 bytes utils/__pycache__/read_query.cpython-311.pyc | Bin 0 -> 1497 bytes utils/__pycache__/wr_csv.cpython-311.pyc | Bin 0 -> 2322 bytes utils/aws3.py | 53 +++++++ utils/date_utl.py | 45 ++++++ utils/read_query.py | 15 ++ utils/wr_csv.py | 25 ++++ 19 files changed, 373 insertions(+) create mode 100644 .drone.yml create mode 100644 .gitignore create mode 100644 dosv_inventory_c2db.py create mode 100644 myodbc/__init__py create mode 100644 myodbc/__pycache__/mysqlodbc.cpython-311.pyc create mode 100644 myodbc/mysqlodbc.py create mode 100644 sql/dosv_insert_inventory.sql create mode 100644 sql/valid_data_exists.sql create mode 100644 tmp/.gitkeep create mode 100644 utils/__init__.py create mode 100644 utils/__pycache__/__init__.cpython-311.pyc create mode 100644 utils/__pycache__/aws3.cpython-311.pyc create mode 100644 utils/__pycache__/date_utl.cpython-311.pyc create mode 100644 utils/__pycache__/read_query.cpython-311.pyc create mode 100644 utils/__pycache__/wr_csv.cpython-311.pyc create mode 100644 utils/aws3.py create mode 100644 utils/date_utl.py create mode 100644 utils/read_query.py create mode 100644 utils/wr_csv.py diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..e2a56dc --- /dev/null +++ b/.drone.yml @@ -0,0 +1,53 @@ +kind: pipeline +type: docker +name: default + +steps: + - name: clean environment + image: appleboy/drone-ssh + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + script: + - cd /home/ubuntu/universe-dwh/bi_dosv_inventory/app || exit + - echo > dummy.txt || exit + - rm -rf * + - name: copy files + image: appleboy/drone-scp + depends_on: + - clean environment + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + command_timeout: 2m + target: /home/ubuntu/universe-dwh/bi_dosv_inventory/app + source: ./ + - name: rm git + image: appleboy/drone-ssh + depends_on: + - copy files + settings: + host: + from_secret: host + user: + from_secret: user + key: + from_secret: git_usr_rsa_key + port: + from_secret: port + script: + - cd /home/ubuntu/universe-dwh/bi_dosv_inventory/app || exit + - rm -rf .git + - hostname diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..592dcee --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +conf.json \ No newline at end of file diff --git a/dosv_inventory_c2db.py b/dosv_inventory_c2db.py new file mode 100644 index 0000000..400ecac --- /dev/null +++ b/dosv_inventory_c2db.py @@ -0,0 +1,137 @@ +import argparse +import json +import os +import sys +from datetime import datetime, timedelta +from myodbc.mysqlodbc import ConnMysql +from pathlib import Path +from utils.aws3 import Aws3 +from utils.date_utl import DateUtil +from utils.read_query import Readsql +from utils.wr_csv import ReadCsv + +path_main = Path(__file__).resolve().parent +parent_main = Path(path_main).resolve().parent +sys.path.append(path_main) +csv_dir = os.path.join(path_main, 'tmp') +sql_dir = os.path.join(path_main, 'sql') + + +def select(qry, **db_credentials): + with ConnMysql(**db_credentials, ret='cursor') as cur: + cur.execute(qry) + result = cur.fetchall() + return result + + +def insert(query, data, **conn): + with ConnMysql(**conn, ret='conn') as conn: + try: + cursor=conn.cursor() + cursor.executemany(query, data) + conn.commit() + except Exception as e: + if conn.is_connected(): + conn.rollback() + raise Exception(e) + + +def main(date_range, branch, conf): + filtered_databases = conf["databases"] if branch == "all" else [db for db in conf["databases"] if db['code'] in branch.split(',')] + for date in date_range: + print(f'Date to process: {date}') + year = date.year + month = date.month + day = date.day + for db_credentials in filtered_databases: + try: + print('') + conexion = { + "host": db_credentials["host"], + "database": db_credentials["database"], + "user": db_credentials["user"], + "password": db_credentials["password"] + } + csv_local_file = f"{conf['process']}_{db_credentials['code']}_{year}_{month}_{day}.csv" + + # Find data for date in db... + query = Readsql(sql_dir, conf["query"]["data_validation"]) + record_exists = select( + query.getQuery().format(date=date, code=db_credentials["code"]), + **conexion + ) + if any(1 in i for i in record_exists): + raise Exception('data already exists!') + + # Find file in bucket s3 + bucket_dir = f'{conf["aws"]["aws_s3_dir_base"]}/{db_credentials["code"]}/{str(year)}/{str(month)}' + s3 = Aws3( + conf["aws"]["aws_s3_bucket_name"], + conf["aws"]["aws_access_key_id"], + conf["aws"]["aws_secret_access_key"], + conf["aws"]["aws_s3_region"] + ) + if s3.check_file_existence(csv_local_file, buket_path=bucket_dir): + s3.download_file( + csv_local_file, + local_file_path=csv_dir, + buket_path=bucket_dir + ) + + # Read downloaded csv file + csv_info = { + 'filedir': csv_dir, + 'filename': csv_local_file, + 'delimiter': ';' + } + file = ReadCsv(**csv_info) + + # Insert into db + query = Readsql(sql_dir, conf["query"]["insert"]) + insert( + query.getQuery(), + file.readcsv(drop_header=True), + **conexion + ) + print(f'\tHost: {db_credentials["host"]}, Status: Ok , Branc: {db_credentials["branch"]}, Code: {db_credentials["code"]}') + + except Exception as e: + print( + f'\tHost: {db_credentials["host"]}, Status: Error, Branc: {db_credentials["branch"]}, Code: {db_credentials["code"]} \n' \ + f'\tmsj Error: {e}' + ) + continue + finally: + if 'file' in locals(): + file.rm_csv() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog='Inventory DischargeSsync Script') + parser.add_argument( + '--start-date', + help='start date for searching', + default=None, + type=lambda s: datetime.strptime(s, '%Y%m%d') + ) + parser.add_argument( + '--end-date', + help='end date for searching', + default=None, + type=lambda s: datetime.strptime(s, '%Y%m%d') + ) + parser.add_argument( + '--branch', + help='branch for loading', + default='all', + ) + sys_args = parser.parse_args(sys.argv[1:]) + conf = json.load(open(parent_main / 'conf.json')) + + if not conf: + raise Exception('conf file was not found!') + + date_range = DateUtil(sys_args.start_date, sys_args.end_date, return_type='date') + date_range = date_range.date_range() + + main(date_range, sys_args.branch, conf) diff --git a/myodbc/__init__py b/myodbc/__init__py new file mode 100644 index 0000000..e69de29 diff --git a/myodbc/__pycache__/mysqlodbc.cpython-311.pyc b/myodbc/__pycache__/mysqlodbc.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e45f24036f8a50ef9a3510b18ed722066d92b195 GIT binary patch literal 1581 zcmZ`(&1)M+6rb4-d1bAWG;!QUZLB3HD~)|HO$nvcv@uCbhyzuF!7MED&bV1y+I41D zw$S3)xie>O+2XY2uM>KKLcFaEhX|&Sr}wK z&UsA7^(azt27F1hw8S~UDC;=e-AVwRhiAnW;9J-NMJ-NIWbiH53G3@T8bNBKT!}KVXj$UM z(7H(i#RtHh?wiEX7|7*K0LWbKZ0p9etxm4c&K0`mxz?Q{^K!?$d|(z1%|hEObPcnW zJu>ngBme7-oyEPC1LOLkalJi!;{-22_3*54_u#aCxyl_U8g-m%z-kqwO~-jyD_4>) z;yq9m7ZLD;B8z}^ipwrAh8QqQ@*9B1>C;m*eY!_ZgGu$TXpfx0jH-XyBX2^o3vK{6 z|7?8GX43zIz+Ydo`{@(CHa<@R`9fHrf*l%XucuN>P{wCAAY(d13PUi1CZpd7&-6`uVgcPUb${w5hbg5s_t$~2dq zU4|Ax6++>I5C}*RIIx?<2v4pIr+|+>);Zch47Nd5pw^4!kJ3uk;+7OmiCLah zCM{>RC}**%oVkI?MjDeav9C&&7AEPS^y~4&yQ+*;8xwD5b$g+$ZP40nt+AJMslkbFG7D>6UAa+iZs%tQV;ii&_nT+V zlk&9r@;E66%$J5qd2+Ys)ZL8f51OY3jMHQ0z>qO8QT4LDz8?_4j|4W@&6W-L?SVpI zplc(L2&aj>_gjE_s6>AStS(Vsp$E(wuww;Uom6(MUT+~KjBpuHVt@l6HJDFcwB?z$t!Ob<Rg}d6(!t(HXJpH_lqZJICADRX zCoP0BDp@tL9&RNgyHd+?>ZWM-%D2<1CTCM}vClrx`_Z+5xzh*>m^S6T>5W%wa&&Ai79bFQ1-!* z9Z={x+WfSDZZp?W0YWH{j*DYrXG~@n+XQV{4ohhaBm++j7yaQv61*78NllibK8sT| zoX$a!%cM2Slg#DhtYon(N;+%P6!#EaB$8Od?Cq4C(@4PapMx)zti;Pg?CLtej{ zUvfWZwI7zm3^3GUZAN=;C>cq{Rx6Q2O%a_HspOyuzv~PSI_;vXve@mg+ulHf{S`^aW~kl_snjc z25@pEIA;Xs^m?vxMBo1Yezm2sWR;lt*~v3g6ZEr*$?0Em-%U_}DVNUB?YifL?VdK> zq1j2tJ7IHV!(q)OKir3|?iaWUQ=*z9VHRR)iN@Z;@s+e4y)`MMk>FQLKzE455^K-m z9&pWW3l0uMf6nQK*VrZ<9<9Bu51jZ&kbpQ(AJuvt^sceLXNh*FR-o#x74PVG!L8(@p>Kzv3c2Es8UC1lTn2C1 zuQH$kN|hYcAEpMUBIsGfH+_nE_Im$x5A$6Q2le9}=C3dV?z8{#A@F~87Y3Z>1@02{ z)b530*cg5lpxEhfRZ62nchs!+QX$mN&GqtYJRKWUyv_$ z*yL-{O7yAZ#SW3%d4QbR=Fga&y+-G-dFd^^d*O?B?!I>~@p+;=`s8e-`-0JZL5KXC z1#@^z4^DnI4uKSMnRQCQr)PnC!T@kjAkRR-2iy~Uz&)W=Z=Jc{1pxWUs8a$~_*sLW z)$7@Q7fK6rTNYvQF$c5E}C1fCDMPKq3`Y1tDmQ2yG*^u0o1hTvsc{I{_!_bvnBS zVjK~*k?NvIMOCXr0;HT$5UIqW2QHO3_PB{$#aamosV5GVjKZPf)c0oNjZIRiWAC>! z^Jd<>eeZj3ehCJf2sAeIH@(q7$X__9Hs5Av;{tRRh(L`d-Izt^lqDFNq zncCdJU)B8^`(XGTSVT!&&`4Y?l5s&3=9IWpB-e$w47x?9<-B3B=^@a0p0O-Td3M2O z?BlcX1;Ee30Y_?^1KdcVOQjNr$Bn4p^<{x@8ffU37QL7e$ufPT&2H)m0b5oQ# zimIlKw56(!oXTo6Dm$`C^>Iham=mlS6gz%ZP3cL~R8>=kaU>F%eKmXTRN|sZnaMHn zsX>QW`YMeKL_Qh);IqU>*$hp5o@JAX3v?=LrmZYXXU#~;oHosYYiTW)xM;v~X3~M7 zYgZDZ*&ItzGm$nd%Ept>1)Hu?1GY^kQpdHiM9xam5w!~W3g>j=_x5aw$fAH(@~V?{YCg~z)BMJp)ZVNhJr7e zR5g>;ays?{s`_;t`ppJNs8*aszwFYf(N40-)A8})>wsB6r2PAORB$$DEc}p z)(04i7np%(!oUCqFVOEaDXyQ+cp=--YjzXbsHhxqYhrWRtJ<21?qbtYq zeh9??1|^d3jii!B#K>BaF&fdSX-2FoFu~*IE=lj|$4jCnLdZTS1{gP94BRL=e$=Sa zt4Te-+l!Sq7sk~(z$_^@hvwgd!r;FlELwMG++I1a*0nZzjC^W|BvfT_^}kUd^cL#5bJbRU%E8hT7HjXBk4_wF-l$Tfwaz znz$mpR)ata--6tD&BjMyBNR$G#?zQ}fELyX5N&iL+=mmGh)zRtY6^(L6VhFb#sU*C z@J>ii14&eks^c_uC22Z=RE{yY%y^EQ=moe!dHy<0oCPdtT&DT9dTQ`VJkIph{-dS-6OZ3n z>pyGvpS>MeZ|z-c?Okm>SZY0Jw;p^_cp-_wmX`zo2m;VRf}Qt<@4sb-2Uf!aCBVQy z@#Bi0gu1uXR|2GUzpd=!42l8UF?`VmhF31zjnIaGm5FheMWOeAxYroJ8?ibugg4+Z zhXH22UqzIC{S{IV0m+9JZ=) Ow!;gYd%1&vTlp`+G-pWw literal 0 HcmV?d00001 diff --git a/utils/__pycache__/read_query.cpython-311.pyc b/utils/__pycache__/read_query.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..38f432e803fa59e0e37831d44b38589f404699d6 GIT binary patch literal 1497 zcmZuw-D@0G6hC)nwwY|R>4y~DsO^|E+HISnqA#V`1cOS!NV^v4GB9lS-ZZmrX14dv zc1>6{NFVYL5JcIc6$?VN#6Q4C`yV8bhh?Bp@W}^ZOZpJ{|pj)O-V1fVUH49@OC$s}JTi<^iUZlQ-vJakQzxw>7en7!{bQXt}Y-a=?Y z%V>e86$BiROTs{lA(tk`p$HGzT)~X9L3vSlw;Mt(#hKnQ%zY^;Q7D&CZz)lmm&52! z4k0KC=Qz&uZ$G%`UROdYjqay`n3et=;Y>N76hHjT{Wx3{?&qQW!o4Qyq4Lq2A1bG! zmXw-m`n=&@571mHF*VyR)li>AQE!EjHKrWcSJx*xRrPKO}7#GHRVbyLND11 z^`+SM(8`ZIZ~17yIu{ZNHA=mLumYX2)2r6nkkJL@(%~v~~d1lz#AHH}IR@YC$y_6zoAYlgI~$V;#M3q!HeOuULycpbYk{b(CzR zQtTsjLN6RZ+=G4@1cxW!ZBSOz_|m)qO~&8`Fx;3+`ehL$#VXjb71l)%8=;D=dO2E% zt!n58dmWE0D!R1E*pPHJ;;fz%Z{82MqEa}HJWO)?mWZy>B<=Mhk1a0LBf|n5_DN*yIDwd;_>s^v{Y_B`B z#tAYNha6Ca)HF(Eic;Z2m9|O_Ipoko4^`AdkGAB4H4>+)s(N!o4v16VjPZgoyj_1g z^X9#IGxNTA{#n!75XhN-s@7#0p}%>jF3NUe<6UUvk%>&4K;haKhp~h{L8f#cner0} zp{MYxNmw%RRirEb@$O7)TPr?wn~g!}=8=WM*hJj^J_^gGYRdOa51iYSlo;hUfv=8!;ntbYuMhC``O{MV&AQBvA zO%C+(nQD_){m9{v`Sxd+f7VV|rcGE2Z=|A0i?y0o!cN+*MYixL>qyus3))FL6?Gj@ zwdp=4-?ChyLVV)q(87V|S(lhXt;8%-?_{Io7NvY|q0rXX7fwa4Q;Sf6k;^G-lGt-r zXe{*E)v0TdX(wq#zHrE$$Q3K?P#fad4h_ZVJxa&2wwZ}sPk}k1*4SkBcI2v)Au)?a z>=dkPHVRF{PR&^J6Y~K z3CfPjz|dNJHD1&|{B5W-Fi{?uSW?O@$BN?JfZ5Sh%*!+McvPOjjW94Kk8Ys_$@RE{ zkh{w^?zIbE&cV}@9Ey8cg|@Fg(3pD@Ka(0Y@1q6T)#}y=#d#`)m>u6zS!F-A8KhDR`5nPEuB%%8HdoX0vAT-5_iane?bNe*R7I_lnLO5Cwi zJCtTJFo&rGii1Uj4^vXsU6=XXOgdpPHJS#0Q&$N;LDp^(CvDso*4&F`5-wBtjUxSg zTqaJI@|=f+kWQ}j^VuQHd}p1L+ll~;(tiPflAVW^)OGF9_k)jy^P>+(SKPm7 zSvq{WeE9SqSBlznNt-Tf(^aIl_Ww1x-W_}tE-25H7ahg!iBk7OxqD)1s?ynC42=9Z zS?KxU(&I}jmw@P1Bx%P5JdoFWj;&nyF8(O~K(2K40IJ+-PBas&H0dsm6s$tGOrJwZdzP@puY9YBjb~RcGYT_B)PkO+L z^l}Jt;C`Z{GS#|kQ3&MkkdddAXbK@Yp2!JOce3 zU=ghc`ycz)0;_?-wVxu-Prf))3Z5$m&n;=|?b`AO-=6*E?8;cFeWctzQboA+t@S|1 z^2G`oZHAF_%nYp3U;)GUDiclAMo1Um z+#rWxfM1dD7i%ZHfC=D-O)mm0dQVkSd>`W~+J%7@A8@V_j>o+oE_XsC7|1RrM%_h|Tbd>t63OAVU2Q DIt}Dm literal 0 HcmV?d00001 diff --git a/utils/aws3.py b/utils/aws3.py new file mode 100644 index 0000000..03759a7 --- /dev/null +++ b/utils/aws3.py @@ -0,0 +1,53 @@ +import boto3 +import os + + +class Aws3: + + def __init__(self, aws_s3_bucket_name, aws_access_key_id, aws_secret_access_key, aws_s3_region): + self.aws_s3_bucket_name = aws_s3_bucket_name + self.aws_access_key_id = aws_access_key_id + self.aws_secret_access_key = aws_secret_access_key + self.aws_s3_region = aws_s3_region + self.client = boto3.client( + 's3', + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key + ) + + def check_file_existence(self, file, buket_path=None): + file_key = f'{buket_path}/{file}' + self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=file_key) + return True + + def create_directory(self, subdirectory): + subdirectory_list = subdirectory.strip('/').split('/') + path = [] + for folder in subdirectory_list: + path.append(folder) + path_to_create = '/'.join(path) + try: + self.client.head_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create) + except Exception: + self.client.put_object(Bucket=self.aws_s3_bucket_name, Key=path_to_create) + print(f'\tthe directory {path_to_create} was created') + + def load_file(self, file, buket_path=None, local_file_path=None): + try: + # if remote path not exists, upload_file will create it + self.client.upload_file( + Bucket=self.aws_s3_bucket_name, + Filename=os.path.join(local_file_path,file) if local_file_path else file, + Key=f'{buket_path}/{file}' if buket_path else file + ) + print(f"\tfile '{file}' was upload to '{buket_path}'") + except Exception as e: + print(f"\tFail to load file: {str(e)}") + + def download_file(self, file, buket_path=None, local_file_path=None): + self.client.download_file( + Bucket=self.aws_s3_bucket_name, + Filename=os.path.join(local_file_path,file) if local_file_path else file, + Key=f'{buket_path}/{file}' if buket_path else file + ) + print(f"\tfile '{file}' was download to '{local_file_path}'") diff --git a/utils/date_utl.py b/utils/date_utl.py new file mode 100644 index 0000000..4b01ce8 --- /dev/null +++ b/utils/date_utl.py @@ -0,0 +1,45 @@ +from datetime import date, datetime, timedelta + +""" + This class recive two parameters, but only return a list of range of date + for example: + + Entry: + start-date 2024-03-01 00:00:00 + end-date 2024-03-03 00:00:00 + Result: + [datetime.datetime(2024, 3, 1, 0, 0), datetime.datetime(2024, 3, 2, 0, 0), datetime.datetime(2024, 3, 3, 0, 0)] + + if don't entry start-date and end-date default date will be today-1 +""" + +class EntryDateError(Exception): + def __init__(self, msj, code): + super().__init__(msj) + self.code = code + +class DateUtil: + def __init__(self, start_date, end_date, return_type='datetime'): + self.default_Date = datetime.combine((date.today() - timedelta(days=1)), datetime.min.time()) + self.start_date = start_date if start_date else self.default_Date + self.end_date = self.end_date = end_date if end_date else (start_date if start_date else self.default_Date) + self.return_type = return_type + + def date_eval(self): + if self.start_date and self.end_date: + if self.end_date < self.start_date: + raise EntryDateError(f'end-date {self.end_date} can not be less than start-date {self.start_date}', 10001) + + def date_range(self): + lista_de_fechas = [] + current_date = self.start_date + + self.date_eval() + + while current_date <= self.end_date: + lista_de_fechas.append( + current_date.date() if self.return_type == 'date' else current_date + ) + current_date += timedelta(days=1) + + return lista_de_fechas \ No newline at end of file diff --git a/utils/read_query.py b/utils/read_query.py new file mode 100644 index 0000000..bb2b521 --- /dev/null +++ b/utils/read_query.py @@ -0,0 +1,15 @@ +import os + +class Readsql: + def __init__(self, directory, name_query): + self.name_query = name_query + self.query = None + self.directory = directory + + def getQuery(self): + with open(os.path.join(self.directory, self.name_query)) as query_find: + self.query = query_find.read() + return self.query + + def show(self): + print(f'Query: {self.query}') diff --git a/utils/wr_csv.py b/utils/wr_csv.py new file mode 100644 index 0000000..79e1834 --- /dev/null +++ b/utils/wr_csv.py @@ -0,0 +1,25 @@ +import csv +import os + + +class ReadCsv: + def __init__(self, **kwargs): + self.filedir = kwargs.get('filedir') + self.filename = kwargs.get('filename') + self.delimiter = kwargs.get('delimiter',',') + self.lineterminator = kwargs.get('lineterminator','\n') + + def readcsv(self, drop_header=False): + result=[] + with open(os.path.join(self.filedir, self.filename), newline='') as file: + reader = csv.reader(file, delimiter=self.delimiter) + if drop_header: next(reader) + for row in reader: + row = tuple(row) + result.append(row) + return result + + def rm_csv(self): + file_2_delete = os.path.join(self.filedir, self.filename) + if os.path.exists(file_2_delete): + os.remove(file_2_delete)