欢迎访问 生活随笔!

ag凯发k8国际

当前位置: ag凯发k8国际 > 编程资源 > 编程问答 >内容正文

编程问答

zookeeper做分布式锁 -ag凯发k8国际

发布时间:2024/9/30 编程问答 30 豆豆
ag凯发k8国际 收集整理的这篇文章主要介绍了 zookeeper做分布式锁 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

原理

多个客户端抢一把锁,这些客户端在 zookeeper 里创建各自的临时序列node,无论什么顺序,阻塞住,直到找到第一个创建的节点才放开,即第一个客户端获得了锁,不是第一个的客户端,就阻塞,直到第一个客户端运行结束,删除他创建的节点,让剩下的客户端继续轮循直到找到第一个的过程。

结果展示

用10个线程模拟10个客户端的结果如下:

代码

package org.faithgreen.lock1;import org.apache.zookeeper.*; import org.apache.zookeeper.data.stat;import java.nio.charset.standardcharsets; import java.util.collections; import java.util.list; import java.util.concurrent.countdownlatch;public class lockwatchcallback1 implements watcher, asynccallback.statcallback, asynccallback.childrencallback, asynccallback.stringcallback {private zookeeper zk;private string threadname;private string pathname;private countdownlatch latch = new countdownlatch(1);public zookeeper getzk() {return zk;}public void setzk(zookeeper zk) {this.zk = zk;}public string getthreadname() {return threadname;}public void setthreadname(string threadname) {this.threadname = threadname;}public string getpathname() {return pathname;}public void setpathname(string pathname) {this.pathname = pathname;}/*** 创建 createmode.ephemeral_sequential 节点,阻塞*/public void trylock() {zk.create("/lock", threadname.getbytes(standardcharsets.utf_8), zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential, this, "abc");try {latch.await();} catch (interruptedexception e) {e.printstacktrace();}}/*** zk.create 的 stringcallback impl* 如果创建完了,这个回调会被调用,获取children** @param rc* @param path* @param o* @param childpath*/@overridepublic void processresult(int rc, string path, object o, string childpath) {if (childpath != null) {system.out.println(childpath " created");pathname = childpath;zk.getchildren("/", this, this, "dev");}}/*** 获取 children 的回调* 多个客户端创建节点,list 总是有第一个的,如果是第一个,线程放行,否则在前一个节点上添加 watcher** @param rc* @param s* @param o* @param list*/@overridepublic void processresult(int rc, string s, object o, list<string> list) { // collections.sort(list);int i = list.indexof(pathname.substring(1));if (i == 0) {// 是第一个,就对了,获取锁了system.out.println(threadname " is first ....");latch.countdown();} else {// 不是第一个,就在他的前一个加 watcherzk.exists("/" list.get(i - 1), this, this, "dev");}}/*** children 节点的 watcher* 在释放锁是删除节点的时候触发再一次获取 children 的过程** @param e*/@overridepublic void process(watchedevent e) {switch (e.gettype()) {case none:break;case nodecreated:break;case nodedeleted:zk.getchildren("/", false, this, "dadd");break;case nodedatachanged:break;case nodechildrenchanged:break;}}@overridepublic void processresult(int i, string s, object o, stat stat) {}/*** 释放锁的时候,删除节点*/public void unlock() {try {system.out.println(threadname "工作结束,释放锁 ....");zk.delete(pathname, -1);} catch (interruptedexception e) {e.printstacktrace();} catch (keeperexception e) {e.printstacktrace();}} } package org.faithgreen.lock1;import org.apache.zookeeper.zookeeper; import org.faithgreen.conf1.zookeeperutils; import org.junit.after; import org.junit.before; import org.junit.test;import java.io.ioexception;public class testlock1 {zookeeper zk;@beforepublic void getzk() {zk = zookeeperutils.getzk();}@afterpublic void after() {try {zk.close();} catch (interruptedexception e) {e.printstacktrace();}}@testpublic void lock() {for (int i = 0; i < 10; i) {new thread(() -> {lockwatchcallback1 w = new lockwatchcallback1();string name = thread.currentthread().getname();w.setthreadname(name);w.setzk(zk);// 获取锁w.trylock();// 模拟客户端得到锁之后的工作system.out.println(name " working ...");// 释放锁w.unlock();}).start();}try {system.in.read();} catch (ioexception e) {e.printstacktrace();}} } package org.faithgreen.conf1;import org.apache.zookeeper.zookeeper;import java.io.ioexception; import java.util.concurrent.countdownlatch;public class zookeeperutils {static zookeeper zk;final static string address = "192.168.172.3:2181,192.168.172.4:2181,192.168.172.5:2181,192.168.172.6:2181/testlock";static countdownlatch latch = new countdownlatch(1);static defaultwatcher defaultwatcher = new defaultwatcher();public static zookeeper getzk() {zookeeper zk = null;try {zk = new zookeeper(address, 3000, defaultwatcher);defaultwatcher.setlatch(latch);latch.await();} catch (ioexception e) {e.printstacktrace();} catch (interruptedexception e) {e.printstacktrace();}return zk;}}

总结

以上是ag凯发k8国际为你收集整理的zookeeper做分布式锁的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得ag凯发k8国际网站内容还不错,欢迎将ag凯发k8国际推荐给好友。

网站地图