Python线程间怎么通信?

文章导读
Previous Quiz Next 线程间通信指的是在 Python 多线程程序中,使线程之间能够进行通信和同步的过程。
📋 目录
  1. A Event 对象
  2. B Condition 对象
A A

Python - 线程间通信



Previous
Quiz
Next

线程间通信指的是在 Python 多线程程序中,使线程之间能够进行通信和同步的过程。

通常,Python 中的线程在同一进程内共享相同的内存空间,这允许它们通过共享变量、对象以及 threading 模块提供的专用同步机制来交换数据并协调活动。

为了便于线程间通信,threading 模块提供了各种同步原语,如 Locks、Events、Conditions 和 Semaphores 对象。在本教程中,您将学习如何使用 Event 和 Condition 对象在多线程程序中实现线程间的通信。

Event 对象

Event 对象管理一个内部标志的状态,以便线程可以等待或设置。Event 对象提供了控制该标志状态的方法,允许线程基于共享条件同步它们的活动。

该标志最初为 false,通过 set() 方法变为 true,并通过 clear() 方法重置为 false。wait() 方法会阻塞直到标志为 true。

以下是 Event 对象的关键方法 −

  • is_set(): 仅当内部标志为 true 时返回 True。
  • set(): 将内部标志设置为 true。所有等待其变为 true 的线程都会被唤醒。一旦标志为 true,调用 wait() 的线程将不会阻塞。
  • clear(): 将内部标志重置为 false。此后,调用 wait() 的线程将阻塞,直到再次调用 set() 将内部标志设置为 true。
  • wait(timeout=None): 阻塞直到内部标志为 true。如果进入时内部标志为 true,则立即返回。否则,阻塞直到另一个线程调用 set() 将标志设置为 true,或者直到可选的 timeout 发生。当提供 timeout 参数且不为 None 时,它应是一个浮点数,指定操作的超时时间(秒)。

示例

以下代码尝试模拟交通流量由交通信号灯的状态(GREEN 或 RED)控制。

程序中有两个线程,分别针对两个不同的函数。signal_state() 函数定期设置和重置 event,以指示信号从 GREEN 变为 RED 的变化。

traffic_flow() 函数等待 event 被设置,并在它保持设置状态时运行循环。

from threading import Event, Thread
import time

terminate = False

def signal_state():
    global terminate
    while not terminate:
        time.sleep(0.5)
        print("Traffic Police Giving GREEN Signal")
        event.set()
        time.sleep(1)
        print("Traffic Police Giving RED Signal")
        event.clear()

def traffic_flow():
    global terminate
    num = 0
    while num < 10 and not terminate:
        print("Waiting for GREEN Signal")
        event.wait()
        print("GREEN Signal ... Traffic can move")
        while event.is_set() and not terminate:
            num += 1
            print("Vehicle No:", num," Crossing the Signal")
            time.sleep(1)
        print("RED Signal ... Traffic has to wait")

event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()

# Terminate the threads after some time
time.sleep(5)
terminate = True

# join all threads to complete
t1.join()
t2.join()

print("Exiting Main Thread")

输出

执行上述代码后,您将得到以下输出 −

Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 2  Crossing the Signal
Vehicle No: 3  Crossing the Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Vehicle No: 4  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Exiting Main Thread

Condition 对象

Python 的 threading 模块中的 Condition 对象提供了一种更高级的同步机制。它允许线程等待来自另一个线程的通知后再继续执行。Condition 对象总是与一个锁相关联,并提供线程间信号通知的机制。

以下是 threading.Condition() 类的语法 −

threading.Condition(lock=None)

以下是 Condition 对象的关键方法 −

  • acquire(*args):获取底层锁。此方法调用底层锁的对应方法;返回值是该方法返回的值。
  • release():释放底层锁。此方法调用底层锁的对应方法;无返回值。
  • wait(timeout=None):此方法释放底层锁,然后阻塞,直到被另一个线程中对同一 condition variable 的 notify() 或 notify_all() 调用唤醒,或者直到可选的 timeout 发生。一旦被唤醒或超时,它会重新获取锁并返回。
  • wait_for(predicate, timeout=None):此实用方法会反复调用 wait(),直到 predicate 满足或发生超时。返回值是 predicate 的最后返回值,如果方法超时则评估为 False。
  • notify(n=1):此方法唤醒最多 n 个等待 condition variable 的线程;如果没有线程在等待,则无操作。
  • notify_all():唤醒所有等待此 condition 的线程。此方法类似于 notify(),但会唤醒所有等待的线程而不是一个。如果调用线程在调用此方法时未获取锁,将引发 RuntimeError。

示例

此示例演示了使用 Python 的 threading 模块中的 Condition 对象进行简单的线程间通信。这里 thread_athread_b 使用 Condition 对象进行通信,thread_a 等待直到收到来自 thread_b 的通知。thread_b 在通知 thread_a 之前休眠 2 秒,然后结束。

from threading import Condition, Thread
import time

c = Condition()

def thread_a():
    print("Thread A started")
    with c:
        print("Thread A waiting for permission...")
        c.wait()
        print("Thread A got permission!")
    print("Thread A finished")

def thread_b():
    print("Thread B started")
    with c:
        time.sleep(2)
        print("Notifying Thread A...")
        c.notify()
    print("Thread B finished")

Thread(target=thread_a).start()
Thread(target=thread_b).start()

输出

执行上述代码将得到以下输出 −

Thread A started
Thread A waiting for permission...
Thread B started
Notifying Thread A...
Thread B finished
Thread A got permission!
Thread A finished

示例

以下是另一个代码示例,演示了 Condition 对象如何用于线程间通信。在此,线程 t2 运行 taskB() 函数,线程 t1 运行 taskA() 函数。t1 线程获取 condition 并通知它。

此时,t2 线程处于等待状态。condition 被释放后,等待的线程继续执行,消费由通知函数生成随机数。

from threading import Condition, Thread
import time
import random

numbers = []

def taskA(c):
    for _ in range(5):
        with c:
            num = random.randint(1, 10)
            print("Generated random number:", num)
            numbers.append(num)
            print("Notification issued")
            c.notify()
        time.sleep(0.3)

def taskB(c):
    for i in range(5):
        with c:
            print("waiting for update")
            while not numbers: 
                c.wait()
            print("Obtained random number", numbers.pop())
        time.sleep(0.3)

c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")

执行此代码将产生以下 输出

waiting for update
Generated random number: 2
Notification issued
Obtained random number 2
Generated random number: 5
Notification issued
waiting for update
Obtained random number 5
Generated random number: 1
Notification issued
waiting for update
Obtained random number 1
Generated random number: 9
Notification issued
waiting for update
Obtained random number 9
Generated random number: 2
Notification issued
waiting for update
Obtained random number 2
Done