直接上源代码:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- """ Created on : 2020/7/20 12:38 @Author : Miracle @blog : https://blog.csdn.net/weixin_39633383 @github: https://github.com/Mr-Miracle """ import os import datetime from mailbox import Message, Mailbox import pandas as pd import pypyodbc as mdb import wx import time from exchangelib import * from exchangelib.protocol import BaseProtocol, NoVerifyHTTPAdapter import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) BaseProtocol.HTTP_ADAPTER_CLS = NoVerifyHTTPAdapter # 读取MDB文件内容 def read_mdb(file): # 连接mdb文件 conn_str = (r'Driver={Microsoft Access Driver (*.mdb)};DBQ='+file+';' ) conn = mdb.win_connect_mdb(conn_str) # 创建游标 port_info = [] cur = conn.cursor() sql_str = "SELECT (SELECT FCODE FROM BaseInFo) AS PORTCODE,FDATE,(IIF ((SELECT COUNT(1) FROM FCWVCH WHERE FKMH IS NULL)=0, 0, 1)) AS RECORDS FROM GZB GROUP BY FDATE;" cur.execute(sql_str) gzb = cur.fetchall() for i in gzb: str1 = str(i[1])[0:10].replace('-', '') # 对日期的格式进行调整 port_info.append([i[0], str1, i[2]]) cur.close() conn.close() return port_info # noinspection PyBroadException def send_email(to, subject, body): """ 电子邮件发送 :param to: 收件人邮箱 :param subject: 邮件主题 :param body: 邮件正文 :return:""" cred = Credentials( username="###", # 创建、申明邮箱账户 password="###", ) config = Configuration( server="###", # 发件服务器地址 credentials=cred, auth_type=NTLM, ) account = Account( primary_smtp_address="###", config=config, autodiscover=False, access_type=DELEGATE, ) message = Message( account=account, # 发件人、申明的账户 subject=subject, # 邮件主题 body=HTMLBody(body), # 邮件的内容 to_recipients=[Mailbox(email_address=to), ], # 收件人 ) try: message.send_and_save() print('邮件发送成功!') except: print('邮件发送失败!') # 定义用户窗体的类 class AppFrame(wx.Frame): def __init__(self, parent, title): super(AppFrame, self).__init__(parent, title=title, size=(500, 480)) panel = wx.Panel(self) # 定义用户的文本输入框等组件 self.text0 = wx.StaticText(panel, -1, label="MDB根目录:") # id = -1,表示窗口id自动生成 self.text1 = wx.StaticText(panel, -1, label="最新文件个数:") self.text2 = wx.StaticText(panel, -1, label="托管行名称:") self.text3 = wx.StaticText(panel, -1, label="年金计划:") self.input0 = wx.TextCtrl(panel, -1, 'F:\TGH') self.input1 = wx.TextCtrl(panel, -1, '7') self.banks = ['所有托管', '交通银行', '浦发银行', '招商银行', '建设银行', '农业银行', '工商银行'] self.input2 = wx.ComboBox(panel, -1, choices=self.banks, value=self.banks[0]) self.input3 = wx.TextCtrl(panel, -1, '江西 山东 中央 湖南 安徽 新疆维吾尔 黑龙江') self.execute_button = wx.Button(panel, -1, label="执行") self.exit_button = wx.Button(panel, -1, label="退出") # 创建文本域 self.multiText = wx.TextCtrl(panel, -1, size=(200, 100), style=wx.TE_MULTILINE | wx.TE_READONLY) # 创建一个文本控件,并设置只读 self.multiText.SetInsertionPoint(0) # 设置插入点 # and a status bar self.CreateStatusBar() self.SetStatusText("Created by Miracle on 2020.10") # 定义各个组件的页面布局 v_box = wx.BoxSizer(wx.VERTICAL) # 实例化一个垂直盒子 h_box = wx.BoxSizer(wx.HORIZONTAL) # 实例化一个水平盒子 sizer = wx.FlexGridSizer(5, 2, 15, 30) # 设置5行2列,垂直间距15像素,水平间距20像素 sizer.AddMany([[self.text0, wx.ALIGN_LEFT], (self.input0, wx.EXPAND), (self.text1, wx.ALIGN_LEFT), (self.input1, wx.EXPAND), (self.text2, wx.ALIGN_LEFT), (self.input2, wx.EXPAND), (self.text3, wx.ALIGN_LEFT), (self.input3, wx.EXPAND), (self.execute_button, wx.Center), (self.exit_button, wx.Center), ]) h_box.Add(sizer, proportion=1, flag=wx.ALL | wx.EXPAND, border=5) # 将这组件插入水平盒子 v_box.Add(h_box, 0, wx.ALL | wx.CENTER, 5) # 在垂直盒子里添加水平盒子 v_box.Add(self.multiText, 1, wx.ALL | wx.EXPAND, 5) # 在垂直盒子里添加文本域 self.SetSizer(v_box) # 启用所有布局 self.Show() # 显示用户页面的窗体 self.Bind(wx.EVT_BUTTON, self.execute_event, self.execute_button) # 定义按钮的点击事件 self.Bind(wx.EVT_BUTTON, self.exit_event, self.exit_button) # 定义按钮的点击事件 def execute_event(self, event): # 年金计划组合信息 ports = [###] ## 此部分为计划组合信息,数据安全原因,未列示 # 获取用户的输入值 root = self.input0.GetValue() num = self.input1.GetValue() annuity_bank = self.input2.GetValue() annuity_plan = self.input3.GetValue() # 判断托管行根目录路径是否存在,否则不执行 if os.path.exists(root): check_plan0 = [] check_plan1 = [] for port in ports: for s in port: if annuity_plan: for i in annuity_plan.split(" "): if i in s: check_plan0.append(port) else: # 否则查询所有计划 check_plan0 = ports if annuity_bank == '所有托管': check_plan1 = ports else: # 否则查询所有计划 for j in annuity_bank.split(" "): if j in s: check_plan1.append(port) # 将2个查询条件的结果取交集,生成需要进行查询的计划组合list check_plan = [val for val in check_plan0 if val in check_plan1] result_data = [] for plan in check_plan: self.write_log_to_text("开始检查:"+','.join(plan)) data = self.mdb_check(root, plan, int(num)) # 取到MDB的检查结果列表 result_data.append(data) # 对list进行表格化的处理 msg_list = pd.DataFrame(result_data) msg_list.columns = ['托管行', '年金计划', '组合名称', '组合代码', '日期', '到达情况', '检查结果'] msg = msg_list.to_html(bold_rows=True) # 发送异常的提醒邮件 send_email('###', '周度数据MDB文件检查_异常数据' +self.get_now_time(), msg) self.write_log_to_text("异常提醒邮件发送完成!") else: wx.MessageBox("托管行MDB文件的目录不存在!\n"+ root, "输入有误", wx.OK | wx.YES_DEFAULT) event.Skip() # MDB文件检查的主要逻辑 def mdb_check(self, mdb_root, port, num): """ :param mdb_root: MDB文件的根目录-/TGH :param port: 需要检查的组合的基本信息的list :param num: 检查的最新文件的个数 :return: 检查结果list """ check_list = [] # 程序的返回值,异常数据的list file_load = os.path.join(mdb_root, port[1], port[5]) mdb_list = [] for root, sub, files in os.walk(file_load): for file in files: if file.endswith(".MDB"): mdb_file = os.path.join(root, file) mdb_list.append(mdb_file) mdb_list.sort(reverse=True) # 如果文件夹下有mdb文件 day_result = [] if len(mdb_list) > 0: for i in range(num): # 取前num个文件来进行数据读取 print(datetime.datetime.now(), '读取MDB文件:', mdb_list[i]) self.write_log_to_text('读取MDB文件:' + mdb_list[i]) result = read_mdb(mdb_list[i]) for result in result: day_result.append((result[1], result[2])) print(datetime.datetime.now(), '取到的数据:\n', day_result) else: print(datetime.datetime.now(), '取到的数据: 找不到此组合的MDB文件!!!') self.write_log_to_text('取到的数据: 找不到此组合的MDB文件!!!') for i in self.get_last_week(): if (i, 0) in day_result: # 当且仅当估值表中取到的日期和上周日期一致,估值表的科目字段不为空时显示正常 check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '已到达 ', '正常'] print(check_list) self.write_log_to_text(' '.join(check_list)) elif (i, 1) in day_result: check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '已到达 ', '异常'] print(check_list) self.write_log_to_text(' '.join(check_list)) else: check_list = [port[0], str(port[2])[0:2], port[4], port[5], i, '未到达 ', '异常'] print(check_list) self.write_log_to_text(' '.join(check_list)) return check_list def exit_event(self, event): """Close the frame, terminating the application.""" self.Close(True) event.Skip() # 获取当前时间 @staticmethod def get_now_time(): current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) return current_time # 日志动态打印 def write_log_to_text(self, log_msg): current_time = self.get_now_time() log_msg_in = "INFO:" + str(current_time) + " " + log_msg + "\n" # 换行 self.multiText.AppendText(log_msg_in) # 取上周7天的数 @staticmethod def get_last_week(): now = datetime.datetime.now() day_list = [] for i in range(7): time_stamp = now - datetime.timedelta(days=now.weekday() + (7 - i)) str_day = time_stamp.strftime("%Y%m%d") day_list.append(str_day) return day_list if __name__ == '__main__': app = wx.App() # 创建应用程序对象 frame = AppFrame(None, title="周度MDB文件数据检查_v1.0") frame.Center() # 设置主窗口居中对齐 app.MainLoop() 项目效果图: