# STL

  • 本文挑选了一些实用但难以上手的标准库进行系统性学习,其中,标准库是基于Python3.7的。
  • 时至今日,python3.9的出现,引入了很多新特性,所以,keep learning吧少年!

# multiprocessing

# 进程

>>> import multiprocessing as ml
>>> import time

>>> def foo(a,b)
		time.sleep(10)
		print(a+b)

#新建一个进程,target为可调用对象,name为进程名字,args为位置参数元组
p = ml.Process(target = foo, name = 'lyj',args = (5,6))
#检查进程是否在运行
>>> p.is_alive()
	False
#进程的身份验证键,除非显式设定,否则由os.urandom()函数生成的32字符的字符串,为网络底层通讯安全提供保障
>>> p.authkey
	b'\x16\xe9\x19\xe0\xe3\xa3\\\xa9W\x9d\xa5\xf6\xd3\xb7\xf1?\xfd4\xe0\x85M|\t\xd3v\xc7\xf8J\xd3e\xed\xa4'
#指示进程是否为后台进程,当父进程终止时,后台进程自动终止,且后台进程不能创建自己的新进程,此值需在start()函数启动前设置
>>> p.daemon
	False
#进程名
>>> p.name
	'lyj'
#进程运行时的整数进程ID,不运行时没有
>>> p.pid
	22286
#启动子进程,进程名就是我们定义的name
>>> p.start()
#进程启动时运行的方法,并不启动一个新进程,就是在主线程中调用了一个普通函数而已。
>>> p.run()

# 工具函数

>>> import multiprocessing as ml
>>> ml.active_children()
	[]
>>> ml.cpu_count()
	12
>>> ml.current_process()
	<_MainProcess(MainProcess, started)>

# 进程间通信-队列方式

>>> import multiprocessing as ml

#创建共享的进程队列,参数为队列允许最大项数,若省略,则无大小限制。底层队列使用管道和锁实现
>>> q = ml.Queue(5)
#连接队列的后台进程,在q.close()后,等待所有队列项被消耗;默认情况下,此方法由不是q的原始创建者的所有进程调用
>>> q.join_thread()
#不会在进程退出时自动连接后台线程
>>> q.cancel_join_thread()
#关闭队列,防止队列加入更多数据,若q被垃圾回收,自动调用该方法
>>> q.close()
#检查队列满或空,及当前数量,但时间差导致结果不一定可靠
>>> q.full()
	False
>>> q.empty()
	True
>>> q.qsize()
	0
#返回q中的一个项,默认参数block = True,即若q为空,将阻塞至有项为止,若block = False, 将引发Queue.empty()异常;可选参数timeout为超时时间,用于阻塞模式,若一定时间内无项可用,则引发Queue.empty()异常
>>> q.get(block = True, timeout = 5)
>>> q.get_nowait()
#将item放入队列,若队列已满,则阻塞至有空间可用为止,block控制阻塞行为,timeout指定阻塞模式下可用空间的时间长短,超时后引发Queue.full()异常
>>> q.put(item = 'A',block = True, timeout = 3)
>>> q.put_nowait()
#可连接的共享进程队列,允许项的消费者通知生产者,项已经被成功处理,除了普通Queue对象的方法外,还有以下方法
>>> q = ml.JoinableQueue(8)
#消费者使用此方法发出信号,表示q.get()返回的项已被处理
>>> q.task_done()
#生产者使用此方法进行阻塞,直到队列所有项已经被处理,阻塞将持续到每个项均调用task_done()方法为止
>>> q.join()

# 案例——队列方式

  • 放入队列的每个项会被序列化,通过管道或套接字连接发送给进程
  • 一般来说,发送数量较少的大对象比发送大量小对象更好
import multiprocessing as ml

def consumer(input_q):
	while 1:
		item = input_q.get()
		if item is None:
			input_q.task_done()
			break
		#处理工作
		input_q.task_done()

def producer(sequence, output_q):
	for item in sequence:
		output_q.put(item)

#创建共享进程队列
q = ml.JoinableQueue()
#运行消费者进程
con_p = ml.Process(target = consumer, args = (q,))
con_p.daemon = True
con_p.start()
#生产多个项
sequence = [1,2,3,4]
producer(sequence, q)
#在队列上安置哨兵,发出完成信号,若存在n个消费者进程,则要放n个哨兵
q.put(None)
#等待项被处理,防止主程序被关闭后,消费被提前终止
q.join()

# 进程间通信-管道方式

