hive_sales_sync_costarica/orm/model.py
Marvin Vallecillo a2663d15be First commit
2024-07-17 04:19:28 -06:00

237 lines
9.1 KiB
Python

import json
import pyodbc
import requests
from .fields import ThisModelValue, One2OneValue
from .fields.base import AttributeGetter
from threading import Thread
class Model(Thread):
class Conf:
db_table = ''
filters = []
log_table = ''
def __init__(self, filters: tuple, conn: str = None, raise_not_found_error: bool = True):
super().__init__()
self.conf = self.Conf()
self.filters = filters
self.raw_result = {}
self.many = []
self.conn_str = conn
self.raise_not_found_error = raise_not_found_error
self.cursor = None
self.response = None
self.endpoint_url = None
self.token = None
def run(self) -> None:
with pyodbc.connect(self.conn_str) as self.conn:
with self.conn.cursor() as cur:
self.cursor = cur
# Mark as working
self.set_status(1)
# Query and Sync
self.populate_data()
self.perform_request()
self.conn.commit()
def get_filters(self):
return ' AND '.join([f'{f} = ?' for f in self.conf.filters])
def get_where(self):
return f'WHERE \n {self.get_filters()}'
def get_fields(self):
# noinspection GrazieInspection
"""
There are 3 types of fields in a Model
###########################################################################################
This will append all the fields of the model which are instance of <ThisModelValue>
as example for this case
branch = fields.ThisModelValue(db_column='Location_Code')
the value for keyword db_column will be appended to fields ['Location_Code']
if the field has an alias as an example for:
unit_price = fields.ThisModelValue(db_column='Price/Quantity', db_alias='unit_price')
the following value will be appended to fields:
['Price/Quantity AS unit_price']
###########################################################################################
If the field is instance of One2OneValue
an example for this case would be:
official_doc = fields.One2OneValue(
db_column='ReceiptNumber',
db_table='GovernmentReceipt.dbo.ReceiptHistory',
join=(
('Order_Number', , 'OrderNumber'),
('Order_Date', 'OrderDate'),
('Location_Code', 'LocationCode')
)
)
the first position of each tuple in join will we appended to the fields because they will be
required in order to perform the one 2 one search so the values appended to fields will be:
['Order_Number', 'Order_Date', 'Location_Code]
###########################################################################################
If the field is another Model used for one 2 many relations the filters will be appended
an example for this case would be:
items = ItemModel(
['Location_Code', 'Order_Date', 'Order_Number']
)
so in this case items.filters will be appended to fields:
['Location_Code', 'Order_Date', 'Order_Number']
"""
fields = []
for k, field in self.__dict__.items():
if isinstance(field, ThisModelValue):
fields.append(field.get_column())
elif isinstance(field, One2OneValue):
for join in field.join:
fields.append(join[0])
elif issubclass(field.__class__, self.__class__.__base__):
for f in field.filters:
fields.append(f)
return set(fields)
def get_select(self):
cols = ', '.join([f for f in self.get_fields()])
return f'SELECT \n {cols}'
def get_from(self):
# noinspection SpellCheckingInspection,PyUnresolvedReferences
return f'FROM {self.conf.db_table} (NOLOCK)'
def get_query(self):
return f'{self.get_select()} \n{self.get_from()} \n{self.get_where()}'
def assign_values(self):
for k, field in self.__dict__.items():
if isinstance(field, ThisModelValue):
field.value = self.raw_result[field.db_alias]
field.raw_result = self.raw_result
def cursor_execute(self, query, *bind_variables, fetch=None, raise_not_found_error=True):
try:
self.cursor.execute(query, *bind_variables)
result = getattr(self.cursor, fetch)()
if not result and raise_not_found_error:
raise Exception(
f"NOT FOUND \nusing query => {query}"
f" with args => {bind_variables}"
)
return result
except pyodbc.ProgrammingError as e:
print(f"error => {e} \nwhen executing query => {query} \n with args => {bind_variables}")
raise
def populate_me(self):
print(f'attempt to populate me {self}')
row = self.cursor_execute(
self.get_query(),
*self.filters,
fetch='fetchone',
raise_not_found_error=self.raise_not_found_error
)
if not row:
return 0
columns = [i[0] for i in self.cursor.description]
self.raw_result = dict(zip(columns, row))
return 1
def populate_one2one(self):
for k, field in self.__dict__.items():
if isinstance(field, One2OneValue):
result = self.cursor_execute(
field.get_query(),
*(self.raw_result[f[0]] for f in field.join),
fetch='fetchone',
raise_not_found_error=field.raise_not_found_error
)
if result:
field.value = result[0]
continue
field.value = None
# noinspection PyUnresolvedReferences
def populate_one2many(self):
for k, field in self.__dict__.items():
if issubclass(field.__class__, self.__class__.__base__):
new_filters = (self.raw_result[f] for f in field.filters)
rows = self.cursor_execute(
field.get_query(),
*new_filters,
fetch='fetchall',
raise_not_found_error=field.raise_not_found_error
)
columns = [i[0] for i in self.cursor.description]
result = [dict(zip(columns, row)) for row in rows]
for r in result:
obj = field.__class__(new_filters)
obj.raw_result = r
obj.assign_values()
obj.cursor = self.cursor
obj.populate_one2one()
obj.populate_one2many()
field.many.append(obj)
def populate_data(self):
if not self.populate_me():
return
self.assign_values()
self.populate_one2one()
self.populate_one2many()
def get_value_serialized(self):
if self.many:
return [obj.get_value_serialized() for obj in self.many]
if not self.raw_result:
return None
data = {}
for k, v in self.__dict__.items():
try:
if isinstance(v, AttributeGetter):
data[k] = v.get_value_serialized()
"""
If attribute is one to many relationship and there is no result return
empty list
"""
if issubclass(v.__class__, self.__class__.__base__):
value_serialized = v.get_value_serialized()
if value_serialized is None:
data[k] = []
else:
data[k] = value_serialized
except Exception as e:
print(f"error when serializing {k} {v}")
raise e
return data
def perform_request(self):
self.set_status(2)
try:
self.response = requests.post(
self.endpoint_url,
headers={
"Authorization": self.token,
"Content-Type": "application/json"
},
data=json.dumps(self.get_value_serialized())
)
if self.response.ok:
self.set_status(4)
return
self.set_status(3, error=json.dumps(self.response.json()))
except Exception as e:
self.set_status(3, error=str(e))
def set_status(self, status, error=None):
args = (status, error) + tuple(self.filters)
# noinspection SqlDialectInspection,SqlNoDataSourceInspection,SqlResolve
self.cursor.execute(
f'''UPDATE {self.conf.log_table} WITH (ROWLOCK)
SET status=?, error=?
WHERE {self.get_filters()}''',
*args
)
self.conn.commit()