Python之线程

Python publisher01 23℃ 0评论

>撩个概念 多任务

是么是多任务呢?
我现在听着音乐,同时浏览着网页,在文档中写着笔记.是的,这就是多任务;对于计算机来说,就是同时执行多段的代码;

如今计算机都是多核CPU了,单核CPU也可以执行多任务,我们都知道计算机的代码都是顺序执行的,那么,单核的CPU是如何实现多任务的呢?

答案就是在极短的时间内轮流切换各个任务,CPU的运算太快了,给我们的感觉就是在同时进行一样;

这里引进两个概念:并行和并发

并发: 例如在单核CPU中执行多个任务,这个就是并发(执行的任务数量大于CPU核数)

并行: 两个任务在多核CPU机器中执行,两个任务分别在不同的CPU中执行,这个就是并行(任务数小于CPU核数)

例如: 学校运动会中,5000米决赛都是十几个人,起跑时,人数多于赛道,那么这种情况就是并发;100决赛都是武林高手,待遇不同,每人一个跑道,这个就是并发.

>接下来 线程

在python3中,线程由threading模块提供,来一窥threading面貌

threading模块下常用的方法或者属性

方法 说明
current_thread() 返回当前线程
active_count() 返回当前活跃的线程数量,主线程+子线程
get_ident() 返回当前线程
enumerate() 返回当前活动的Thread列表
main_thread() 返回主Thread对象
settrace(func) 为所有线程设置一个 trace 函数
setprofile(func) 为所有线程设置一个 profile 函数
stack_size([size]) 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size
TIMEOUT_MAX Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大超时时间

threading模块包含的类

说明
Thread 基本的线程类
Lock 互斥锁
RLock 可重入锁,使单一进程再次获得已持有的锁(递归锁)
Condition 条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值
Semaphore 信号锁。为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞
Event 事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活
Timer 一种计时器
Barrier Python3.2新增的“阻碍”类,必须达到指定数量的线程后才可以继续执行

threading模块中Thread类的方法和属性

方法与属性 说明
start() 启动线程,等待CPU调度
run() 线程被cpu调度后自动执行的方法
getName()、setName()和name 用于获取和设置线程的名称
setDaemon() 设置为后台线程或前台线程(默认是False,前台线程)。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后,程序才停止
ident 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None
is_alive() 判断线程是否是激活的(alive)。从调用start()方法启动线程,到run()方法执行完毕或遇到未处理异常而中断这段时间内,线程是激活的
isDaemon()方法和daemon属性 是否为守护线程
join([timeout]) 调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束

– 单线程(001_single_thread.py)

import time
def single_thread():
    print("这个单线程执行:%s"%time.time())
    time.sleep(1)
def main():
    for _ in range(5):
        single_thread()
if __name__ == "__main__":
    main()

– 多线程(002_multi_thread.py)

import time
import threading
def single_thread():
    print("这个单线程执行:%s"%time.time())
    time.sleep(1)
def main():
    for _ in range(5):
       t = threading.Thread(target= single_thread)
       t.start()
if __name__ == "__main__":
    main()

执行结果:

test_code$ python3 001_single_thread.py 
这个单线程执行:1563089407.2469919
这个单线程执行:1563089408.248269
这个单线程执行:1563089409.2495542
这个单线程执行:1563089410.2508354
这个单线程执行:1563089411.2516193
test_code$ python3 002_mulit_thread.py 
这个多线程执行:1563089416.1928792
这个多线程执行:1563089416.1931264
这个多线程执行:1563089416.1932962
这个多线程执行:1563089416.1934686
这个多线程执行:1563089416.1936424

我们刚看了单线程执行和两个线程的执行效果,让我回想起GIL里讲到的,在I/0密集操作程序中可以使用多线程,这里的耗时操作使用了time.sleep(1)来模仿了.接下来让我们学习更多的关于threading模块的知识…

使用多线程并发操作,花费时间要短很多
当调用start(),才会真正的创建线程,并且开始执行

>主线程会等待所有的子线程结束后才结束

#coding=utf-8
import threading
from time import sleep,ctime
def sing():
    for i in range(3):
        print("正在唱歌...%d"%i)
        sleep(1)
def dance():
    for i in range(3):
        print("正在跳舞...%d"%i)
        sleep(1)
if __name__ == '__main__':
    print('---开始---:%s'%ctime())
    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)
    t1.start()
    t2.start()
    #sleep(5) # 屏蔽此行代码,试试看,程序是否会立马结束?
    print('---结束---:%s'%ctime())

