多进程、多线程、分布式进程
多进程
Unix/Linux操作系统提供了一个fork()
系统调用,fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0
,而父进程返回子进程的ID。
父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
multiprocessing
模块就是跨平台版本的多进程模块。
from multiprocessing import Process
import os
#子进程要运行的函数
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('run parent process %s'%(os.getpid))
p=Process(target=run_proc,args=('test',))
print('child process start...')
p.start()
p.join()
print('child process end..')
join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
Pool
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
print('task name is %s, pid is %s'%(name,os.getpid()))
start=time.time()
time.sleep(random.random()*4)
end=time.time()
print('task %s runtime is %s'%(name,end-start))
if __name__=='__main__':
print('parent process pid is %s'%os.getpid())
p=Pool(4)
for i in range(5):
p.apply_async(long_time_task,args=(i,))
p.close()
p.join()
print('all process complite')
进程间通信
Python的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。
使用消息队列Queue
例子:
from multiprocessing import Process,Queue
import os,time,random
def write(q):
for value in ['a','b','c']:
q.put(value)
time.sleep(random.random())
def read(q):
while True:
value=q.get(True)
print('read process get value:%s'%value)
if __name__=='__main__':
q=Queue()
pw=Process(target=write,args=(q,))
pr=Process(target=read,args=(q,))
pw.start()
pr.start()
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
多线程
Python的标准库提供了两个模块:_thread
和threading
绝大多数情况下,我们只需要使用threading
这个高级模块。
import time, threading
# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享。这就是所说的进程占有资源,线程是系统调度资源的基本单位。
所以引出线程安全问题:什么是线程安全?
Lock
多线程执行时,因为交替对同一个数据读写,从而导致数据不一致问题,就要给一片段的执行过程加锁。该线程因为获得了锁,因此其他线程不能同时执行change_it()
,只能等待
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。多线程的并发在Python中就是一个美丽的梦。
ThreadLocal
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
如果用一个全局dict
存放所有的Student
对象,然后以thread
自身作为key
获得线程对应的Student
对象如何?
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局变量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不传入std,而是根据当前线程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函数都可以查找出当前线程的std变量:
std = global_dict[threading.current_thread()]
...
它最大的优点是消除了std
对象在每层函数中的传递问题,但是,每个函数获取std
的代码有点丑。
ThreadLocal
应运而生,不用查找dict
,ThreadLocal
帮你自动做这件事:
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
操作系统在切换进程或者线程时,它需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
所以,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好。
异步IO
考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作
如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器
分布式进程
Python的multiprocessing
模块不但支持多进程,其中managers
子模块还支持把多进程分布到多台机器上。由于managers
模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
我们先看服务进程,服务进程负责启动Queue
,把Queue
注册到网络上,然后往Queue
里面写入任务:
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
然后,在另一台机器上启动任务进程(本机上启动也可以):
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上
Queue对象存储在哪?注意到task_worker.py
中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py
进程中:
┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐
│task_master.py │ │ │task_worker.py │
│ │ │ │
│ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │
│ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │
│ │ │ │ │ │ │
│ │ │ │ │ │
│ ▼ │ │ │ │ │
│ ┌─────────────────────────────────┐ │ │ │ │
│ │QueueManager │ │ │ │ │ │
│ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │
│ │ │ task_queue │ │ result_queue │ │◀───┼──┼──┼──────────────┘ │
│ │ └────────────┘ └──────────────┘ │ │ │ │
│ └─────────────────────────────────┘ │ │ │ │
└─────────────────────────────────────────┘ └──────────────────────────────────────┘
│
Network
引用:https://liaoxuefeng.com/books/python/process-thread/index.html