#创建一条管道,返回元组(conn1,conn2),表示管道两端的connection对象,默认情况下,管道为双向,若将默认参数duplex改为False,则conn1只能接收,conn2只能发送,必须在创建和启用使用管道的Process对象前调用pipe()方法
>>> (server_p,client_p) = ml.Pipe()

#Pipe()方法返回的两个connection对象,有以下方法属性

#关闭管道,可选任意一边关闭,若connection对象被垃圾回收,则会自动调用该方法
>>> c.close()
#返回连接使用的整数文件描述符
>>> c.fileno()
#若连接上的数据可用,返回True,timeout是指定等待最长时限,默认为0,马上返回结果,若将其置为None,则无限期等待数据到达
>>> c.poll([timeout])
#接收c.send()返回的对象,若连接的另一端已经关闭,不存在任何数据,则引发EOFError
>>> c.recv()
#接收c.send_bytes()方法发送的一条完整的字节信息,maxlength指定要接受的最大字节数,若进入的消息超过这个值,则引发IOError,若另一侧已关闭,则引发EOFError
>>> c.recv_bytes([maxlength])
#接收一条完整的字节信息,并把它保存在buffer对象中,offset指定缓冲区中放置消息处的字节位移,返回值是收到的字节数,若消息长于可用缓冲区空间,则引发BufferTooShort异常
>>> c.recv_bytes_into(buffer[,offset])
#通过连接发送对象,obj是与序列化兼容的任意对象
>>> c.send(obj)
#通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,size是发送字节数,结果数据以单条消息的形式发出
>>> c.send_bytes(buffer[,offset[,size]])

# 案例——管道方式

  • 若生产者或消费者没有使用管道的某个端点,就应该关闭它,管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常
  • 不同进程的管道状态是独立的
import multiprocessing as ml

def consumer(pipe):
	output_p,input_p = pipe
	input_p.close()
	while True:
		try:
			item = output_p.recv()
		except EOFError:
			break
		#处理项目
		print(item)
	print('consumer done')
	
def producer(sequence,input_p):
	for item in sequence:
		input_p.send(item)

output_p, input_p = ml.Pipe()
#启动消费者进程
cons_p = ml.Process(target = consumer, args = ((output_p,input_p),))
cons_p.start()
#关闭生产者中的输出管道
output_p.close()
#生产项目
sequence = [1,2,3,4]
producer(sequence, input_p)
#关闭输入管道,表示完成,不关则消费者一直循环
input_p.close()
#等待使用者进程关闭
cons_p.join()

# 进程池

  • 可以把各种数据处理任务提交给进程池
  • 只有池工作进程执行足够弥补额外通信开销的工作,使用进程池才有意义
import multiprocessing as ml
#创建工作进程池,processes是要创建的进程数,省略则使用cpu_count()的值,initializer是每个工作进程启动时要执行的可调用对象,默认为None。initargs是要传递给initializer的参数元组
>>> p = ml.Pool([processes[,initializer[,initargs]]])
#在一个池的工作进程中(非并行)执行函数func(*args,**kwargs),然后返回结果;主进程会被阻塞
>>> p.apply(func[,args[,kwargs]])
# 在一个池的工作进程中并行执行函数,返回结果是AsyncResult类的实例,callback是可调用对象,接收输入参数,当func的结果变为可用时,立刻传递给callback,callback禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果,
>>> p.apply_async(func[,args[,kwargs[,callback]]]) 
#关闭进程池,防止进一步操作,若有挂起的操作,他们将在工作进程终止之前完成
>>> p.close()
#等待所有工作进程退出,只能在close()方法或terminate()方法之后调用
>>> p.join()
#立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作,若p被垃圾回收,自动调用此函数
>>> p.terminate()
#将可调用对象func应用给iterable中的所有项,然后以课表形式返回结果,通过将iterable划分为多块并将工作分派给工作进程,可以并行执行,chunksize指定每块中的项数,若数据量较大,可提高chunksize来提升性能
>>> p.map(func,iterable,[,chunksize])
#同map(),返回AsyncResult类的实例,callback是可调用对象 
>>> p.map_async(func,iterable,[,chunksize[,callback]])
# 同map(),但返回迭代器而非列表
>>> p.imap(func,iterable[,chunksize])
# 同map(),但结果顺序根据完成时间任意确定
>>> p.imap_unordered(func,iterable[,chunksize])
'''
apply_async()和map_async()返回的值都是AsyncResult类的实例,拥有以下方法:
'''
#返回结果
>>> a.get([timeout])
#若调用完成,返回True 
>>> a.ready()
#若调用完成且没有引发任何异常,返回True,若在结果就绪前调用此方法,引发AssertionError异常
>>> a.successful()
#等待结果变为可用 
>>> a.wait([timeout])
>>> 

