直接上源代码:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- """ Created on 2020/8/6 21:57 @author: Miracle @blog : https://blog.csdn.net/weixin_39633383 @github: https://github.com/Mr-Miracle """ from tkinter import * from tkinter import messagebox, filedialog from tkinter.ttk import Progressbar import pandas as pd import time LOG_LINE_NUM = 0 columns_1603 = ['投资组合代码', '投资组合名称', '报告日期', '流动性资产-银行活期存款市值(元)', '流动性资产-银行活期存款占净资产比例(%)', '流动性资产-中央银行票据市值(元)', '流动性资产-中央银行票据占净资产比例(%)', '流动性资产-一年期以内(含一年)定期存款或协议存款市值(元)', '流动性资产-一年期以内(含一年)定期存款或协议存款占净资产比例(%)', '流动性资产-买入返售金融资产市值(元)', '流动性资产-买入返售金融资产占净资产比例(%)', '流动性资产-货币市场基金市值(元)', '流动性资产-货币市场基金占净资产比例(%)', '流动性资产-货币型养老金产品市值(元)', '流动性资产-货币型养老金产品占净资产比例(%)', '流动性资产-清算备付金市值(元)', '流动性资产-清算备付金占净资产比例(%)', '流动性资产-应收证券清算款市值(元)', '流动性资产-应收证券清算款占净资产比例(%)', '流动性资产-其他流动性资产市值(元)', '流动性资产-其他流动性资产占净资产比例(%)', '流动性资产占净资产比例 (%)', '固定收益类资产-一年期以上定期存款/协议存款市值(元)', '固定收益类资产一年期以上定期存款/协议存款占净资产比例(%)', '固定收益类资产-国债市值(元)', '固定收益类资产-国债占净资产比例(%)', '固定收益类资产-金融债市值(元)', '固定收益类资产-金融债占净资产比例(%)', '固定收益类资产-企业(公 司)债市值(元)', '固定收益类资产-企业(公 司)债占净资产比例(%)', '固定收益类资产-短期融资券市值(元)', '固定收益类资产-短期融资券占净资产比例(%)', '固定收益类资产-中期票据市值(元)', '固定收益类资产-中期票据占净资产比例(%)', '固定收益类资产-可转换债券(含分离交易可转换债)市值(元)', '固定收益类资产-可转换债券(含分离交易可转换债)占净资产比例(%)', '固定收益类资产-债券基金市值(元)', '固定收益类资产-债券基金占净资产比例(%)', '固定收益类资产-商业银行理财产品市值(元)', '固定收益类资产-商业银行理财产品占净资产比例 (%) ', '固定收益类资产-信托产品市值(元) ', '固定收益类资产-信托产品占净资产比例(%) ', '固定收益类资产-特定资产管理计划市值(元) ', '固定收益类资产-特定资产管理计划占净资产比例 (%) ', '固定收益类资产-基础设施债权投资计划市值(元)', '固定收益类资产-基础设施债权投资计划占净资产 比例(%) ', '固定收益类资产-固定收益型养老金产品市值(元)', '固定收益类资产-固定收益型养老金产品占净资产比例(%)', '固定收益类资产-混合型养老金产品市值(元)', '固定收益类资产-混合型养老金产品占净资产比例 (%)', '固定收益类资产-其他固定收益类资产市值(元)', '固定收益类资产-其他固定收益类资产占净资产比 例(%)', '固定收益类资产占净资产比例(%)', '权益类资产-股票市值', '权益类资产-股票占净资产比例', '权益类资产-股票基金、混合基金市值', '权益类资产-股票基金、混 合基金占净资产比例', '权益类资产-权证(非直接投资)市值(元)', '权益类资产-权证(非直接 投资)占净资产比例(%)', '权益类资产-权益工具市值(元) ', '权益类资产-权益工具占净资产比例(%)', '权益类资产-股票型养老金产品市值(元) ', '权益类资产-股票型养老金产品占净资产比例(%) ', '权益类资产-其他权益类资产市值(元)', '权益类资产-其他权益类资产占净资产比例(%)', '权益类资产占净资产比例(%)', '其他资产市值(元)', '其他资产占净资产比例(%)', '合计市值(元)', '合计占净资产比例(%)'] class DataCleaning: def __init__(self, master): self.path = StringVar() self.master = master self.Label_0 = Label(self.master, text="1603接口数据文件选择") self.Entry_0 = Entry(self.master, width=45, textvariable=self.path) self.Button_0 = Button(self.master, text="···", command=self.select_path) self.check_var1 = IntVar() self.Checkbutton_1 = Checkbutton(self.master, text="其他市值", variable=self.check_var1, onvalue=1, offvalue=0, height=5, width=20) self.check_var2 = IntVar() self.Checkbutton_2 = Checkbutton(self.master, text="比例字段", variable=self.check_var2, onvalue=1, offvalue=0, height=5, width=20) self.Button_1 = Button(self.master, text=" 开 始 ", command=self.extract_data) self.Button_2 = Button(self.master, text=" 退 出 ", command=self.master.quit) self.log_label = Label(self.master, text=" ") self.log_data_Text = Text(self.master, width=63, height=13) # 日志框 self.Progressbar = Progressbar(self.master, value=0, mode="determinate", orient=HORIZONTAL) # 设置窗口的各种属性,布局 def set_init_window(self): x = root.winfo_screenwidth() y = root.winfo_screenheight() w = 450 h = 400 self.master.title("数据清洗--1603接口数据核对v2.0") # 窗口名 self.master.geometry("%dx%d+%d+%d" % (w, h, (x - h) / 2.5, (y - h) / 3)) # 窗口大小及窗口弹出时的默认展示位置 self.master.resizable(0, 0) # 设置窗口宽高固定 self.Label_0.grid(row=0, column=0, columnspan=2) self.Entry_0.grid(row=1, column=0, columnspan=2) self.Button_0.grid(row=1, column=2) self.Checkbutton_1.grid(row=2, column=0) self.Checkbutton_2.grid(row=2, column=1) self.Button_1.grid(row=3, column=0) self.Button_2.grid(row=3, column=1) self.log_label.grid(row=4, column=0) self.log_data_Text.grid(row=5, column=0, columnspan=3) self.Progressbar.grid(row=6, column=0, columnspan=3) # 选择文件路径 def select_path(self): # 只显示所有的excel文件 path_ = filedialog.askopenfilenames(filetypes=[('excel', ('.xls', '.xlsx'))]) self.path.set(path_) # 获取当前时间 @staticmethod def get_now_time(): current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) return current_time # 日志动态打印 def write_log_to_text(self, log_msg): global LOG_LINE_NUM current_time = self.get_now_time() log_msg_in = "INFO:" + str(current_time) + " " + str(log_msg) + "\n" # 换行 if LOG_LINE_NUM <= 7: self.log_data_Text.insert(END, log_msg_in) LOG_LINE_NUM = LOG_LINE_NUM + 1 else: self.log_data_Text.delete(1.0, 2.0) self.log_data_Text.insert(END, log_msg_in) # 主要的程序主体逻辑 def extract_data(self): # 获取用户的输入数据 file_list = self.Entry_0.get().split() # 获取需要核对的文件list c1 = self.check_var1.get() c2 = self.check_var2.get() print(c1, c2) if not file_list: messagebox.showwarning('警告', '请先选择需要核对的文件!') else: self.write_log_to_text("正在提取数据...") self.Progressbar["value"] = 5 self.master.update() # 1、提取需要核对的数据,并将其贴到同一个excel文件里 list_0 = [] # A list_1 = [] # B list_hold = [] # 持仓数据 for data_file in file_list: # 如果A的数是作业平台导出的,则执行此段 if '资产分布' in data_file and '数据中心取数' not in pd.ExcelFile(data_file).sheet_names: # 读取A版1603数据文件 self.write_log_to_text("提取数据中心数据(作业平台导出)") self.write_log_to_text("读取文件:" + data_file) df00 = pd.read_excel(data_file) df00['币种'] = df00['报告起始日期'].astype(str) # 将报告日期列复制到币种列,并且格式化为文本格式 df00.rename(columns={'币种': '报告日期'}, inplace=True) # 修改列名 df00['报告日期'].replace('-', '', inplace=True) # 将日期字段格式化 df0 = df00.iloc[:, 14:84] # 取第14列至73列的数据集 list_0.append(df0) # 如果A的数是人工导出的,则执行此段 if ('CJDC' in data_file and '持仓' not in data_file) or '数据中心' in data_file or ('cjdc' in data_file and '持仓' not in data_file): # 读A版本的1603数据文件 self.write_log_to_text("开始提取A数据(手工导出版)") self.write_log_to_text("读取文件:" + data_file) df00 = pd.read_excel(data_file) df00['CALC_PHASE'] = df00['CALC_PHASE'].str.replace('-', '') # 将日期字段格式化文本格式 df0 = df00.iloc[:, 3:73] # 取第4列至73列的数据集 list_0.append(df0) if 'HT' in data_file or 'B' in data_file or 'ht' in data_file: # 读取B版本的1603数据文件 self.write_log_to_text("开始提取B版数据...(数据中心提供)") self.write_log_to_text("读取文件:" + data_file) df11 = pd.read_excel(data_file) df11['CALC_PHASE'] = df11['CALC_PHASE'].str.replace('-', '') # 将日期字段格式化文本格式 df1 = df11.iloc[:, 3:73] # 取第4列至73列的数据集 list_1.append(df1) # 如果数据是数据中心提供的系统导出的,取数则执行这段 if '资产分布' in data_file and '数据中心取数' in pd.ExcelFile(data_file).sheet_names and '衡泰取数' not in pd.ExcelFile( data_file).sheet_names: self.write_log_to_text("开始提取 A、B版数据...(均由系统导出版)") self.write_log_to_text("读取文件:" + data_file) # 读取A版1603数据文件 df0 = pd.read_excel(data_file, sheet_name='A取数', ).iloc[:, 2:72] # 取第3列至73列的数据集 list_0.append(df0) # 读取B版本的1603数据文件 df1 = pd.read_excel(data_file, sheet_name='B取数').iloc[:, 2:72] # 取第3列至73列的数据集 list_1.append(df1) if '持仓' in data_file: self.write_log_to_text("读取持仓数据:\n" + data_file) df_hold = pd.read_excel(data_file) # 持仓数据 list_hold.append(df_hold) self.write_log_to_text("正在进行数据合并...") self.Progressbar["value"] = 15 self.master.update() # 2 得到合并的结果 df_center = pd.concat(list_0) # print('合并后的数据: A-->', df_center) df_xquant = pd.concat(list_1) # print('合并后的数据: B-->', df_xquant) if list_hold: df_hold = pd.concat(list_hold) else: df_hold = pd.DataFrame() df_xquant.columns = columns_1603 # 修改结果的列名 # 组合代码的_STB后缀去掉 df_xquant['投资组合代码'] = df_xquant['投资组合代码'].str.replace('_STB', '') df_center.columns = columns_1603 # 修改结果的列名 # 组合代码的_STB后缀去掉 df_center['投资组合代码'] = df_center['投资组合代码'].str.replace('_STB', '') self.Progressbar["value"] = 20 self.master.update() # 3 对结果进行比较,将2个结果集进行内连接合并 result_excel = r'1603数据核对结果_' + self.get_now_time()[0:10] + '.xlsx' write = pd.ExcelWriter(result_excel) # 创建结果excel,将合并数据存入excel df_xquant.to_excel(write, sheet_name='0-B', index=False, freeze_panes=(1, 3)) df_center.to_excel(write, sheet_name='1-A', index=False, freeze_panes=(1, 3)) del df_center['投资组合名称'] df_hb = df_xquant.merge(df_center, how='inner', on=['投资组合代码', '报告日期'], ) # 将2个结果集进行内连接合并 check_list = [df_hb.iloc[:, 0:3]] # 取合并后数据的前三列 self.Progressbar["value"] = 40 self.master.update() for column in columns_1603: # 将同字段的_X和_Y进行相减 if column not in df_hb.columns.tolist(): x = column + '_x' y = column + '_y' s = round(df_hb[x] - df_hb[y], 6) # 数据项目相减,B减去A的数,并且将结果保留6位小数 check_list.append(s) df_check = pd.concat(check_list, axis=1) # 检查结果进行拼接 df_check.columns = columns_1603 # 修改结果的列名 print('异常数据:\n', df_check) df_check.to_excel(write, sheet_name='2-异常数据', index=False, freeze_panes=(1, 3)) self.write_log_to_text("写入异常数据...") self.Progressbar["value"] = 60 self.master.update() # 4 将检查结果进行整理 list_3 = [] # 检查结果 i = 0 if c1 == 0: if_list = ['投资组合代码', '投资组合名称', '报告日期', '其他资产市值(元)', '合计市值(元)'] else: if_list = ['投资组合代码', '投资组合名称', '报告日期'] if c2 == 0: if_str = '比例' else: if_str = '报告日期' # print(if_list, if_str) for row, series in df_check.iteritems(): # 按列遍历异常数据表 # 筛选去除其他资产、合计市值、金额比例字段,并且去除无异常的字段 if (row not in if_list) and (if_str not in row) and (series.sum() != 0): i += 1 j = 0 str_abnormal = [] for m, n in series.items(): # 进行异常字段个数统计 if n != 0: j += 1 # 将计划层的异常数据过滤,只取组合层的异常 if '计划' not in df_check.iloc[[m], [1]].values[0][0]: # 组合代码,组合名称,业务日期,异常金额市值 abnormal = [df_check.iloc[[m], [0]].values[0][0], df_check.iloc[[m], [1]].values[0][0], df_check.iloc[[m], [2]].values[0][0], n] if abnormal[3] > 0: reason = B比A多这笔' else: reason = 'B比A少这笔' # 如果持仓表不为空 if not df_hold.empty: data = df_hold[df_hold['F_HLM'] == n] if data.empty: str_sub = abnormal[1], abnormal[2], '差异金额:' + str(abnormal[3]), '证券代码:', '证券名称:', reason str_abnormal.append(str_sub) else: x = '证券代码:' + data.iloc[[0], [4]].values[0][0] # 证券代码 y = '证券名称:' + data.iloc[[0], [5]].values[0][0] # 证券名陈 str_sub = abnormal[1], abnormal[2], '差异金额:' + str(abnormal[3]), x, y, reason str_abnormal.append(str_sub) else: str_sub = abnormal[1], abnormal[2], '差异金额:' + str(abnormal[3]), '证券代码:', '证券名称:', reason str_abnormal.append(str_sub) # 将异常原因的描述list进行文本转换并换行处理 abnormal = '' for s in str_abnormal: abnormal = abnormal + ','.join(s) + '\n' # 进行异常描述拼接 result = [i, row, len(series), j, abnormal] list_3.append(result) df_result = pd.DataFrame(list_3) self.write_log_to_text("写入异常原因...") self.Progressbar["value"] = 80 self.master.update() # 判断异常结果是否为空 if df_result.empty: df_result = pd.DataFrame(columns=['序号', '字段名称', '检验笔数', '异常数', '差异原因\n(组合名称、业务日期、差异金额、证券代码、证券名称)']) self.write_log_to_text("检查完毕,没有异常!!") self.Progressbar["value"] = 90 self.master.update() else: df_result.columns = ['序号', '字段名称', '检验笔数', '异常数', '差异原因\n(组合名称、业务日期、差异金额、证券代码、证券名称)'] self.write_log_to_text("检查完毕!") print('检验结果为:\n', df_result) df_result.to_excel(write, sheet_name='3-检查结果', index=False, freeze_panes=(1, 0)) self.write_log_to_text("正在保存文件!") self.master.update() write.save() # 文件保存 write.close() print('核对完成,文件保存为:\n', result_excel) self.write_log_to_text('文件保存为:\n'+ result_excel) self.Progressbar["value"] = 100 self.master.update() if __name__ == '__main__': root = Tk() app = DataCleaning(root) # 设置根窗口默认属性 app.set_init_window() root.mainloop() # 窗口进入事件循环,保持窗口运行,否则界面不展示程序效果图如下: