进程相关 – Python量化投资

进程相关

一、基本概念

  • 定义:运行中的程序

  • 程序和进程之间的区别:

    • 程序只是一个文件

    • 进程是这个文件被CPU运行起来了

  • 进程是计算机中最小的资源分配单位

  • 在操作系统中的唯一标识符 :pid

  • 进程调度:

    • 操作系统调度进程的算法

      • 短作业优先算法

      • 先来先服务算法

      • 时间片轮转算法

      • 多级反馈算法

  • 并行与并发

    • 并行

      • 两个程序 两个CPU 每个程序分别占用一个CPU自己执行自己的

      • 看起来是同时执行,实际在每一个时间点上都在各自执行着

    • 并发

      • 两个程序 一个cpu 每个程序交替的在一个cpu上执行

      • 看起来在同时执行,但是实际上仍然是串行

  • 同步 / 异步 / 阻塞 / 非阻塞

    • 同步:一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列,要么成功都成功,失败都失败,两个任务的状态可以保持一致

    • 异步:不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了,至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

    • 阻塞:cpu不工作

    • 非阻塞:cpu工作

    • 同步阻塞

      • conn.recv

      • socket 阻塞的tcp协议的时候

    • 同步非阻塞

      • func() 没有io操作

      • socket 非阻塞的tcp协议的时候

      • 调用函数(这个函数内部不存在io操作)

    • 异步非阻塞

      • 把func扔到其他任务里去执行了

      • 我本身的任务和func任务各自执行各自的 没有io操作

    • 异步阻塞

  • 进程的三状态

    • 就绪(Ready)状态

      • 当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便立即执行,这时的进程状态称为就绪状态

    • 执行/运行(Running)

      • 状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态

    • 阻塞(Blocked)状态

      • 正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等

  • 子进程 / 父进程

    • 在父进程中创建子进程

      • 在pycharm中启动的所有py程序都是pycharm的子进程

    • os模块

      • os.getpid():获取子进程的pid

      • os.getppid():获取父进程的pid