>查看线程数量

import threading
from time import sleep,ctime
def sing():
    for i in range(2):
        sleep(1)
    print("sing_ending...")
def dance():
    for i in range(3):
        sleep(1)
if __name__ == "__main__":
    print("----开始----:%s"%ctime())
    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)
    t1.start()
    t2.start()
    while True:
        length = len(threading.enumerate())
        print("当前运行的线程数量为:%d"%length)
        if length <= 1:
            break
        sleep(1)

运行结果:

----开始----:Sun Jul 14 17:11:24 2019
当前运行的线程数量为:3
当前运行的线程数量为:3
当前运行的线程数量为:3
sing_ending...
当前运行的线程数量为:2
当前运行的线程数量为:1
test_code$ 

>创建线程的第二种方式

  • 使用的都是在threading.Thread()实例化时,给里面传入对应的参数
    threading.Thread(self, group=None, target=None, name=None, args=(),kwargs=None, *, daemon=None)

    • group: 预留参数
    • target: 一个可调用对象,在线程执行后使用
    • name: 线程的名字,默认为”Thread-N”
    • args,kwargs: 传递的参数列表和关键字参数
  • 还有一种创建线程的方式是继承threading.Thread类,重写run方法,我们来尝试第二种方式

import threading
import time
class MyThread(threading.Thread):
    def run(self):
        print("I`m thread %s" % self.name)
if __name__ == '__main__':
    t = MyThread()
    t.start()

执行结果:

I`m thread Thread-1

总结

  • 创建自己的线程类时,需要重写run方法,创建自己的线程实例后,通过调用Thread类的start方法可以启动该线程,交给python虚拟机进行调度,当该线程获得执行机会时,就会调用run方法执行线程,run()方法执行完,线程结束.

>线程的执行顺序

import threading
import time
class MyThread(threading.Thread):
    def run(self):
        for i in range(2):
            time.sleep(0.5)
            print("I`m %s %s" % (self.name, i))
def main():
    for i in range(5):
        t = MyThread()
        t.start()
if __name__ == '__main__':
    main()

执行结果:

I`m Thread-2 0
I`m Thread-3 0
I`m Thread-1 0
I`m Thread-4 0
I`m Thread-5 0
I`m Thread-2 1
I`m Thread-3 1
I`m Thread-4 1
I`m Thread-1 1
I`m Thread-5 1

总结

  • 多线程的执行结果是不确定的,当执行到sleep时,线程将会被阻塞(Blocked),等到sleep结束后,线程进入就绪状态(Runnable)状态,等待调度;而线程的调度是随机选择一个线程执行.

>多线程,共享全局变量

import threading
import time
num = 100
def count_test1():
    global num
    for i in range(10):
        num += 1
    print("count_test1-->num:%s"%num)
def count_test2():
    global num
    for i in range(5):
        num += 1
    print("count_test2-->num:%s"%num)
print("最原始的num:%s"%num)
t1 = threading.Thread(target=count_test1)
t1.start()
time.sleep(2) #让t1执行完成
t2 = threading.Thread(target=count_test2)
t2.start()

执行结果:

最原始的num:100
count_test1-->num:110
count_test2-->num:115

>使用列表来测试

import threading
import time
def count_test1(num_list):
    num_list.append(10000)
    print("count_test1-->num:%s"%num_list)
def count_test2(num_list):
    print("count_test2-->num:%s"%num_list)
num_list = [11, 22, 33, 44]
t1 = threading.Thread(target=count_test1, args=(num_list,))
t1.start()
time.sleep(1) #让t1执行完成
t2 = threading.Thread(target=count_test2, args=(num_list,))
t2.start()

执行结果:

count_test1-->num:[11, 22, 33, 44, 10000]
count_test2-->num:[11, 22, 33, 44, 10000]

总结

  • 在一个进程内线程共享全局变量,多线程方便共享数据
  • 缺点就是,线程对全局变量的随意修改会造成线程之间对全局变量的混乱(即线程非安全)

>多线程的资源竞争问题

两个线程(t1,t2)对同一个全局变量(global_num)进行修改,正常情况下,t1对global_num加10,然后t2对global_num加10,最终global_num为20.

