237 lines
9.1 KiB
Python
237 lines
9.1 KiB
Python
import json
|
|
import pyodbc
|
|
import requests
|
|
from .fields import ThisModelValue, One2OneValue
|
|
from .fields.base import AttributeGetter
|
|
from threading import Thread
|
|
|
|
|
|
class Model(Thread):
|
|
class Conf:
|
|
db_table = ''
|
|
filters = []
|
|
log_table = ''
|
|
|
|
def __init__(self, filters: tuple, conn: str = None, raise_not_found_error: bool = True):
|
|
super().__init__()
|
|
self.conf = self.Conf()
|
|
self.filters = filters
|
|
self.raw_result = {}
|
|
self.many = []
|
|
self.conn_str = conn
|
|
self.raise_not_found_error = raise_not_found_error
|
|
self.cursor = None
|
|
self.response = None
|
|
self.endpoint_url = None
|
|
self.token = None
|
|
|
|
def run(self) -> None:
|
|
with pyodbc.connect(self.conn_str) as self.conn:
|
|
with self.conn.cursor() as cur:
|
|
self.cursor = cur
|
|
# Mark as working
|
|
self.set_status(1)
|
|
|
|
# Query and Sync
|
|
self.populate_data()
|
|
self.perform_request()
|
|
self.conn.commit()
|
|
|
|
def get_filters(self):
|
|
return ' AND '.join([f'{f} = ?' for f in self.conf.filters])
|
|
|
|
def get_where(self):
|
|
return f'WHERE \n {self.get_filters()}'
|
|
|
|
def get_fields(self):
|
|
# noinspection GrazieInspection
|
|
"""
|
|
There are 3 types of fields in a Model
|
|
###########################################################################################
|
|
This will append all the fields of the model which are instance of <ThisModelValue>
|
|
as example for this case
|
|
branch = fields.ThisModelValue(db_column='Location_Code')
|
|
the value for keyword db_column will be appended to fields ['Location_Code']
|
|
if the field has an alias as an example for:
|
|
unit_price = fields.ThisModelValue(db_column='Price/Quantity', db_alias='unit_price')
|
|
the following value will be appended to fields:
|
|
['Price/Quantity AS unit_price']
|
|
###########################################################################################
|
|
If the field is instance of One2OneValue
|
|
an example for this case would be:
|
|
official_doc = fields.One2OneValue(
|
|
db_column='ReceiptNumber',
|
|
db_table='GovernmentReceipt.dbo.ReceiptHistory',
|
|
join=(
|
|
('Order_Number', , 'OrderNumber'),
|
|
('Order_Date', 'OrderDate'),
|
|
('Location_Code', 'LocationCode')
|
|
)
|
|
)
|
|
the first position of each tuple in join will we appended to the fields because they will be
|
|
required in order to perform the one 2 one search so the values appended to fields will be:
|
|
['Order_Number', 'Order_Date', 'Location_Code]
|
|
###########################################################################################
|
|
If the field is another Model used for one 2 many relations the filters will be appended
|
|
an example for this case would be:
|
|
items = ItemModel(
|
|
['Location_Code', 'Order_Date', 'Order_Number']
|
|
)
|
|
so in this case items.filters will be appended to fields:
|
|
['Location_Code', 'Order_Date', 'Order_Number']
|
|
"""
|
|
fields = []
|
|
for k, field in self.__dict__.items():
|
|
if isinstance(field, ThisModelValue):
|
|
fields.append(field.get_column())
|
|
elif isinstance(field, One2OneValue):
|
|
for join in field.join:
|
|
fields.append(join[0])
|
|
elif issubclass(field.__class__, self.__class__.__base__):
|
|
for f in field.filters:
|
|
fields.append(f)
|
|
return set(fields)
|
|
|
|
def get_select(self):
|
|
cols = ', '.join([f for f in self.get_fields()])
|
|
return f'SELECT \n {cols}'
|
|
|
|
def get_from(self):
|
|
# noinspection SpellCheckingInspection,PyUnresolvedReferences
|
|
return f'FROM {self.conf.db_table} (NOLOCK)'
|
|
|
|
def get_query(self):
|
|
return f'{self.get_select()} \n{self.get_from()} \n{self.get_where()}'
|
|
|
|
def assign_values(self):
|
|
for k, field in self.__dict__.items():
|
|
if isinstance(field, ThisModelValue):
|
|
field.value = self.raw_result[field.db_alias]
|
|
field.raw_result = self.raw_result
|
|
|
|
def cursor_execute(self, query, *bind_variables, fetch=None, raise_not_found_error=True):
|
|
try:
|
|
self.cursor.execute(query, *bind_variables)
|
|
result = getattr(self.cursor, fetch)()
|
|
if not result and raise_not_found_error:
|
|
raise Exception(
|
|
f"NOT FOUND \nusing query => {query}"
|
|
f" with args => {bind_variables}"
|
|
)
|
|
return result
|
|
except pyodbc.ProgrammingError as e:
|
|
print(f"error => {e} \nwhen executing query => {query} \n with args => {bind_variables}")
|
|
raise
|
|
|
|
def populate_me(self):
|
|
print(f'attempt to populate me {self}')
|
|
row = self.cursor_execute(
|
|
self.get_query(),
|
|
*self.filters,
|
|
fetch='fetchone',
|
|
raise_not_found_error=self.raise_not_found_error
|
|
)
|
|
if not row:
|
|
return 0
|
|
columns = [i[0] for i in self.cursor.description]
|
|
self.raw_result = dict(zip(columns, row))
|
|
return 1
|
|
|
|
def populate_one2one(self):
|
|
for k, field in self.__dict__.items():
|
|
if isinstance(field, One2OneValue):
|
|
result = self.cursor_execute(
|
|
field.get_query(),
|
|
*(self.raw_result[f[0]] for f in field.join),
|
|
fetch='fetchone',
|
|
raise_not_found_error=field.raise_not_found_error
|
|
)
|
|
if result:
|
|
field.value = result[0]
|
|
continue
|
|
field.value = None
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
def populate_one2many(self):
|
|
for k, field in self.__dict__.items():
|
|
if issubclass(field.__class__, self.__class__.__base__):
|
|
new_filters = (self.raw_result[f] for f in field.filters)
|
|
rows = self.cursor_execute(
|
|
field.get_query(),
|
|
*new_filters,
|
|
fetch='fetchall',
|
|
raise_not_found_error=field.raise_not_found_error
|
|
)
|
|
columns = [i[0] for i in self.cursor.description]
|
|
result = [dict(zip(columns, row)) for row in rows]
|
|
for r in result:
|
|
obj = field.__class__(new_filters)
|
|
obj.raw_result = r
|
|
obj.assign_values()
|
|
obj.cursor = self.cursor
|
|
obj.populate_one2one()
|
|
obj.populate_one2many()
|
|
field.many.append(obj)
|
|
|
|
def populate_data(self):
|
|
if not self.populate_me():
|
|
return
|
|
self.assign_values()
|
|
self.populate_one2one()
|
|
self.populate_one2many()
|
|
|
|
def get_value_serialized(self):
|
|
if self.many:
|
|
return [obj.get_value_serialized() for obj in self.many]
|
|
if not self.raw_result:
|
|
return None
|
|
data = {}
|
|
for k, v in self.__dict__.items():
|
|
try:
|
|
if isinstance(v, AttributeGetter):
|
|
data[k] = v.get_value_serialized()
|
|
|
|
"""
|
|
If attribute is one to many relationship and there is no result return
|
|
empty list
|
|
"""
|
|
if issubclass(v.__class__, self.__class__.__base__):
|
|
value_serialized = v.get_value_serialized()
|
|
if value_serialized is None:
|
|
data[k] = []
|
|
else:
|
|
data[k] = value_serialized
|
|
except Exception as e:
|
|
print(f"error when serializing {k} {v}")
|
|
raise e
|
|
return data
|
|
|
|
def perform_request(self):
|
|
self.set_status(2)
|
|
try:
|
|
self.response = requests.post(
|
|
self.endpoint_url,
|
|
headers={
|
|
"Authorization": self.token,
|
|
"Content-Type": "application/json"
|
|
},
|
|
data=json.dumps(self.get_value_serialized())
|
|
)
|
|
if self.response.ok:
|
|
self.set_status(4)
|
|
return
|
|
self.set_status(3, error=json.dumps(self.response.json()))
|
|
except Exception as e:
|
|
self.set_status(3, error=str(e))
|
|
|
|
def set_status(self, status, error=None):
|
|
args = (status, error) + tuple(self.filters)
|
|
# noinspection SqlDialectInspection,SqlNoDataSourceInspection,SqlResolve
|
|
self.cursor.execute(
|
|
f'''UPDATE {self.conf.log_table} WITH (ROWLOCK)
|
|
SET status=?, error=?
|
|
WHERE {self.get_filters()}''',
|
|
*args
|
|
)
|
|
self.conn.commit()
|