# 案例—— apply(), apply_async(), map(), map_async()方法效率对比

import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + str(msg)

def test(num,sign):

    pool = multiprocessing.Pool()
    start = time.time()
    if sign == 'normal':
        for i in range(num):
            msg = "hello %d" %(i)
            pool.apply(func, (msg, ))
    elif sign == 'async':
        for i in range(num):
            msg = "hello %d" %(i)
            pool.apply_async(func, (msg, ))
    elif sign == 'map':
        results = pool.map(func,range(num))
    elif sign == 'map-async':
        results = pool.map_async(func,range(num))
    else:
        pass
    pool.close() 
    pool.join()     
    end = time.time()
    print (str(num) + ' ' + str(sign) + ' '+ str(end-start)+'s')

sample = [1000,10000,100000,1000000]

for i in sample:

    test(i,'normal')
    test(i,'async')
    test(i,'map')
    test(i,'map-async')
    print('\n')


运行时间对比

	num			normal		async		map			map-async
	
	1000		0.21s		0.11s		0.11s		0.11s
	10000		1.01s		0.41s		0.11s		0.11s
	100000		8.98s		4.15s		0.11s		0.11s
	1000000		90.5s		44.3s	0.32s		0.32s

# 共享数据与同步

  • 在共享内存创建ctypes对象
  • multiprocessing.Value(typecode,arg1,...,argn,lock)
  • 同value对象,但无锁
  • multiprocessing.RawValue(typecode,arg1,...,argn)
  • 在共享内存中创造ctypes数组,initializer为指定数组初始大小的整数或项序列
  • multiprocessing.Array(typecode,initializer,lock)
  • 同array对象,但无锁
  • multiprocessing.RawArray(typecode,initializer)
  • multiprocessing中还定义了一堆原语: Lock,RLock,Semaphore,BoundedSemaphore,Event,Condition

# 案例——共享队列,管道传输,共享内存的效率对比

import multiprocessing as ml
import time



def share_queue(transmit_time, transmit_size):
    def consumer(input_q):
        while 1:
            item = input_q.get()
            if item is None:
                input_q.task_done()
                break
            #处理工作
            input_q.task_done()

    def producer(sequence, output_q):
        for item in sequence:
            output_q.put(item)

    def test(transmit_time,transmit_size):
        start = time.time()
        #创建共享进程队列
        q = ml.JoinableQueue()
        #运行消费者进程
        con_p = ml.Process(target = consumer, args = (q,))
        con_p.daemon = True
        con_p.start()
        #生产多个项
        sequence = [float(x) for x in range(transmit_size)]
        producer(sequence * transmit_time, q)
        #在队列上安置哨兵,发出完成信号,若存在n个消费者进程,则要放n个哨兵
        q.put(None)
        #等待项被处理,防止主程序被关闭后,消费被提前终止
        q.join()
        end = time.time()
        print('share_queue',transmit_size,'time:',(end-start)/transmit_time,'s')


    test(transmit_time,transmit_size)

def share_memory(transmit_time,transmit_size):
    class channel(object):
        def __init__(self,maxsize):
            self.buffer = ml.RawArray('d',maxsize)
            self.buffer_len = ml.Value('i')
            self.empty = ml.Semaphore(1)
            self.full = ml.Semaphore(0)
        def send(self,values):
            self.empty.acquire()
            nitems = len(values)
            self.buffer_len = nitems
            self.buffer[:nitems] = values
            self.full.release()
        def recv(self):
            self.full.acquire()
            values = self.buffer[:self.buffer_len.value]
            self.empty.release()
            return values

    def consumer(count,ch):
        for i in range(count):
            values = ch.recv()

    def producer(count,values,ch):
        for i in range(count):
            ch.send(values)

    def test(transmit_time, transmit_size):
        start = time.time()
        ch = channel(transmit_size)
        p = ml.Process(target = consumer, args = (transmit_time, ch))
        p.start()
        values = [float(x) for x in range(transmit_size)]
        producer(transmit_time,values,ch)
        p.join()
        end = time.time()
        print('share_memory',transmit_size,'time:',(end-start)/transmit_time,'s')

    test(transmit_time,transmit_size)