二、进程模块

  • 开启进程


    # 面向函数方式:
    import os
    import time
    from multiprocessing import Process
    ​
    def eat():
        print('start eating',os.getpid())
        time.sleep(1)
        print('end eating',os.getpid())
    ​
    def sleep():
        print('start sleeping',os.getpid())
        time.sleep(1)
        print('end sleeping',os.getpid())
    ​
    if __name__ == '__main__':
        p1 = Process(target=eat)    # 创建一个即将要执行eat函数的进程对象
        p1.start()                  # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
        p2 = Process(target=sleep)  # 创建一个即将要执行sleep函数的进程对象
        p2.start()                  # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
        print('main :',os.getpid())
    # 面向对象方式:
    import os
    import time
    from multiprocessing import Process
    ​
    class MyProcecss1(Process):
        def __init__(self,x,y):
            self.x = x
            self.y = y
            super().__init__()
        def run(self):
            print(self.x,self.y,os.getpid())
            for i in range(5):
                print('in son2')
    ​
    if __name__ == '__main__':
        mp = MyProcecss1(1,2)
        mp.daemon = True
        mp.start()
        print(mp.is_alive())
        time.sleep(1)
        mp.terminate()


  • 操作系统创建进程的方式不同

    • windows操作系统执行开启进程的代码

      • 实际上新的子进程需要通过import父进程的代码来完成数据的导入工作

      • 所以有一些内容我们只希望在父进程中完成,就写在if name == ‘main‘:下面

    • ios linux操作系统创建进程 fork

  • 主进程和子进程之间的关系

    • 主进程的结束逻辑

      • 主进程的代码结束

      • 所有的子进程结束

      • 给子进程回收资源

      • 主进程结束



    import os
    import time
    from multiprocessing import Process
    def func():
        print('start',os.getpid())
        time.sleep(10)
        print('end',os.getpid())
    ​
    if __name__ == '__main__':
        p = Process(target=func)
        p.start()   # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
        print('main :',os.getpid())
        # 主进程没结束 :等待子进程结束
        # 主进程负责回收子进程的资源
        # 如果子进程执行结束,父进程没有回收资源,那么这个子进程会变成一个僵尸进程



    示例

  • 主进程怎么知道子进程结束了的呢?

    • 基于网络、文件

    • join方法 :阻塞,直到子进程结束就结束



    import time
    from multiprocessing import Process
    def send_mail():
        time.sleep(3)
        print('发送了一封邮件')
    if __name__ == '__main__':
        p = Process(target=send_mail)
        p.start()   # 异步 非阻塞
        # time.sleep(5)
        print('join start')
        p.join()    # 同步 阻塞 直到p对应的进程结束之后才结束阻塞
        print('5000封邮件已发送完毕')



    示例一

    # 开启10个进程,给公司的5000个人发邮件,发送完邮件之后,打印一个消息“5000封邮件已发送完毕”
    import time
    import random
    from multiprocessing import Process
    def send_mail(a):
        time.sleep(random.random())
        print('发送了一封邮件',a)
    ​
    if __name__ == '__main__':
        l = []
        for i in range(10):
            p = Process(target=send_mail,args=(i,))
            p.start()
            l.append(p)
        print(l)
        for p in l:p.join()
        # 阻塞 直到上面的十个进程都结束
        print('5000封邮件已发送完毕')



    示例二

  • 总结:

    • 如何创建一个进程对象

      • 对象和进程之间的关系

        • 进程对象和进程并没有直接的关系

        • 只是存储了一些和进程相关的内容

        • 此时此刻,操作系统还没有接到创建进程的指令

    • 如何开启一个进程

      • 通过p.start()开启了一个进程–这个方法相当于给了操作系统一条指令

      • start方法的非阻塞和异步的特点

        • 在执行开启进程这个方法的时候

        • 我们既不等待这个进程开启,也不等待操作系统给我们的响应

        • 这里只是负责通知操作系统去开启一个进程

        • 开启了一个子进程之后,主进程的代码和子进程的代码完全异步

    • 父进程和子进程

      • 为了回收子进程的资源,父进程会等待着所有的子进程结束之后才结束

    • 进程开启的过程中windows和 linux / ios之间的区别

      • windows 通过(模块导入)再一次执行父进程文件中的代码来获取父进程中的数据

        • 所以只要是不希望被子进程执行的代码,就写在if name == ‘main‘下

        • 因为在进行导入的时候父进程文件中的name != ‘main

      • linux/ios

        • 正常的写就可以,没有if name == ‘main‘这件事情了

      • 补充


        if __name__ == '__main__':
            # 控制当这个py文件被当作脚本直接执行的时候,就执行这里面的代码
            # 当这个py文件被当作模块导入的时候,就不执行这里面的代码
            print('hello hello')
        __name__ == '__main__'  # 执行的文件就是__name__所在的文件
        __name__ == '文件名'     # __name__所在的文件被导入执行的时候


    • join方法

      • 把一个进程的结束事件封装成一个join方法

      • 执行join方法的效果就是阻塞,直到这个子进程执行结束就结束阻塞


      # 在多个子进程中使用join
      p_l= []
      for i in range(10):
          p = Process(target=函数名,args=(参数1,参数2))
          p.start()
          p_l.append(p)
      for p in p_l:p.join()
      # 所有的子进程都结束之后要执行的代码写在这里


三、守护进程

  • 方式:有一个参数可以把一个子进程设置为一个守护进程

    • p.daemon = True


    import time
    from multiprocessing import Process
    ​
    def son1(a,b):
        while True:
            print('is alive')
            time.sleep(0.5)
    ​
    def son2():
        for i in range(5):
            print('in son2')
            time.sleep(1)
    ​
    if __name__ == '__main__':
        p = Process(target=son1,args=(1,2))
        p.daemon = True
        p.start()      # 把p子进程设置成了一个守护进程
        p2 = Process(target=son2)
        p2.start()
        time.sleep(2)


  • 结束:守护进程是随着主进程的代码结束而结束的

    • 所有的子进程都必须在主进程结束之前结束,由主进程来负责回收资源

  • 应用场景:

    • 生产者消费者模型

    • 和守护线程做对比

