首页 > Python基础应用 > Python 进阶应用教程 > 33 Python 生产者消费者模型

Python 生产者消费者模型

1. 简介

生产者和消费者问题是线程模型中的经典问题:

  • 生产者和消费者共享同一个存储空间
  • 生产者往存储空间中添加产品,消费者从存储空间中取走产品
  • 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞

Python 的内置模块 queue 提供了对生产者和消费者模型的支持,模块 queue 定义了类 Queue,类 Queue 表示一个被生产者和消费者共享的队列,类 Queue 提供如下常用方法:

方法 功能
get() 从队列中取走数据,如果队列为空,则阻塞
put(item) 向队列中放置数据,如果队列为慢,则阻塞
join() 如果队列不为空,则等待队列变为空
task_done() 消费者从队列中取走一项数据,当队列变为空时,唤醒调用 join() 的线程

2. 实现生产者消费者模型

创建生产者线程和消费者线程,使用一个共享队列连接这两个线程,代码如下:

import threading
import queue

q = queue.Queue()
  • 导入 threading 模块和 queue 模块
  • 创建共享队列 q

    def produce(): for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']: q.put(item) print('produce %s' % item)

  • 创建生产者线程的入口函数 produce

  • 生产者生产 8 个数据
  • 调用 q.put(item) 将生产的数据放入到共享队列 q 中

    def consume(): for i in range(8): item = q.get() print(' consume %s' % item)

  • 创建消费者线程的入口函数 consume

  • 消费者消费 8 个数据
  • 调用 q.get() 从共享队列 q 中取走数据

    producer = threading.Thread(target=produce, args=()) consumer = threading.Thread(target=consume, args=()) producer.start() consumer.start() producer.join() consumer.join()

  • 创建生产者线程 producer,线程入口为 produce

  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程和消费者线程,并等待它们结束

运行程序,输出结果如下:

produce a
produce b
  consume a
produce c
  consume b
  consume c
produce d
  consume d
produce e
  consume e
produce f
  consume f
produce g
  consume g
produce h
  consume h
  • 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
  • 消费者取走了 8 个数据:a、b、c、d、e、f、g、h

3. 实现生产者、计算者、消费者模型

创建生产者、计算者、消费者线程:

  • 生产者生产 8 个数据
  • 计算者对生产者输出的数据进行加工,将加工后的数据送往消费者
  • 消费者取走计算者输出的数据

    import threading import queue

    q0 = queue.Queue() q1 = queue.Queue()

  • 导入模块 threading 和模块 queue

  • 使用两个共享队列连接这三个线程

    • 共享队列 q0 连接生产者和计算者
    • 共享队列 q1 连接计算者和消费者

    def produce(): for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']: q0.put(item) print('produce %s' % item)

  • 创建生产者线程的入口函数 produce

  • 生产者生产 8 个数据
  • 调用 q0.put(item) 将生产的数据放入到共享队列 q0 中

    def compute(): for i in range(8): item = q0.get() item = item.upper() q1.put(item)

  • 创建计算者线程的入口函数 compute

  • 调用 q0.get() 读取生产者输出数据,并进行加工
  • 调用 q1.put(item) 将加工后的数据放入到共享队列 q1 中

    def consume(): for i in range(8): item = q1.get() print(' consume %s' % item)

  • 创建消费者线程的入口函数 consume

  • 消费者消费 8 个数据
  • 调用 q1.get() 从共享队列 q1 中取走数据

    producer = threading.Thread(target=produce, args=()) computer = threading.Thread(target=compute, args=()) consumer = threading.Thread(target=consume, args=()) producer.start() computer.start() consumer.start()

    producer.join() computer.join() consumer.join()

  • 创建生产者线程 producer,线程入口为 produce

  • 创建计算者线程 computer,线程入口为 compute
  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程、计算者线程、消费者线程,并等待它们结束

运行程序,输出结果如下:

produce a
produce b
produce c
  consume A
produce d
produce e
  consume B
produce f
  consume C
produce g
  consume D
produce h
  consume E
  consume F
  consume G
  consume H
  • 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
  • 计算者将数据加工为:A、B、C、D、E、F、G、H
  • 消费者取走了 8 个数据:A、B、C、D、E、F、G、H

4. 同步生产者与消费者的推进速度

在生产者、消费者模型中,可能会存在两者推进速度不匹配的问题:生产者生产数据的速度较快,但是,消费者取走数据的速度较慢。

可以使用 queue 的 task_done() 方法和 join() 方法同步生产者与消费者的推进速度:

  • 生产者调用 join() 方法,等待队列中所有的数据被取走
  • 消费者调用 task_done() 方法,表示取走了队列中的一项数据,当队列为空时,唤醒阻塞在 join() 方法中的生产者

    import threading import queue

    q = queue.Queue()

  • 导入 threading 模块和 queue 模块

  • 创建共享队列 q

    def produce(): for item in ['A', 'B', 'C', 'D']: q.put(item) print('produce %s' % item) q.join() print('------------ q is empty')

    for item in ['E', 'F', 'G', 'H']:
        q.put(item)            
        print('produce %s' % item)
    q.join()        
    print('------------ q is empty')
    
  • 创建生产者线程的入口函数 produce

  • 首先,生产 4 个数据:A、B、C、D
    • 调用 q.put(item) 将它们放入到队列 q 中
    • 调用 q.join() 等待消费者将它们全部取走
  • 然后,生产 4 个数据:E、F、G、G

    • 调用 q.put(item) 将它们放入到队列 q 中
    • 调用 q.join() 等待消费者将它们全部取走

    def consume(): for i in range(8): item = q.get() print(' consume %s' % item) q.task_done()

  • 创建消费者线程的入口函数 consume

  • 调用 q.get() 从队列 q 中取走一个数据
  • 调用 q.task_done(),表示已经从队列 q 中取走了一个数据,当队列为空时,唤醒生产者

    producer = threading.Thread(target=produce, args=()) consumer = threading.Thread(target=consume, args=()) producer.start() consumer.start()

  • 创建生产者线程 producer,线程入口为 produce

  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程和消费者线程,并等待它们结束

运行程序,输出结果如下:

produce A
produce B
  consume A
  consume B
produce C
  consume C
produce D
  consume D
------------ q is empty
produce E
  consume E
produce F
  consume F
produce G
produce H
  consume G
  consume H
------------ q is empty
  • 生产者生产第一批数据 A、B、C、D,消费者将其取走
  • 当第一批数据完全被消费者取走后,生产者才开始生产第二批数据
  • 生产者生产第二批数据 E、F、G、H,消费者将其取走
本文来自互联网用户投稿,不拥有所有权,该文观点仅代表作者本人,不代表本站立场。
访问者可将本网站提供的内容或服务用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯本网站及相关权利人的合法权利。
本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站,邮箱:80764001@qq.com,予以删除。
© 2023 PV138 · 站点地图 · 免责声明 · 联系我们 · 问题反馈