2019-06-01第六次课

Python publisher01 43℃

上期作业回顾

1、使用字典表示用户对象,例如:[{‘name’:’zhangsan’,’pwd’:’123456’,hasLogin:false}],将字典放入list中来表示数据库,请完成用户 注册、登录功能用户信息。
(1)注册
(2)登录
(3)查看
(4) 退出登录
(要求以面向对象的形式实现)

class User:
    """
    用于建立用户,以及验证用户信息的合法性
    """
    def __init__(self,username,pwd):
        self.username = username
        self.pwd = pwd
class DB:
    instance = None
    def __new__(cls, *args, **kwargs):
        """
        创建单例
        :param args:
        :param kwargs:
        :return:
        """
        if DB.instance is None:
            DB.instance = super().__new__(cls)
        return DB.instance
    def __init__(self):
        self.__pool = []
    def set_user(self,user):
        """
        储存数据
        :param user:存储用户的字典
        :return:
        """
        self.__pool.append(user)
    def get_user_by_username(self,username):
        """
        根据用户名查找用户
        :param username: 要查找用户的用户名
        :return: 查到则返回用户,否则返回空值
        """
        for user in self.__pool:
            if username == user.username:
                return user
            return None
    def get_all(self):
        return self.__pool
class View:
    def show_info(self,info):
        opt = input(info)
        return opt
    def read_name_and_pwd(self):
        check_name = input("请输入账户名(3-20位)")
        check_pwd = input("请输入密码:(6-20位)")
        if len(check_name) >= 20 or len(check_name) <= 3 or len(check_pwd) >= 20 or len(check_pwd) <=3 :
            print("账户或密码格式不正确")
            return None
        user =  User(check_name,check_pwd)
        return user
    def show_user_list(self,user):
        print("姓名:{},密码:{}".format(user.username,user.pwd))
class Service:
    def __init__(self):
        self.db = DB()
    def register(self,user):
        exist_user = self.db.get_user_by_username(user.username)
        if exist_user is not None:
            return print("该用户已存在")
        self.db.set_user(user)
        return print("注册成功")
    def login(self,user):
        exist_user = self.db.get_user_by_username(user.username)
        if exist_user is None:
            print("未能查找到该用户")
            return None
        if user.pwd != exist_user.pwd:
            print("密码错误")
            return None
        print("登陆成功")
        return user
    def show_all_user(self):
        return self.db.get_all()
class App:
    def __init__(self):
        self.v = View()
        self.s = Service()
        self.cur_user = None
    def start(self):
        while True:
            view_opt = self.v.show_info("请输入接下来要进行的操作:\n (1)注册 \n (2)登陆 \n (3)退出 \n")
            if view_opt == '1':
                register_user = self.v.read_name_and_pwd()
                if register_user is None:
                    continue
                self.s.register(register_user)
            elif view_opt == '2':
                check_user = self.v.read_name_and_pwd()
                if check_user is None:
                    continue
                login_user = self.s.login(check_user)
                if login_user is not None:
                    self.cur_user = login_user
                    self.show_home()
            elif view_opt == '3':
                exit()
            else:
                print("无效操作")
    def show_home(self):
        while True:
            show_opt = self.v.show_info("请输入接下来要进行的操作:\n (1)查看当前用户信息 \n (2)查看所有用户信息 \n (3)退出登陆 \n")
            if show_opt == '1':
                self.v.show_user_list(self.cur_user)
            elif show_opt == '2':
                users_list = self.s.show_all_user()
                for users in users_list:
                    self.v.show_user_list(users)
            elif show_opt == '3':
                return None
            else:
                print("无效操作,自动退出")
app = App()
app.start()

模拟一个检测器

import time
import random
class Server:
    def __init__(self,name):
        self.times = random.randint(3,10)
        self.name = name
    def connect(self):
        if self.times > 0:
            self.times -= 1
            return True
        return False
class Monitor:
    def __init__(self):
        self.servers = []
        self.admins = []
    def add_admin(self,admin):
        self.admins.append(admin)
        return self
    def add_server(self,server):
        self.servers.append(server)
        return self
    def test(self):
        for server in self.servers:
            if not server.connect():
                for admin in self.admins:
                    SendLetter.send_sms(admin,server.name+"链接失常")
