服务端程序:
import flask import sqlite3 import json app = flask.Flask(__name__) class StudentDB: def openDB(self): self.con = sqlite3.connect("students.db") print("连接成功") self.cursor = self.con.cursor() def closeDB(self): self.con.commit() self.con.close() def initTable(self): res = {} sql = """ create table students (No varchar(16) primary key, Name varchar(16), Sex varchar(8), Age int) """ try: self.cursor.execute(sql) res["msg"] = "OK" except Exception as err: res["msg"] = str(err) return res def insertRow(self, No, Name, Sex, Age): res = {} try: self.cursor.execute("insert into students (No, Name, Sex, Age) values(?,?,?,?)", (No, Name, Sex, Age)) res["msg"] = "OK" except Exception as err: res["msg"] = str(err)+"666" return res def deleteRow(self, No): res = {} try: self.cursor.execute("delete from students where No=?", (No,)) res["msg"] = "OK" except Exception as err: res["msg"] = str(err) return res def selectRows(self): res = {} try: data = [] self.cursor.execute("select * from students order by No") rows = self.cursor.fetchall() for row in rows: d = {} d["No"] = row[0] d["Name"] = row[1] d["Sex"] = row[2] d["Age"] = row[3] data.append(d) res["msg"] = "OK" res["data"] = data except Exception as err: res["msg"] = str(err) return res @app.route("/", methods=["GET", "POST"]) def process(): opt = flask.request.values.get("opt") if "opt" in flask.request.values else "" res = {} db = StudentDB() db.openDB() if opt == "init": res = db.initTable() elif opt == "insert": No = flask.request.values.get("No") if "No" in flask.request.values else "" Name = flask.request.values.get("Name") if "Name" in flask.request.values else "" Sex = flask.request.values.get("Sex") if "Sex" in flask.request.values else "" Age = flask.request.values.get("Age") if "Age" in flask.request.values else "" res = db.insertRow(No, Name, Sex, Age) elif opt == "delete": No = flask.request.values.get("No") if "No" in flask.request.values else "" res = db.deleteRow(No) else: res = db.selectRows() db.closeDB() return json.dumps(res) if __name__ == "__main__": app.run()客户端程序:
# coding=gbk import urllib.request import json class Student: def __init__(self, No, Name, Sex, Age): self.No = No self.Name = Name self.Sex = Sex self.Age = Age def show(self): print("%- 16s %-16s %-8s %-4d" % (self.No, self.Name, self.Sex, self.Age)) students = [] # 存储学生对象 url = "http://127.0.0.1:5000" def listStudents(): global students print("%- 16s %-16s %-8s %-4s" % ("No", "Name", "Sex", "Age")) for s in students: s.show() def insertStudent(s): global students i = 0 while (i < len(students) and s.No > students[i].No): i = i + 1 if (i < len(students) and s.No == students[i].No): print(s.No + "already exists") return False students.insert(i, s) return True def deleteRow(): global students No = input("No=") if (No != ""): for i in range(len(students)): if (students[i].No == No): st = "" try: st = "No=" + urllib.request.quote(No) st = st.encode() content = urllib.request.urlopen(url + "?opt=delete", st) st = content.readline() st = json.loads(st.decode()) st = st["msg"] except Exception as exp: st = str(exp) if (st == "OK"): del students[i] print("删除成功") else: print(st) break def insertRow(): No = input("No=") Name = input("Name=") while True: Sex = input("Sex=") if (Sex == "男" or Sex == "女"): break else: print("Sex is not valid") Age = input("Age=") if (Age == ""): Age = 0 else: Age = int(Age) if No != "" and Name != "": s = Student(No, Name, Sex, Age) for x in students: if (x.No == No): print(No + "already exists") return st = "" try: st = "No=" + urllib.request.quote(No) + "&Name=" + urllib.request.quote( Name) + "&Sex=" + urllib.request.quote(Sex) + "&Age=" + str(Age) st = st.encode() content = urllib.request.urlopen(url + "?opt=insert", st) st = content.read() st = json.loads(st.decode()) st = st["msg"] except Exception as exp: st = str(exp) if (st == "OK"): insertStudent(s) print("增加成功") else: print("增加失败") print(st) else: print("学号、姓名不能为空") def initialize(): st = "" try: content = urllib.request.urlopen(url + "?opt=init") st = content.read() st = json.loads(st.decode()) st = st["msg"] except Exception as exp: st = str(exp) if (st == "OK"): print("初始成功") else: print(st) return st def readStudents(): print("获取服务端数据") global students try: students.clear() content = urllib.request.urlopen(url) data = b"" while True: buf = content.read(1024) if (len(buf) > 0): data = data + buf else: break data = data.decode() data = json.loads(data) # 转化为字典类型数据 if data["msg"] == "OK": data = data["data"] for d in data: # each d is a dictionary s = Student(d["No"], d["Name"], d["Sex"], d["Age"]) students.append(s) print("添加成功") except Exception as exp: print(exp) try: readStudents() while True: print("") print("****学生名单***") print("0.初始化学生表") print("1.查看学生列表") print("2.增加学生记录") print("3.删除学生记录") print("4.退出这个程序") s = input("请选择(0,1,2,3,4):") if (s == "0"): initialize() elif (s == "1"): listStudents() elif (s == "2"): insertRow() elif (s == "3"): deleteRow() elif (s == "4"): break except Exception as exp: print(exp)