Curator的InterProcessMutex实现可重入锁的核心是通过路径后缀记录线程ID和重入次数。重复加锁时,检查路径是否存在且owner匹配当前线程,重入计数+1;释放时计数-1,直至0时删除节点,确保同一线程多次加锁释放后才真正解锁,避免其他线程误获取。
源码解析:acquire方法
Curator的InterProcessMutex.acquire()首先通过client.checkExists().usingWatcher()检查锁路径是否存在,如果存在且getLockNodeBytes()解析的LockData.owner线程ID与当前线程ID匹配,则原子性地递增重入次数,直接返回成功。否则,创建临时顺序节点参与竞争。
具体代码:在LockInternals.doAcquire中,if (lockData.isValid(getParticipantNode())) { incrementReentrancy(); return null; } 然后tryToLock。
重复加锁机制
当线程首次获取锁,Curator在ZK路径下创建临时顺序节点,如/lock/locknode0000000001,并写入数据{ownerThreadId,reentrancyCount=1}。同一线程重复acquire时,解析该节点数据,验证owner匹配后,使用ZK的多事务操作(setData)原子递增reentrancyCount,继续持有锁而不创建新节点。
这种设计巧妙利用了ZK的顺序节点+临时节点+数据记录,确保可重入性和自动释放(会话断开节点消失)。
释放锁机制
释放时,internalLockNodeList.get(0)的节点数据被解析,如果owner匹配且reentrancyCount>1,则setData递减计数;若计数降至0,则delete节点。释放失败不抛异常,依赖watch通知其他等待者。
源码片段:void release() { if (internalLockNodeList != null) { LockData lockData = getLockNodeBytes(); if (lockData != null && lockData.getLockPath().startsWith(path)) { if (lockData.getOwner() == Thread.currentThread().getId()) { decrementReentrancy(); if (reentrancyCount.get() == 0) { sendEvent(LockEventType.UNLOCKED); } } } } }
实践分享:异常情况处理
在生产环境中,如果线程异常退出未释放,重入计数不会复位,但由于临时节点会因会话超时自动删除,其他线程可获取。推荐设置较短的sessionTimeout和公平锁模式,避免饥饿。
使用示例:InterProcessMutex lock = new InterProcessMutex(client, "/mylock"); lock.acquire(); try { //业务逻辑 } finally { lock.release(); }
性能优化与注意事项
Curator缓存锁路径列表,通过Watcher监听变化,减少getChildren调用。重复加锁无需创建节点,性能高。但高并发下,频繁setData可能成为瓶颈,建议结合业务拆分锁粒度。
FAQ:
Q: Curator可重入锁如何保证原子性?
A: 通过ZK的原子setData和多节点事务操作,确保计数更新与节点存在检查一致。
Q: 线程死亡后锁会自动释放吗?
A: 是,临时节点绑定会话,超时自动删除。
Q: 如何实现公平锁?
A: 使用InterProcessMutex的共享锁模式或Curator的公平实现,顺序节点天然支持。
Q: 重复加锁上限是多少?
A: 无硬上限,由ZK数据大小限制,通常Integer.MAX_VALUE足够。