def pipe(transmit_time,transmit_size):
    
    def consumer(pipe):
        output_p,input_p = pipe
        input_p.close()
        while True:
            try:
                item = output_p.recv()
            except EOFError:
                break
    
    def producer(sequence,input_p):
        for item in sequence:
            input_p.send(item)

    def test(transmit_time,transmit_size):
        start = time.time()
        output_p, input_p = ml.Pipe()
        #启动消费者进程
        cons_p = ml.Process(target = consumer, args = ((output_p,input_p),))
        cons_p.start()
        #关闭生产者中的输出管道
        output_p.close()
        #生产项目
        sequence = [float(x) for x in range(transmit_size)]
        producer(sequence * transmit_time, input_p)
        #关闭输入管道,表示完成,不关则消费者一直循环
        input_p.close()
        #等待使用者进程关闭
        cons_p.join()
        end = time.time()
        print('pipe',transmit_size,'time:',(end-start)/transmit_time,'s')
    
    test(transmit_time,transmit_size)

#传输次数
transmit_time = 10
#单次传输大小
sample = [10000,100000,1000000]

for i in sample:
    #共享队列方式
    share_queue(transmit_time,i)
    #共享内存方法
    share_memory(transmit_time,i)
    #管道传输方法
    pipe(transmit_time,i)
    print('\n')
运行结果对比:

传输大小	share_queue()		pipe()		share_memory()
10000	   0.309s			   0.076s			0.003s
100000	   3.247s			   0.756s			0.005s
1000000		过慢               7.612s			   0.043s

# 托管对象

  • 对更高级的python对象,multiprocessing没有自带的共享对象,但可以使他们运行在管理器的控制下,实现共享效果
  • 管理器是独立的子进程,存在真实的对象,以服务器形式运行
  • 其他进程采用代理商访问共享对象,这些代理作为管理器服务器的客户端运行
import multiprocessing as ml
#在进程中创建运行的管理器,返回值是SyncManager类型的实例
m = ml.Manager()
'''
SyncManager类型的实例m具有一系列方法
'''
#在服务器上创建共享的对象实例并返回可访问它的代理
m.Array(typecode,sequence)
m.BoundedSemaphore([value])
m.Condition([lock])
m.dict([args])
m.Event()
m.list([Sequence])
m.Lock()
m.Namespace()
m.Queue()
m.RLock()
m.Semaphore([value])
m.Value(typecode,value)
  • 对于更复杂类型的共享对象,如用户自定义的类,则必须创建自定义管理器对象,需要创建一个继承自BaseManager的类,并注册该数据类型
import multiprocessing as ml
from multiprocessing.managers import BaseManager

class A(obj):
    def __init__(self,value):
        self.x = value
    def __repr__(self):
        return 'A (%s)' % self.x
    def getx(self):
        return self.x
    def setx(self,value):
        self.x = value
    def __add__(self,value):
        self.x += value
        return self

class MyManager(BaseManager):
    pass

MyManager.register('A',A)

m = MyManager()
m.start()
#创建托管对象
a = m.A(37)

# 案例——托管对象

import multiprocessing as ml
import time

def watch(d,evt):
    while 1:
        evt.wait()
        print(d)
        evt.clear()

m = ml.Manager()
d = m.dict()
evt = m.Event()

#启动监视字典的进程
p = ml.Process(target=watch,args=(d,evt))
p.daemon = True
p.start()

#更新字典并通知监视者
d['foo'] = 42
evt.set()
time.sleep(5)

#终止进程和管理器
p.terminate()
m.shutdown() 

# 连接

  • 若要求程序不仅能在一个机器上运行,还能扩展到一个计算集群上,可以使用multiprocessing.connection模块
  • 在mac上要先允许python.app能接入网络
from multiprocessing.connection import Listener
from multiprocessing.connection import Client
import multiprocessing as ml

def listen():
    serv = Listener(('',8000),authkey=bytes('password',encoding = 'utf-8'))
    while True:
        conn = serv.accept()
        while True:
            try:
                x,y = conn.recv()
            except EOFError:
                break
            result = x + y
            conn.send(result)
        conn.close()

def client():
    conn = Client(('localhost',8000),authkey=bytes('password',encoding = 'utf-8'))
    conn.send((3,4))
    r = conn.recv()
    print(r)
    conn.send(('Hello','World'))
    r = conn.recv()
    print(r)
    conn.close()

server_p = ml.Process(target=listen)
server_p.daemon = True
server_p.start()
client()

# 注意事项及原则

  • 确保进程之间传递的所有数据能够序列化

  • 避免使用共享数据,尽量使用消息传递和队列

  • 在必须运行在单独进程的函数内部,不要使用全局变量,而应当显式传递参数

  • 显式关闭进程

  • 让事情变简单

# re