But,在多线程中,存在这种情况,t1获取到global_num,此时系统将t1设置为”sleep”状态,这时t2获取到global_num,对global_num进行加1,完成后,系统将t2设置为”sleep”状态,将t1设置为”running”状态,此时t1拿到的global_num是t2修改前的值,这时进行修改就会和t2修改重复.

测试1(循环数为100)

import threading
import time
num = 0
def count_test1():
    global num
    for i in range(100):
        num += 1
    print("count_test1-->num:%s"%num)
def count_test2():
    global num
    for i in range(100):
        num += 1
    print("count_test2-->num:%s"%num)
t1 = threading.Thread(target=count_test1)
t2 = threading.Thread(target=count_test2)
t1.start()
t2.start()
t1.join()
t2.join()
print("最终的num:%s"%num)

测试结果:

count_test1-->num:100
count_test2-->num:200
最终的num:200

测试2(循环数为100000)

import threading
import time
num = 0
def count_test1():
    global num
    for i in range(100000):
        num += 1
    print("count_test1-->num:%s"%num)
def count_test2():
    global num
    for i in range(100000):
        num += 1
    print("count_test2-->num:%s"%num)
t1 = threading.Thread(target=count_test1)
t2 = threading.Thread(target=count_test2)
t1.start()
t2.start()
t1.join()
t2.join()
print("最终的num:%s"%num)

测试结果:

count_test1-->num:100000
count_test2-->num:153462
最终的num:153462

总结

  • 如果多个线程对同一个全局变量操作,会出现资源问题,从而导致数据不准确

>解决资源竞争问题使用互斥锁

  • threading模块中定义了Lock类,可以实现锁
    • 创建锁对象: mutex = threading.Lock()
    • 上锁: mutex.acquire()
    • 释放锁: mutex.release()
  • 注意:
    • 如果这个锁之前是没有上锁的,那么acquire就不会阻塞
    • 如果调用acquire之前这个锁是被其它线程上了锁的,那么acquire就会阻塞,知道这个锁被释放

使用互斥锁(循环数为100000)

import threading
import time
num = 0
def count_test1():
    global num
    for i in range(100000):
        mutex.acquire()
        num += 1
        mutex.release()
    print("count_test1-->num:%s"%num)
def count_test2():
    global num
    for i in range(100000):
        mutex.acquire()
        num += 1
        mutex.release()
    print("count_test2-->num:%s"%num)
mutex = threading.Lock()
t1 = threading.Thread(target=count_test1)
t2 = threading.Thread(target=count_test2)
t1.start()
t2.start()
t1.join()
t2.join()
print("最终的num:%s"%num)

执行结果:

count_test1-->num:188038
count_test2-->num:200000
最终的num:200000

上锁释放锁的过程

当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态

每次只有一个线程可以获得锁,如果此时另一个线程试图获得这个锁,该线程就会变为”blocked”状态,称为”阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入”unlocked”状态。

线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态

总结

  • 锁的好处
    • 确保了一段代码只能由一个线程从前到尾完整执行
  • 锁的坏处
    • 阻止了多线程的并发执行,包含锁的代码段只能是单线程执行,大大降低了效率
    • 可能会存在多个锁,在获取锁和释放锁时容易造成死锁

>死锁问题

情侣吵架后,都在等待对方道歉,如果双方一直等待对方先开口,那么结果就悲剧了…

情侣吵架和死锁有什么联系呢?如果两个线程共享全局变量,两个线程分别占有一定的资源并且咋等待对方的资源,就会造成死锁问题

#coding=utf-8
import threading
import time
class MyThread1(threading.Thread):
    def run(self):
        # 对mutexA上锁
        mutexA.acquire()
        # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
        print(self.name+'----do1---up----')
        time.sleep(1)
        # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
        mutexB.acquire()
        print(self.name+'----do1---down----')
        mutexB.release()
        # 对mutexA解锁
        mutexA.release()
class MyThread2(threading.Thread):
    def run(self):
        # 对mutexB上锁
        mutexB.acquire()
        # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
        print(self.name+'----do2---up----')
        time.sleep(1)
        # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
        mutexA.acquire()
        print(self.name+'----do2---down----')
        mutexA.release()
        # 对mutexB解锁
        mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

执行结果

程序会卡住: 按唱、跳、Rap键+c退出

总结

  • 如何避免死锁
    • 程序设计上尽量避免
    • 添加超时时间等

你或许想:《去原作者写文章的地方

转载请注明:Python量化投资 » Python之线程

喜欢 (0)or分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址