class SendLetter:
    @staticmethod
    def send_sms(admin,msg):
        print("向",admin,"发送",msg)
monitor = Monitor()
monitor.add_admin('zhangsan').add_admin('lisi')
monitor.add_server(Server('1号服务器')).add_server(Server('2号服务器')).add_server(Server('3号服务器'))
while True:
    monitor.test()
    time.sleep(3)

观察者模式

设计模式:一系列通用的解决方案

class Observer:
    """
    观察者
    """
    def __init__(self,name):
        self.name = name
    def update(self,msg):
        print("向",self.name,"发送了:",msg)
class Subject:
    def __init__(self):
        self.observers = []
    def add_observer(self,observer):
        self.observers.append(observer)
    def remove_observer(self,observer):
        self.observers.remove(observer)
    def notify(self,msg):
        for observer in self.observers:
            observer.update(msg)
zhangsan = Observer("zhangsan")
lisi = Observer("lisi")
magazine = Subject()
magazine.add_observer(zhangsan)
magazine.add_observer(lisi)
magazine.notify("更新了")

Observer作为观察者,可随时接到订阅了的Subject的通知信息
Subject可以添加、删除订阅的观察者,来实现对只关注了自己的部分进行通知

异常处理

异常发生后,不做处理,正常情况下程序会直接结束。
同时,程序会抛出一个异常。

  • 可以通过以下方式来抓住异常,以让程序不结束:
student = {"name":"lisi"}
try:
    #可能发生异常的部分
    print(student["key"])
except:
    #异常处理代码
    print("ssss")

except:处理所有异常
except Type:处理指定类型的异常
except Type as data;处理指定的异常同时,将异常信息放在data里
except (Type1 ,Type2 ,Type3):同时处理多个异常,也可以在随后加上as语句获取data。

  • 可通过以下方式抓取多个异常:
try:
    pass
except 异常类型1:
    pass
except 异常类型2:
    pass
except (异常类型4,异常类型5,异常类型6):
    pass
except Exception as e:
    print(e)

异常类型的抓取排序:从特殊到一般,从具体异常到最宽泛的异常。
所有的异常类型均继承自Exception。

try:
except:
发生异常执行
else:
没有发生异常时执行
finally:
有没有异常都会执行
(可以只有try…finally语句)
(finally 一般用来释放资源)

try:
    print("zs")
finally:
    print("sss")
try:
    num = int(input("请输入整数"))
    result = 8 / num
    print(result)
except ValueError:
    print("输入值不为整数")
except ZeroDivisionError:
    print("除数不可为零")
except Exception as e:
    print("其他异常",e)
else:
    print("无异常")
finally:
    print("执行完毕")
  • 异常的传递

异常的传递,异常发生后,会传递给方法(函数)的调用者A,如果A有捕捉到该异常,则按捕捉机制处理。如果A没有捕捉到该异常,则会层层向上传递。
最终会传递到python解析器。此处就简单的终止程序。

def fun1():
    a = 0;
    print(10 / a)
def fun2():
    # try:
        fun1()
    # except ZeroDivisionError:
    #     print("除数为0")
fun2()
  • 手动抛出异常:raise
def fun3():
    print("hello")
    raise ZeroDivisionError
    #raise ZeroDivisionError()
fun3()
  • 自定义异常
class ParamInvalidException(Exception):
#自定义的异常内容
    def __init__(self,code,msg):
        #异常的代号
        self.code = code
        #异常的信息
        self.msg = msg
def login(username,pwd):
    if len(username) < 6:
        raise ParamInvalidException("用户名长度在1-6位之间")#“ ”在返回异常的时输出信息
    if username != "zhangsan":
        raise ParamInvalidException("用户名错误")
    print("登陆成功")
try:
    login("zhangsan","12345")
    print("后继操作")
#e存有ParamInvalidException中的信息
except ParamInvalidException as e:
    print(e.code,e.msg)
  • 程序发生了异常了怎么办?

程序发生了异常了怎么办?

  1. 先记录异常
  2. 是否可控的?
    不可控则管不了了,抛出异常。
    可控可以处理。
try:
    pass
except:
    #先记录异常
    #能处理的异常,处理下
    #不能处理的异常直接抛出

