操作系统 - 读者-写者问题
读者-写者问题是两个并发进程之间共享一个公共资源(如数据库或变量)的同步问题。请阅读本章以了解该问题、其条件以及如何使用信号量实现同步机制来解决它。
- 读者-写者问题详解
- 问题的解决方案
- 在编程语言中的实现
读者-写者问题详解
在读者-写者问题中,有两种类型的进程:读者和写者,它们之间共享一个共享资源。读者可以随时读取共享资源而不会出现任何问题。但对于写者,它们需要对资源进行独占访问以避免数据不一致。也就是说,当一个写者在向资源写入时,不允许其他写者或读者访问它。
我们的任务是设计一个同步机制,允许多个读者同时读取资源,但同一时间只有一个写者可以向资源写入。而且,当一个写者在写入时,不允许任何读者读取资源。
数据不一致是如何发生的?
想象有 2 个写者(W1 和 W2)、2 个读者(R1 和 R2)以及一个共享资源(变量 X),其初始值为 5。假设写者也有读取变量的能力。
Initial Value of X = 5 W1 reads X (=5) W2 reads X (=5) W1 increments X by 1 (Now, X=6) W2 decrements X by 1 (Now, X=4) W2 writes X (=4) R1 reads X (=4) W1 writes X (=6) R2 reads X (=6) R1's value of X : 4 R2's value of X : 6
这展示了由于写者的并发写入,相同数据对不同读者可能具有不同含义。这被称为数据不一致。想象一下,如果在银行系统中两个交易同时尝试更新同一账户余额而没有适当的同步,会发生什么!!
问题的解决方案
要解决这个问题,我们可以使用信号量来管理对共享资源的访问。我们可以使用以下信号量 −
- mutex − 此信号量用于在更新活动读者计数时提供互斥。
- writeBlock − 此信号量用于在存在活动读者时阻塞写者。
读者进程的伪代码如下 −
void reader() {
wait(mutex);
read_count++;
if (read_count == 1) {
wait(writeBlock); // 第一个读者锁定写者
}
signal(mutex);
// 关键区(读取)
read_data();
wait(mutex);
read_count--;
if (read_count == 0) {
signal(writeBlock); // 最后一个读者释放写者
}
signal(mutex);
}
写者进程的伪代码如下 −
void writer() {
wait(writeBlock); // 获取写入的独占访问
// 关键区(写入)
write_data();
signal(writeBlock); // 释放独占访问
}
在这里,读者进程在开始读取时递增 read_count,在完成时递减它。这允许多个读者同时读取资源。第一个读者锁定 writeBlock 信号量,以防止在存在活动读者时写者读取。
编程语言中的实现
以下是使用信号量在 C++、Python 和 Java 中实现读者-写者问题的简单示例 −
注意 − 该解决方案使用 多线程 来模拟并发读者和写者。
import threading
import time
# 共享变量
X = 5
# 读者-写者同步
mutex = threading.Lock() # 保护 read_count
write_lock = threading.Lock() # 写者独占锁
read_count = 0
def reader(name: str, delay_before=0.0):
global read_count, X
time.sleep(delay_before)
# 进入区
with mutex:
read_count += 1
if read_count == 1:
# 第一个读者锁定写者
write_lock.acquire()
# 关键区(读取)
print(f"[{name}] READ X = {X}")
# 退出区
with mutex:
read_count -= 1
if read_count == 0:
# 最后一个读者释放写者
write_lock.release()
def writer(op: str, name: str, delay_before=0.0, delay_between_read_write=0.0):
global X
time.sleep(delay_before)
# 获取独占访问权限以完成完整 RMW
with write_lock:
old = X
print(f"[{name}] READ X = {old}")
# 模拟写入前的一些工作(允许测试其他线程被阻塞)
if delay_between_read_write > 0:
time.sleep(delay_between_read_write)
# 应用操作
if callable(op):
new = op(old)
elif op == "inc":
new = old + 1
elif op == "dec":
new = old - 1
else:
raise ValueError("Unsupported op")
X = new
print(f"[{name}] WROTE X = {X}")
def demo_synchronized():
global X
X = 5
print("Initial X =", X)
w1 = threading.Thread(target=writer, args=("inc", "W1", 0.0, 0.5))
w2 = threading.Thread(target=writer, args=("dec", "W2", 0.05, 0.0))
r1 = threading.Thread(target=reader, args=("R1", 0.7))
r2 = threading.Thread(target=reader, args=("R2", 1.2))
w1.start()
w2.start()
r1.start()
r2.start()
w1.join()
w2.join()
r1.join()
r2.join()
print("Final X =", X)
if __name__ == "__main__":
demo_synchronized()
上述 Python 代码的输出将类似于 −
Initial X = 5 [W1] READ X = 5 [W1] WROTE X = 6 [W2] READ X = 6 [W2] WROTE X = 5 [R1] READ X = 5 [R2] READ X = 5 Final X = 5
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <string>
#include <vector>
#include <random>
int X = 5;
std::mutex mutex; // 保护 read_count
std::mutex write_lock; // 写者独占锁;由第一个读者持有以阻塞写者
int read_count = 0;
void reader(const std::string &name, int delay_before_ms = 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_before_ms));
mutex.lock();
read_count++;
if (read_count == 1) {
write_lock.lock();
}
mutex.unlock();
// 读取
std::cout << "[" << name << "] READ X = " << X << std::endl;
mutex.lock();
read_count--;
if (read_count == 0) {
write_lock.unlock();
}
mutex.unlock();
}
void writer(const std::string &op, const std::string &name, int delay_before_ms = 0, int delay_between_ms = 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_before_ms));
write_lock.lock();
int old = X;
std::cout << "[" << name << "] READ X = " << old << std::endl;
if (delay_between_ms > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_between_ms));
}
int nw = old;
if (op == "inc") nw = old + 1;
else if (op == "dec") nw = old - 1;
else {
try {
nw = std::stoi(op);
} catch (...) { /* ignore, keep old */ }
}
X = nw;
std::cout << "[" << name << "] WROTE X = " << X << std::endl;
write_lock.unlock();
}
int main() {
X = 5;
std::cout << "Initial X = " << X << std::endl;
std::thread w1(writer, "inc", "W1", 0, 500); // W1 递增,在 RMW 期间持有锁
std::thread w2(writer, "dec", "W2", 50, 0); // W2 在 W1 开始后不久尝试,将等待
std::thread r1(reader, "R1", 700); // R1 在一些事件后读取
std::thread r2(reader, "R2", 1200); // R2 稍后读取
w1.join();
w2.join();
r1.join();
r2.join();
std::cout << "Final X = " << X << std::endl;
return 0;
}
上述 C++ 代码的输出将类似于 −
Initial X = 5 [W1] READ X = 5 [W1] WROTE X = 6 [W2] READ X = 6 [W2] WROTE X = 5 [R1] READ X = 5 [R2] READ X = 5 Final X = 5
import java.util.concurrent.locks.ReentrantLock;
public class RWDemo {
static int X = 5;
static ReentrantLock mutex = new ReentrantLock(); // 保护 readCount
static ReentrantLock writeLock = new ReentrantLock(); // 写者独占锁;由第一个读者持有
static int readCount = 0;
static void reader(String name, int delayBeforeMs) {
try {
Thread.sleep(delayBeforeMs);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
mutex.lock();
try {
readCount++;
if (readCount == 1) {
writeLock.lock();
}
} finally {
mutex.unlock();
}
// 读取
System.out.println("[" + name + "] READ X = " + X);
mutex.lock();
try {
readCount--;
if (readCount == 0) {
writeLock.unlock();
}
} finally {
mutex.unlock();
}
}
static void writer(String op, String name, int delayBeforeMs, int delayBetweenMs) {
try {
Thread.sleep(delayBeforeMs);
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
writeLock.lock();
try {
int oldVal = X;
System.out.println("[" + name + "] READ X = " + oldVal);
if (delayBetweenMs > 0) {
try { Thread.sleep(delayBetweenMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
int nw = oldVal;
if ("inc".equals(op)) nw = oldVal + 1;
else if ("dec".equals(op)) nw = oldVal - 1;
else {
try { nw = Integer.parseInt(op); } catch (NumberFormatException ex) { /* keep old */ }
}
X = nw;
System.out.println("[" + name + "] WROTE X = " + X);
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) throws Exception {
X = 5;
System.out.println("Initial X = " + X);
Thread w1 = new Thread(() -> writer("inc", "W1", 0, 500));
Thread w2 = new Thread(() -> writer("dec", "W2", 50, 0));
Thread r1 = new Thread(() -> reader("R1", 700));
Thread r2 = new Thread(() -> reader("R2", 1200));
w1.start();
w2.start();
r1.start();
r2.start();
w1.join();
w2.join();
r1.join();
r2.join();
System.out.println("Final X = " + X);
}
}
上述 Java 代码的输出将类似于 −
Initial X = 5 [W1] READ X = 5 [W1] WROTE X = 6 [W2] READ X = 6 [W2] WROTE X = 5 [R1] READ X = 5 [R2] READ X = 5 Final X = 5
结论
读者-写者问题是经典的同步问题,它展示了管理共享资源并发访问的挑战。我们使用信号量实现了一个解决方案,允许多个读者同时访问资源,同时确保写者独占访问。这种方法有助于防止数据不一致,并确保共享资源的完整性。