From 2d12eeffd1e8c5c463dbca3c6dfbdfd3854e5f18 Mon Sep 17 00:00:00 2001 From: Josue Gomez Date: Sun, 10 Mar 2019 04:20:04 -0600 Subject: [PATCH] Version 2 new version of pytutils --- .gitignore | 4 ++++ __init__.py | 50 ++++++++++++++++++++++--------------------- localenv.py | 55 +++++++++++++++++++++++++++++++++++++++++++++++ oracle.py | 16 ++++++++++++++ pysyspath2 | 4 +++- pysyspath3 | 4 +++- remote.py | 52 +++++++++++++++++++++++++++++++++++++++++++++ sendmail.py | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 220 insertions(+), 26 deletions(-) create mode 100644 localenv.py create mode 100644 oracle.py create mode 100644 remote.py create mode 100644 sendmail.py diff --git a/.gitignore b/.gitignore index 1b05740..13ddc54 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ __pycache__/ + +\.idea/ + +*.pyc diff --git a/__init__.py b/__init__.py index b2e2c16..b9ae084 100644 --- a/__init__.py +++ b/__init__.py @@ -4,47 +4,49 @@ r""" To use, simply 'import pyutils' shellExecute can receive a cmd as str or arr example - >>> pyutils.shellExecute('date') + >>> pyutils.shell_execute('date') (b'mi\xc3\xa9 ene 30 11:35:00 CST 2019\n', b'') - >>> pyutils.shellExecute(['echo',"'hola mundo'"]) + >>> pyutils.shell_execute(['echo',"'hola mundo'"]) (b"'hola mundo'\n", b'') + """ __author__ = 'Josue Gomez ' +__maintainer__ = "Josue Gomez" +__email__ = "jgomez@binkfe.com" +__license__ = "GPL" __version__ = '2.0' -__all__ = [ 'resizeTTY', 'shellExecute', 'progressBar', 'getSensible', ] -__status__ = "production" -__date__ = "30 January 2019" - -import os, sys, subprocess -from dotenv import load_dotenv +__all__ = ['', ] +__status__ = "production" +__date__ = "30 January 2019" -result = os.environ.get('MYENV') -load_dotenv(result) +import sys +import subprocess -def getSensible(key): - return os.environ.get(key) def pysyspath(): print(sys.version) for pth in sys.path: print(pth) -def resizeTTY(rows, cols): + +def resize_tty(rows, cols): sys.stdout.write("\x1b[8;{rows};{cols}t".format(rows=rows, cols=cols)) -def shellExecute(cmd,stdin=None): - rpt = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = rpt.communicate() - return stdout, stderr -def progressBar(progress, total, status=''): - ttySize = shellExecute(['stty','size']) - ttySize = ttySize[0].decode().split(' ') - barLen = round(int(ttySize[1])/100*90) - fillLen = int(round(barLen * progress / float(total))) +def shell_execute(cmd, stdin=None): + proc = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + return stdout, stderr, proc.returncode + + +def progress_bar(progress, total, status=''): + ttysize = shell_execute(['stty','size']) + ttysize = ttysize[0].decode().split(' ') + barlen = round(int(ttysize[1])/100*90) + fill_len = int(round(barlen * progress / float(total))) percent = round(100.0 * progress / float(total), 1) - bar = '■' * fillLen + '-' * (barLen - fillLen) + bar = '■' * fill_len + '-' * (barlen - fill_len) sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percent, '%', status)) - sys.stdout.flush() \ No newline at end of file + sys.stdout.flush() diff --git a/localenv.py b/localenv.py new file mode 100644 index 0000000..b5be2c9 --- /dev/null +++ b/localenv.py @@ -0,0 +1,55 @@ +import os +import sys + + +class NoEnvironmentFile(Exception): + pass + + +class LocalEnv: + def __init__(self): + self.file = None + self.data = {} + + def load(self, file=None): + """ + If no file is defined, the .env file will be searched + in invoker module's directory + """ + if file is not None: + self.file = file + else: + self.file = self._invoker() + + if not os.path.isfile(self.file): + raise NoEnvironmentFile(f'for file {self.file}') + + with open(self.file) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#') or '=' not in line: + continue + key, value = line.split('=', 1) + key = key.replace('export', '') + key = key.strip() + value = value.strip().strip('\'"') + self.data[key] = value + + def get(self, key, cast=None): + if cast is None: + return self.data[key] + + return cast(self.data[key]) + + @staticmethod + def _invoker(): + # tip from: + # https://github.com/henriquebastos/python-decouple/blob/master/decouple.py + # MAGIC! Get the caller's module path. + frame = sys._getframe() + path = os.path.dirname(frame.f_back.f_back.f_code.co_filename) + file = os.path.join(path, '.env') + return file + + +localenv = LocalEnv() diff --git a/oracle.py b/oracle.py new file mode 100644 index 0000000..9a63fb0 --- /dev/null +++ b/oracle.py @@ -0,0 +1,16 @@ +import cx_Oracle + + +class OraConn: + def __init__(self, str_conn, conn_name=None): + self.conn = cx_Oracle.connect(str_conn, encoding='UTF-8') + self.cur = self.conn.cursor() + if conn_name is not None: + self.cur.callproc('DBMS_APPLICATION_INFO.SET_MODULE', [conn_name, None]) + self.cur.close() + + def __enter__(self): + return self.conn + + def __exit__(self, exc_type, exc_val, exc_tb): + self.conn.close() diff --git a/pysyspath2 b/pysyspath2 index 3a569df..31a36e0 100755 --- a/pysyspath2 +++ b/pysyspath2 @@ -1,8 +1,10 @@ #!/usr/bin/env python2 import pyutils as utl + def main(): utl.pysyspath() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/pysyspath3 b/pysyspath3 index fc3501e..5b4a226 100755 --- a/pysyspath3 +++ b/pysyspath3 @@ -1,8 +1,10 @@ #!/usr/bin/env python3 import pyutils as utl + def main(): utl.pysyspath() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/remote.py b/remote.py new file mode 100644 index 0000000..31e2d91 --- /dev/null +++ b/remote.py @@ -0,0 +1,52 @@ +import paramiko + + +class RemoteSSH: + def __init__(self, auth_info): + if not isinstance(auth_info, tuple) or len(auth_info) != 4: + raise AssertionError(f'expected tuple (host, port, user, password) instead got {auth_info}') + + self.host = auth_info[0] + self.port = auth_info[1] + self.user = auth_info[2] + self.pssw = auth_info[3] + self.sock = paramiko.Transport((self.host, self.port)) + self.sock.connect(username=self.user, password=self.pssw) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.sock.close() + + +class RemoteExecute(RemoteSSH): + def __init__(self, auth_info): + super().__init__(auth_info) + self.session = self.sock.open_channel(kind='session') + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.session.close() + super().__exit__(exc_type, exc_val, exc_tb) + + def execute(self, command): + stdout = b'' + self.session.exec_command(command) + while True: + stdout += self.session.recv(4096) + if self.session.exit_status_ready(): + break + return stdout + + +class Sftp(RemoteSSH): + def __init__(self, auth_info): + super().__init__(auth_info) + self.sftp = paramiko.SFTPClient.from_transport(self.sock) + + def __enter__(self): + return self.sftp + + def __exit__(self, exc_type, exc_val, exc_tb): + self.sftp.close() + super().__exit__(exc_type, exc_val, exc_tb) diff --git a/sendmail.py b/sendmail.py new file mode 100644 index 0000000..d5dedba --- /dev/null +++ b/sendmail.py @@ -0,0 +1,61 @@ +import smtplib +from os.path import basename +from email.mime.text import MIMEText +from email.utils import COMMASPACE +from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication + + +class SendMail: + def __init__(self, auth_info): + if not isinstance(auth_info, tuple) or len(auth_info) != 4: + raise AssertionError(f'expected tuple (host, port, user, password) instead got {auth_info}') + + self.host = auth_info[0] + self.port = auth_info[1] + self.user = auth_info[2] + self.pssw = auth_info[3] + self.conn = None + self.msg = None + + def __enter__(self): + self.open_conn() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.conn.close() + + def open_conn(self): + if int(self.port) == 465: + self.conn = smtplib.SMTP_SSL(self.host, 465) + else: + self.conn = smtplib.SMTP(self.host, self.port) + self.conn.starttls() + self.conn.login(self.user, self.pssw) + + def content(self, fromaddr, toaddr, subject, msg): + if not isinstance(toaddr, list): + raise AssertionError('destination address should be a list []') + self.msg = MIMEMultipart() + self.msg['Subject'] = subject + self.msg['From'] = fromaddr + self.msg['To'] = COMMASPACE.join(toaddr) + self.msg.attach(MIMEText(msg, 'html')) + + def attach(self, files): + if not isinstance(files, list): + raise AssertionError('file(s) to attach should be a list []') + + for file in files: + filename = basename(file) + with open(file, "rb") as f: + fl = MIMEApplication(f.read(), Name=filename) + fl['Content-Disposition'] = f'attachment; filename="{filename}"' + self.msg.attach(fl) + + print(self.msg) + + def send(self): + if self.msg is None: + raise Exception('msg is not defined') + self.conn.send_message(self.msg)