字符问题

在计算机中,字符通过一系列提前设定好的二进制数字编码表示。

  • 常见的编码表:
  1. ASCII(American Standard Code for Information Interchange:美国信息交换标准代码)
  2. 用于表示汉字的编码:GB2312,GBK
  3. 可以通用的编码:unicode 编码 (16位的等长编码)
  4. 现在常用的编码: UTF-8 (变长编码)
#encode获取了”zhangsan”的GBK格式的编码
bytes = '张三'.encode("GBK")
print(bytes)
print(type(bytes))
#encode获取了”zhangsan”的utf-8格式的编码
byte_utf8 = '张三'.encode("utf-8")
#将”zhangsan”的GBK格式的编码以GBK格式转换成字符
str = bytes.decode("GBK")
print(str)
#将”zhangsan”的utf-8格式的编码以utf-8格式转换成字符
str = byte_utf8.decode("GBK")
print(str)

文件操作

文件格式分为文本文件和二进制文件。
文本文件本质上存储时,也是二进制,但可以用文本编辑器查看。
二进制文件,无法通过文本编辑器查看。

  • 写入信息
try:
    #打开demo文件,根据‘w’方式,没有则会新建一个demo出来,编码格式为"utf-8"
    f = open("D://demo.txt","w",encoding="utf-8")
    #向文件里写入,如果原有内容,则直接覆盖
    f.write("neuedu\n")
    f.write("NEUQ")
    f.write("xxxxxxzzzzzz")
finally:
    f.close()
#另一种打开文件的格式,优于第一种方法的地方是不需要写close
with open("D://demo.txt","w",encoding="utf-8") as ff:
    ff.write("neuedu\n")
    ff.write("NEUQ")
    ff.write("xxxxxx")
  • 读取信息
with open("D://demo.txt",encoding="utf-8")as f:
    #读取法一
    content = f.read()
    print(content)
    #读取法二
    line = f.readline()
    while line:
        print(line)
        line = f.readline()
    #读取法三
    for line in f.readlines():
        print(line)
    #当文件数据过大时,不建议使用法三,因为会同时将大量文本数据写入内存,占用内存。

在read文件时,一次开文件是无法连续的两次read的,是因为读的时候,是有指针在指向当前字符来进行读取的,但可以通过一些方法来解决。


with open("D://demo.txt",encoding = 'utf-8')as f:
    #读取当前指针位置,最开始位于文件开头
    print(f.tell())
    #读取文本
    print(f.read())
    #接下来进行偏移:
    #def seek(self, offset: int, whence: int = 0)
    #offset:表示偏移量 
    #whence:0:从文件的开头偏移1:从当前指针所在位置开始偏移2:从文件的末尾开始偏移
    f.seek(0,0)
    #接下来表示当前位置的指针已偏移至文件开头
    print(f.tell())
    #再次读取文本
    print(f.read())
  • 文件的打开模式:
  1. r:(默认模式)以只读方式打开文件,文件的指针会放在文件的开头,这是默认模式,如果文件不存在,抛出异常。
  2. w:以只写方式打开文件,如果文件存在会被覆盖。如果文件不存在,会创建文件。
  3. a:以追加方式打开文件,如果文件已存在,则指针会放在文件的结尾。如果文件不存在,则创建新文件。
  4. r+:以读写形式打开文件,文件的指针放在文件的开头,如果文件不存在,则抛出异常。
  5. w+:以读写的方式打开文件,如果文件存在,会被覆盖,如果不存在,则创建文件。
  6. a+:以读写方式打开文件,如果存在,则指针在文件的末尾,如果不存在,则创建文件。

随堂习题

向文件写入10万行的1到10的随机数,一行一个数,随后查找文件中出现频率最高的是个数。

import random
from collections import Counter
with open('demo.txt','w',encoding='utf-8')as f:
    for _ in range(100000):
        f.write(str(random.randint(1,100))+"\n")
num_list = []
with open('demo.txt',encoding='utf-8')as ff:
    line = ff.readline()
    while line:
        num_list.append(line)
        line = ff.readline()
nums =  Counter(num_list)
num = nums.most_common(10)
print(num)

转载请注明:Python量化投资 » 2019-06-01第六次课

喜欢 (0)or分享 (0)