import re

#根据string的顺序找pattern,成功找到一个匹配pattern的,就是m.group()的值,若没找到pattern,返回的匹配对象m为None,关于m的一切方法都不存在
#当m不是None值时,m.groups()的值是一个列表,有多少个分组括号,列表就对应有多少项,且顺序是基于左括号的
pattern = '32(ab(?P<cd>c(?:d(e))))|f'
string = '23a12abcdeff'

# re.compile()函数

#编译一个正则对象,以后用pat直接调用match(),search(),findall(),finditer()函数,可省略pattern参数,flags参数re.I表示忽略大小写,re.M表示将^和$应用到整个字符串的开始和结尾的每一行,不仅是开头和结尾
pat = re.compile(string,flags = re.I)

# re.match()函数

#从字符串一开始匹配,若一开始就不对,则返回None
mm = re.match(pattern,string)

# re.search()函数

m = re.search(pattern,string)

# re.findall()函数

#从字符串中找到所有匹配pattern的,返回一个列表,列表每一项是一个元组,元组元素相当于每个m.groups()中的元素
lf = re.findall(pattern,string)
print('lf',lf)

# re.finditer()函数

#返回一个迭代器,迭代的是匹配对象m
iterator = re.finditer(pattern,string)
for m in iterator:
    pass

# match对象的用法

#以下属性,仅当匹配成功,m不为None时才存在,且下列属性都是基于匹配回来的东西去讨论,如此次匹配中的是第一个f,就不包含分组,所以分组为[None,None,None]

#根据string的顺序找pattern,成功找到最先能匹配pattern的,就是m.group()的值
print('m.group()',m.group())
#m.groups()的值是一个列表,有多少个分组括号,列表就对应有多少项,且顺序是基于左括号的,
print('m.groups()',m.groups())
#第2个分组在string中的起始位置,若省略参数,则选用m.group()在string中的起始位置
print('m.start(2)',m.start(2))
#第3个分组在string中的结束位置,若省略参数,则选用m.group()在string中的结束位置
print('m.end(3)',m.end(3))
#第2个分组在string的起末位置的元组,若省略参数,则选用m.group()在string中的起末位置的元组
print('m.span()',m.span())
#传递给search()或match()函数的pos值
print('m.pos', m.pos)
#传递给search()或match()函数的endpos值
print('m.endpos', m.endpos)
#匹配对象所对应的正则对象
print('m.re',m.re)
#匹配对象所对应的输入字符串
print('m.string', m.string)
#返回标注了名称的分组的字典,key为组名,value为分组的字符串
print('m.groupdict()',m.groupdict())

# re.split()函数

#根据pattern出现的位置拆分string,返回字符串列表,包括pattern中任何分组匹配的文本
ls = re.split(pattern,string)
# 例子
>>> print line
abc aa;bb,cc | dd(xx).xxx 12.12'	xxxx
# 按空格切
>>> re.split(r' ',line)
['abc', 'aa;bb,cc', '|', 'dd(xx).xxx', "12.12'\txxxx"]
# 加将空格放可选框内[]内
>>> re.split(r'[ ]',line)
['abc', 'aa;bb,cc', '|', 'dd(xx).xxx', "12.12'\txxxx"]
# 按所有空白字符来切割:\s([\t\n\r\f\v])\S(任意非空白字符[^\t\n\r\f\v]
>>> re.split(r'[\s]',line)
['abc', 'aa;bb,cc', '|', 'dd(xx).xxx', "12.12'", 'xxxx']
# 多字符匹配
>>> re.split(r'[;,]',line)
['abc aa', 'bb', "cc | dd(xx).xxx 12.12'\txxxx"]
>>> re.split(r'[;,\s]',line)
['abc', 'aa', 'bb', 'cc', '|', 'dd(xx).xxx', "12.12'", 'xxxx']
# 使用括号捕获分组的适合,默认保留分割符
>>> re.split('([;])',line)
['abc aa', ';', "bb,cc | dd(xx).xxx 12.12'\txxxx"]
# 去掉分隔符,加?:
>>> re.split(r'(?:;)',line)
['abc aa', "bb,cc | dd(xx).xxx 12.12'\txxxx"]

# re.sub()用法

def deal(matched):
    intStr=matched.group("number")
    intValue=int(intStr)
    addValue=intValue+111
    addValueStr=str(addValue)
    return addValueStr

inputStr="hello 123 world 456"
replacedStr=re.sub("hello 123 world 456",deal,inputStr)
print(replacedStr)
最后更新: 5/31/2022, 6:43:40 AM