python 之 并发编程(守护进程、互斥锁、IPC通信机制)

释放双眼,带上耳机,听听看~!

9.5 守护进程

主进程创建守护进程

  其一:守护进程会在主进程代码执行结束后就立即终止

  其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

  • p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

from multiprocessing import Process
​
def task(name):
    print(\'%s is running\' % name)
    time.sleep(3)
​
if __name__ == \'__main__\':
    obj = Process(target=task, args=(\'egon\',))
    obj.daemon=True         #设置obj为守护进程,并且父进程代码执行结束,obj即终止运行
    obj.start()             # 发送信号给操作系统
    print(\'\')

9.6 互斥锁

互斥锁用来将并发编程串行,牺牲了效率而保证了数据安全

强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()

互斥锁和 join的区别:

二者的原理都是一样,都是将并发变成串行,从而保证有序

区别一:join是按照人为指定的顺序执行,而互斥锁是进程平等地竞争,谁先抢到谁执行,一个人拿到锁,其余人都等待

from multiprocessing import Process,Lock
import random
​
mutex=Lock()
def task1(lock):
    lock.acquire() 
    print(\'task1:名字是egon\')
    print(\'task1:性别是male\')
    lock.release()
    
def task2(lock):
    lock.acquire()
    print(\'task2:名字是alex\')
    print(\'task2:性别是male\')
    lock.release()
    
def task3(lock):
    lock.acquire()
    print(\'task3:名字是lxx\')
    print(\'task3:性别是female\')
    lock.release()
    
if __name__ == \'__main__\':
    p1=Process(target=task1,args=(mutex,))
    p2=Process(target=task2,args=(mutex,))
    p3=Process(target=task3,args=(mutex,))
    p1.start()# p1.start()
    p2.start()# p1.join()
    p3.start()# p2.start()
             # p2.join()
             # p3.start()
             # p3.join()

9.61 模拟抢票

互斥锁和 join的区别二:

互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行

import json
import time
import random
import os
from multiprocessing import Process,Lock
​
mutex=Lock()
def search():
    time.sleep(random.randint(1,3))
    with open(\'db.json\',\'r\',encoding=\'utf-8\') as f:
        dic=json.load(f)
        print(\'%s 剩余票数:%s\' %(os.getpid(),dic[\'count\']))
​
def get():
    with open(\'db.json\',\'r\',encoding=\'utf-8\') as f:
        dic=json.load(f)
    if dic[\'count\'] > 0:
        dic[\'count\']-=1
        with open(\'db.json\',\'w\',encoding=\'utf-8\') as f:
            json.dump(dic,f)
        print(\'%s 购票成功\' %os.getpid())
​
def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
​
if __name__ == \'__main__\':
    for i in range(10):
        p=Process(target=task,args=(mutex,))
        p.start()
        # p.join()

9.7 IPC通信机制

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

进程之间通信必须找到一种介质,该介质必须满足: 1、是所有进程共享的 2、必须是内存空间 附加:帮我们自动处理好锁的问题

from multiprocessing import Process,Manager,Lock
import time
​
mutex=Lock()
def task(dic,lock):
    lock.acquire()
    temp=dic[\'num\']
    time.sleep(0.1)
    dic[\'num\']=temp-1
    lock.release()
​
if __name__ == \'__main__\':
    m=Manager()
    dic=m.dict({\'num\':10})
    l=[]
    for i in range(10):
        p=Process(target=task,args=(dic,mutex))
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(dic)      #{\'num\': 0}

9.71创建队列的类Queue

底层就是以管道和锁定的方式实现:

队列 (管道+锁) :1、共享的空间 2、是内存空间 3、自动帮我们处理好锁定问题

  • Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

  • maxsize是队列中允许最大项数,省略则无大小限制。

from multiprocessing import Queue
q=Queue(3)      #maxsize=3
q.put(\'first\')
q.put({\'second\':None})
q.put(\'\')
​
# q.put(4) #阻塞
print(q.get())  #first
print(q.get())  #{\'second\': None}
print(q.get())  #

强调: 1、队列用来存成进程之间沟通的消息,数据量不应该过大 2、maxsize的值超过的内存限制就变得毫无意义

了解:block=True(默认值)、timeout

q=Queue(1)
q.put(\'first\',block=False)      #q.put方法用以插入数据到队列中
q.put(\'fourth\',block=False/True)#queue.Full/一直等
​
q.put(\'first\',block=True)
q.put(\'fourth\',block=True,timeout=3)#等3秒后报错queue.Full
​
q.get(block=False)#q.get方法可以从队列读取并且删除一个元素
q.get(block=False)#queue.Empty
​
q.get(block=True)
q.get(block=True,timeout=2)#等2秒后报错queue.Empty

给TA打赏
共{{data.count}}人
人已打赏
随笔日记

idea万能快捷键(alt enter),你不知道的17个实用技巧!!!

2020-11-9 5:58:10

随笔日记

大厂程序员的一天是如何度过的?

2020-11-9 5:58:12

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索