一、进程与程序的区别与联系
进程这个概念是源于操作系统,下面列出 4 项进程与线程的区别与联系
-
进程是程序及其数据在计算机上的一次运行活动,是一个动态的概念。进程的运行实体是程序,离开程序的进程没有存在的意义。从静态角度看,进程是由程序、数据和进程控制块(PCB)三部分组成的。而程序是一组有序的指令集合,是一种静态的概念。
-
进程是程序的一次执行过程,它是动态地创建和消亡的,具有一定的生命周期,是暂时存在的;而程序则是一组代码的集合,它是永久存在的,可长期保存。
-
一个进程可以执行一个或几个程序,一个程序也可以构成多个进程。进程可创建进程,而程序不可能形成新的程序。
-
进程与程序的组成不同。进程的组成包括程序、数据和 PCB。
二、多进程编程 - fork
Unix-like 操作系统提供了一个 fork 的系统特殊调用,为什么说它特殊呢?那是因为普通的函数调用,调用一次,返回一次。但 fork 调用一次,返回两次。
fork 的作用是在内存中复制一份当前的进程,这个复制后“副本”进程就是原进程的 子进程 ,同理原进程是子进程的 父进程 。
然后 fork 的值分别在父进程和子进程内返回,父进程返回子进程的 pid ,子进程永远返回 0 。
代码实例:
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
import os
print(os.getpid())
pid = os.fork()
print(pid,os.getpid(),os.getppid())
print('-'*30)
执行结果:
22802
22803 22802 17350
------------------
0 22803 22802
------------------
代码解读:
1. os.getpid() 是获取当前进程的进程号,也就是运行结果中的 22802
2. pid = os.fork() 是 fork 一个子进程出来
3. 此时看到运行结果有没有发现诡异的地方?是的,没错!它输出了两次,但是根本没有找到 while for 这些有关循环的语句。这是为什么呢?这个就是 fork 的特殊性啦,它返回两次,就相当于在解释器的级别上就做了特殊处理。别管有没有循环语句,只要有 fork,那么下面的语句都会重复输出两次的
4. 首先是在父进程中输出:[22803 22802 17350]:
22803 是父进程 fork 出的子进程的 pid,它的 pid 是 22803;
22802 是父进程自己的 pid,使用 os.getpid() 获取;
17350 是父进程的父进程的 pid,使用 os.getppid() 获取;这个其实没有实际的意义,这个 pid 是解释器通过操作系统得到的,所有的进程都是通过 init 这个初始进程 fork 出来的,它的 pid 是 1
5. 最后是在子进程中输出:[0 22803 22802]:
0 是子进程返回的一个类似状态码之类的东西,不是 pid,不用太过关心,只要 fork 成功就会返回一个 0;
22803 是子进程自己的 pid,使用 os.getpid() 获取;
17350 是子进程的父进程的 pid,使用 os.getppid() 获取;
三、多进程编程 - multiprocessing
由于 Windows 没有 fork 调用,为了支持 Python 跨平台的特性,提供一个跨平台的多进程支持 multiprocessing 模块。
代码实例:
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process
def main(x,y):
print(x + y)
p1 = Process(target=main,args=(1,2))
p2 = Process(target=main,args=(3,4))
p3 = Process(target=main,args=(5,6))
p1.start()
p2.start()
p3.start()
print('开始执行!')
p1.join()
p2.join()
p3.join()
print('执行完成!')
执行结果:
3
开始执行!
7
11
执行完成!
代码解读:
1. 使用 multiprocessing 模块中的 Process 方法我们很方便的创建子进程。
2. 创建子进程时,我们需要提供一个执行函数,这个函数就是多进程去并发执行的代码段。Process 方法只需要传入执行函数 target 和函数的参数 args。
3. 创建一个 Process 实例,用 start 方法启动,这样创建进程比 fork 还要简单。join 方法是等待子进程结束后再继续往下运行。
四、进程池 - Pool
如果要启动大量的子进程,那么上面的那种方式就不太方便了,好在 multiprocessing 可以创建进程池这样的方式批量创建子进程,其实在实际开发中,用到多进程的时候都用的是进程池
代码实例:
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Pool
from multiprocessing import cpu_count
def main(x,y):
return x + y
data = []
p = Pool(cpu_count() * 2)
for i in range(10):
v = p.apply_async(main,args=(i,0))
data.append(v)
p.close()
p.join()
print(data)
for v in data:
print(v.get())
执行结果:
[<multiprocessing.pool.ApplyResult object at 0x10f71b8d0>,<multiprocessing.pool.ApplyResult object at 0x10f71beb8>,...]
0
1
2
4
5
6
7
8
3
9
代码解读:
1. cpu_count() 是获取当前计算机的 cpu 数量,一般为了效率,进程数应是 cpu 的 2 倍即可
2. p = Pool(cpu_count() * 2) 使用 Pool 方法创建一个进程池,进程的数量是 cpu * 2
3. v = p.apply_async(main,0)) 使用 apply_async 方法调用执行函数
4. data.append(v) 是将进程对象存储到list,此时进程可能还没执行完,所以存储的是进程对象(指针 => 内存地址),可能内存中还没有值
5. 调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close()
6. 调用 close() 之后就不能继续添加新的 Process 了
7. v.get() 是获取进程对象中的具体的值(解引用)
五、进程间通信
因为进程之间是有自己独立的内存空间,不共享数据,所以进程之间是需要通信的。
操作系统提供了很多机制来实现进程间的通信,Python 的 multiprocessing 模块包装了底层的机制,提供了 Queue 、Pipes 等多种方式来交换数据。
下面是以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据
代码实例:
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process,Queue
def write(q):
for value in ['A','B','C']:
print(value)
q.put(value)
def read(q):
while True:
value = q.get(True)
print(value)
q = Queue()
pw = Process(target=write,args=(q,))
pr = Process(target=read,))
pw.start()
pr.start()
pw.join()
pr.terminate()
执行结果:
A
B
C
A
B
C
代码解读:
q = Queue() 创建队列
q.put(value) 写数据
value = q.get(True) 读数据
pr.terminate() 进程里是死循环,无法等待其结束,只能强行终止
六、分布式进程
分布式进程是将多个进程分布到不同的服务器上,这样做的原因有很多,最常见的是分担单机的压力和速度的追求,分布式进程最重要的一点是如何在不同服务器上进程进行数据共享和交换。
Python 的 multiprocessing 模块不但支持多进程,其中 managers 子模块还支持把多进程分布到多台机器上,下面我们就以 Master 和 Worker 的方式实现分布式进程。
master.py
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
import time
from multiprocessing import Queue
from multiprocessing.managers import BaseManager
# 从 BaseManager 继承的 QueueManager
class QueueManager(BaseManager):
pass
# 创建任务队列
task_queue = Queue()
# 将任务队列注册到网络上,名字为 task_queue
QueueManager.register('task_queue',callable=lambda: task_queue)
# 设置 host:127.0.0.1 port:5000 password:123456
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'123456')
# 启动网络队列
manager.start()
# 通过网络获取队列
task = manager.task_queue()
# 将数据 put 到网络队列中
task.put('Master put data.')
time.sleep(10)
# 关闭网络队列
manager.shutdown()
worker.py
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing.managers import BaseManager
# 从 BaseManager 继承的 QueueManager
class QueueManager(BaseManager):
pass
# 将任务队列注册到网络上,名字为 task_queue
QueueManager.register('task_queue')
# 设置 host:127.0.0.1 port:5000 password:123456
manager = QueueManager(address=('127.0.0.1',authkey=b'123456')
# 连接网络队列
manager.connect()
# 通过网络获取队列
task = manager.task_queue()
# 将数据从网络队列中 get 下来
data = task.get()
print(data)
另一种方式是使用 Redis 进行数据的共享和交互,在实际开发中推荐使用 Redis ,不解释~ (编辑:北几岛)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|