四、生产者消费者模型

  • 解耦:把写在一起的大的功能分开成多个小的功能处理

    • 优点:修改 复用 可读性

  • 组成:

    • producer:生产者,生产数据

    • consumer:消费者,处理数据

    • 生产者和消费者之间的容器就是队列

  • 什么是生产者消费者模型?

    • 把一个产生数据并且处理数据的过程解耦,让生产的数据的过程和处理数据的过程达到一个工作效率上的平衡,

    • 中间的容器,在多进程中我们使用队列或者可被join的队列,做到控制数据的量

      • 当数据过剩的时候,队列的大小会控制这生产者的行为

      • 当数据严重不足的时候,队列会控制消费者的行为

      • 并且我们还可以通过定期检查队列中元素的个数来调节生产者消费者的个数

    • 比如说:一个爬虫,或者一个web程序的server端

      • 爬虫:请求网页的平均时间是0.3s,处理网页代码的时候是0.003s

        • 100倍,每启动100个线程生产数据,启动一个线程来完成处理数据

      • web程序的server端:每秒钟有6w条请求,一个服务每s中只能处理2000条

        • 先写一个web程序,只负责一件事情,就是接收请求,然后把请求放到队列中

        • 再写很多个server端,从队列中获取请求,然后处理,然后返回结果

  • 生产者消费者模型



    import time
    import random
    from multiprocessing import Process,Queue
    ​
    def producer(q,name,food):
        for i in range(10):
            time.sleep(random.random())
            fd = '%s%s'%(food,i)
            q.put(fd)
            print('%s生产了一个%s'%(name,food))
    ​
    def consumer(q,name):
        while True:
            food = q.get()
            if not food:break
            time.sleep(random.randint(1,3))
            print('%s吃了%s'%(name,food))
    ​
    def cp(c_count,p_count):
        q = Queue(10)
        for i in range(c_count):
            Process(target=consumer, args=(q, 'alex')).start()
        p_l = []
        for i in range(p_count):
            p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
            p1.start()
            p_l.append(p1)
        for p in p_l:p.join()
        for i in range(c_count):
            q.put(None)
    if __name__ == '__main__':
        cp(2,3)



    生产者消费者模型

    import re
    import requests
    from multiprocessing import Process,Queue
    ​
    def producer(q,url):
        response = requests.get(url)
        q.put(response.text)
    ​
    def consumer(q):
        while True:
            s = q.get()
            if not s:break
            com = re.compile(
                '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>'
                '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)
            ret = com.finditer(s)
            for i in ret:
                print({
                    "id": i.group("id"),
                    "title": i.group("title"),
                    "rating_num": i.group("rating_num"),
                    "comment_num": i.group("comment_num")}
                )
    ​
    if __name__ == '__main__':
        count = 0
        q = Queue(3)
        p_l = []
        for i in range(10):
            url = 'https://movie.douban.com/top250?start=%s&filter='%count
            count += 25
            p = Process(target=producer,args=(q,url,))
            p.start()
            p_l.append(p)
        Process(target=consumer, args=(q,)).start()
        for p in p_l:p.join()
        q.put(None)



    生产者消费者模型实现的爬虫示例

  • joinablequeue



    import time
    import random
    from  multiprocessing import JoinableQueue,Process
    ​
    def producer(q,name,food):
        for i in range(10):
            time.sleep(random.random())
            fd = '%s%s'%(food,i)
            q.put(fd)
            print('%s生产了一个%s'%(name,food))
        q.join()
    ​
    def consumer(q,name):
        while True:
            food = q.get()
            time.sleep(random.random())
            print('%s吃了%s'%(name,food))
            q.task_done()
    ​
    if __name__ == '__main__':
        jq = JoinableQueue()
        p =Process(target=producer,args=(jq,'wusir','泔水'))
        p.start()
        c = Process(target=consumer,args=(jq,'alex'))
        c.daemon = True
        c.start()
        p.join()



    示例

五、Process类的一些方法

  • p.start():开启进程,异步非阻塞

  • p.terminate():结束进程, 异步非阻塞

  • p.is_alive():判断子进程是否还活着

  • p.join():等待子进程的结束,同步阻塞



    import time
    from multiprocessing import Process
    ​
    def son1():
        while True:
            print('is alive')
            time.sleep(0.5)
    ​
    if __name__ == '__main__':
        p = Process(target=son1)
        p.start()      # 异步 非阻塞
        print(p.is_alive())
        time.sleep(1)
        p.terminate()   # 异步的 非阻塞
        print(p.is_alive())   # 进程还活着 因为操作系统还没来得及关闭进程
        time.sleep(0.01)
        print(p.is_alive())   # 操作系统已经响应了我们要关闭进程的需求,再去检测的时候,得到的结果是进程已经结束了



    示例

  • 总结:

    • 开启进程的方式


      # 面向函数
      def 函数名:要在子进程中执行的代码
      p = Process(target= 函数名,args=(参数1,))
      ​
      # 面向对象
      class 类名(Process):
          def __init__(self,参数1,参数2):   # 如果子进程不需要参数可以不写
              self.a = 参数1
              self.b = 参数2
              super().__init__()
          def run(self):
              # 要在子进程中执行的代码
      p = 类名(参数1,参数2)


    • Process提供的操作进程的方法

      • p.start():开启进程,异步非阻塞

      • p.terminate():结束进程,异步非阻塞

      • p.join():同步阻塞

      • p.isalive():获取当前进程的状态

      • p.daemon = True:设置为守护进程,守护进程永远在主进程的代码结束之后自动结束

六、锁Lock

  • 如果在一个并发的场景下,涉及到某部分内容是需要修改一些所有进程共享数据资源,需要加锁来维护数据的安全

  • 在数据安全的基础上,才考虑效率问题

  • 同步存在的意义:数据的安全性

  • 方式:

    • 在主进程中实例化 lock = Lock()

    • 把这把锁传递给子进程

    • 在子进程中 对需要加锁的代码 进行 with lock:

      • with lock相当于lock.acquire()和lock.release()

  • 应用场景:(在进程中需要加锁的场景)

    • 共享的数据资源(文件、数据库)

    • 对资源进行修改、删除操作

  • 加锁之后能够保证数据的安全性 但是也降低了程序的执行效率



    import time
    import json
    from multiprocessing import Process,Lock
    ​
    def search_ticket(user):
        with open('ticket_count') as f:
            dic = json.load(f)
            print('%s查询结果  : %s张余票'%(user,dic['count']))
    ​
    def buy_ticket(user,lock):
        # with lock:
        # lock.acquire()   # 给这段代码加上一把锁
            time.sleep(0.02)
            with open('ticket_count') as f:
                dic = json.load(f)
            if dic['count'] > 0:
                print('%s买到票了'%(user))
                dic['count'] -= 1
            else:
                print('%s没买到票' % (user))
            time.sleep(0.02)
            with open('ticket_count','w') as f:
                json.dump(dic,f)
        # lock.release()   # 给这段代码解锁
    def task(user, lock):
        search_ticket(user)
        with lock:
            buy_ticket(user, lock)
    ​
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=task,args=('user%s'%i,lock))
            p.start()



    示例:抢票系统

七、进程之间通信

  • 进程之间的数据隔离


    from multiprocessing import Process
    n = 100
    def func():
        global n
        n -= 1if __name__ == '__main__':
        p_l = []
        for i in range(10):
            p = Process(target=func)
            p.start()
            p_l.append(p)
        for p in p_l:p.join()
        print(n)


  • 进程之间的通信 – IPC(inter process communication)


    from multiprocessing import Queue,Process
    # 先进先出
    def func(exp,q):
        ret = eval(exp)
        q.put({ret,2,3})
        q.put(ret*2)
        q.put(ret*4)
    ​
    if __name__ == '__main__':
        q = Queue()
        Process(target=func,args=('1+2+3',q)).start()
        print(q.get())
        print(q.get())
        print(q.get())
    import queue
    from multiprocessing import Queue
    q = Queue(5)
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)             # 当队列为满的时候再向队列中放数据 队列会阻塞
    print('5555555')
    try:
        q.put_nowait(6)  # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
    except queue.Full:
        pass
    print('6666666')
    ​
    print(q.get())
    print(q.get())
    print(q.get())   
    print(q.get())   
    print(q.get())              # 在队列为空的时候会发生阻塞
    try:
        print(q.get_nowait())   # 在队列为空的时候 直接报错
    except queue.Empty:pass


  • 内置IPC机制:

    • Queue(队列):进程之间数据安全

      • 天生就是数据安全的

      • 基于文件家族的socket pickle lock

    • pipe(管道):进程之间数据不安全

      • 不安全的

      • 基于文件家族的socket pickle

      • 队列 = 管道 + 锁

  • 第三方工具(软件)提供的IPC机制:

    • redis / memcache / kafka / rabbitmq

    • 优点:

      • 并发需求

      • 高可用,多个消息中间件

      • 断电保存数据

      • 解耦

八、进程之间的数据共享

  • mulprocessing中有一个manager类,封装了所有和进程(数据共享、数据传递)相关的数据类型

  • 但是对于字典、列表这一类的数据操作的时候会产生数据不安全

  • 需要加锁解决问题,并且需要尽量少的使用这种方式



    from multiprocessing import Manager,Process,Lock
    ​
    def func(dic,lock):
        with lock:
            dic['count'] -= 1if __name__ == '__main__':
        # m = Manager()
        with Manager() as m:
            l = Lock()
            dic = m.dict({'count':100})
            p_l = []
            for i in range(100):
                p = Process(target=func,args=(dic,l))
                p.start()
                p_l.append(p)
            for p in p_l:p.join()
            print(dic)



    示例

https://www.cnblogs.com/zengyi1995/p/11330891.html

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论