A Detailed Guide to Curator, the ZooKeeper Client


[TOC]


Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of a great deal of the low-level detail work that ZooKeeper client development otherwise requires, including connection retries, repeated Watcher registration, NodeExistsException handling, and so on. Patrick Hunt (of ZooKeeper) paid Curator a high compliment with the line "Guava is to Java what Curator is to ZooKeeper".

An aside on the name: the origin of the ZooKeeper name is rather amusing. The following is paraphrased from the book From Paxos to ZooKeeper: Distributed Consistency Principles and Practice. ZooKeeper originated in a research group at Yahoo! Research. Researchers there had noticed that many large internal systems at Yahoo depended on similar systems for distributed coordination, but those systems often suffered from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework free of single points of failure. Early in the project, given that many projects were already named after animals (the well-known Pig project, for example), the engineers wanted an animal name for this one too. Raghu Ramakrishnan, then chief scientist of the lab, joked: "At this rate, this place will turn into a zoo." With that, everyone agreed to call it the zookeeper: with all the animal-named distributed components put together, Yahoo's distributed systems looked like a large zoo, and ZooKeeper was exactly what would coordinate that distributed environment. And so the name ZooKeeper was born.

Curator is without doubt the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "custodian"; whether or not the team intended it, the author's guess is that the name says Curator is ZooKeeper's keeper (stretching the metaphor a little: Curator is the zoo's keeper).
Curator consists of several packages:
curator-framework: a wrapper around ZooKeeper's low-level APIs
curator-client: client-side operations, such as retry policies
curator-recipes: wraps a number of high-level features, such as Cache event listening, leader election, distributed locks, distributed counters, distributed barriers, and so on
Maven dependencies (the Curator version used here is 2.12.0, which corresponds to ZooKeeper 3.4.x; mixing incompatible versions causes compatibility problems and can easily make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

 

Curator's Basic APIs


Creating a Session


1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes four main parameters:

Parameter    Description
connectionString    the server list, in the form host1:port1,host2:port2,…
retryPolicy    the retry policy; four policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs    session timeout in milliseconds; default 60000 ms
connectionTimeoutMs    connection establishment timeout in milliseconds; default 15000 ms
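
Since the table mentions the four built-in retry policies, here is a quick sketch of constructing each of them; the constructor arguments shown are illustrative values, not recommendations:

import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.retry.RetryUntilElapsed;

// retry up to 3 times, with an exponentially growing sleep based on 1000 ms
RetryPolicy exponential = new ExponentialBackoffRetry(1000, 3);
// retry at most 5 times, sleeping 1000 ms between attempts
RetryPolicy nTimes = new RetryNTimes(5, 1000);
// retry exactly once, after sleeping 1000 ms
RetryPolicy oneTime = new RetryOneTime(1000);
// keep retrying every 1000 ms until a total of 60 s has elapsed
RetryPolicy untilElapsed = new RetryUntilElapsed(60 * 1000, 1000);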

 

2. Creating a session with the fluent-style API

The same basic parameters are set in a fluent style. An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();


3. Creating a session with an isolated namespace

To isolate different ZooKeeper-based applications from one another, each application should be assigned an independent namespace, i.e. a dedicated ZooKeeper root path (in official terms, ZooKeeper's "chroot" feature). In the example below, the client specifies the namespace "/base", so every data-node operation this client performs is rooted at that path. Setting a chroot maps a client application onto one subtree of the ZooKeeper tree; when multiple applications share one ZooKeeper cluster, this is very useful for keeping them isolated from each other.

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();


Starting the client

Once the session has been created successfully and you have the client instance, you can call its start() method directly:

client.start();


Data Node Operations

 

Creating data nodes

ZooKeeper's node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent, with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number

**Create a node with empty initial content**

client.create().forPath("path");

Note: if no mode is specified, the creation mode defaults to persistent and the content defaults to empty.

Create a node with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral) and initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and initial content, recursively creating parent nodes as needed

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is very useful. Normally a developer must check whether the parent node exists before creating a child, since creating under a missing parent throws NoNodeException; with creatingParentContainersIfNeeded(), Curator automatically creates all required parent nodes recursively.


Deleting data nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise an exception is thrown.

Delete a node, recursively deleting all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node, with a delete guarantee

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safety net: as long as the client session is alive, Curator keeps retrying the delete in the background until the node is successfully removed.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");


Reading node data

Read the data content of a node

client.getData().forPath("path");

Note: the return value of this method is byte[].

Read the data content of a node and obtain the node's Stat at the same time

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");


Updating node data

Update the data content of a node

client.setData().forPath("path","data".getBytes());

Note: this call returns a Stat instance.

Update the data content of a node, forcing a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());


Checking whether a node exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance (null if the node does not exist). Additional methods (watch or background processing) can be chained in, with forPath() called last to specify the target ZNode.
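
A minimal sketch of both forms; watched() registers a standard one-shot ZooKeeper watch:

// returns the node's Stat, or null when the node does not exist
Stat stat = client.checkExists().forPath("path");
boolean exists = (stat != null);

// the same check, additionally registering a one-shot watch on the node
client.checkExists().watched().forPath("path");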


Getting the paths of all children of a node

client.getChildren().forPath("path");

Note: this method returns a List<String>, the list of the ZNode's child paths. Additional methods (watch, background processing, or getting the Stat) can be chained in, with forPath() called last to specify the parent ZNode.
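
A brief sketch of the plain, watched, and background variants (the lambda callback assumes Java 8):

// list the children of "path"; the returned names are relative to "path"
List<String> children = client.getChildren().forPath("path");

// the same call with a one-shot watch on the children
client.getChildren().watched().forPath("path");

// or handled asynchronously in the background
client.getChildren().inBackground((c, event) ->
        System.out.println("children: " + event.getChildren())).forPath("path");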


Transactions

A CuratorFramework instance exposes the inTransaction() method, which starts a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to submit them as a single atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();


Asynchronous Interfaces

The create, delete, update, and read methods above are all synchronous. Curator also provides an asynchronous interface, introducing the BackgroundCallback interface to handle the result information the server returns after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which contains the event type, the response code, and the node's details.

CuratorEventType

Event type    Corresponding method on the CuratorFramework instance
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

Response codes (#getResultCode())

Code    Meaning
0    OK: the call succeeded
-4    ConnectionLoss: the client lost its connection to the server
-110    NodeExists: the node already exists
-112    SessionExpired: the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if the #inBackground() method is not given an executor, Curator uses its EventThread for the asynchronous handling by default.

 

Curator Recipes (Advanced Features)

Reminder: you must first add the curator-recipes dependency. The text below only explains and demonstrates the use of some recipes features; it does not attempt a source-level deep dive.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important reminder: it is strongly recommended to monitor connection state with a ConnectionStateListener. When the connection state is LOST, every API under curator-recipes will fail or expire, even though none of the examples below use a ConnectionStateListener.
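
Since the examples omit it, here is a minimal registration sketch; the reactions in the comments are suggestions, not prescribed behavior:

import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

client.getConnectionStateListenable().addListener((c, newState) -> {
    if (newState == ConnectionState.LOST) {
        // the session is gone: locks, latches and caches must not be trusted
        System.err.println("connection LOST");
    } else if (newState == ConnectionState.RECONNECTED) {
        // safe to re-acquire locks / re-register state here
        System.out.println("connection RECONNECTED");
    }
});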


Caches

ZooKeeper natively supports event listening by registering Watchers, but a Watcher fires only once per registration, so developers must re-register repeatedly. Cache is Curator's wrapper around event listening; it can be seen as a local cached view of the watched data, and it handles the repeated listener registration for the developer automatically. Curator provides three kinds of Watcher (Cache) for monitoring node changes.

 

Path Cache

A Path Cache watches the children of a ZNode. When a child is added, updated, or deleted, the Path Cache changes its state to contain the current set of children, together with their data and state; state changes are delivered through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache, you must call its start() method, and call close() when you are done. You can choose a StartMode to control how it starts (see the sketch after the list).

StartMode has the following options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns, so the cache is pre-populated.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event is sent once the cache has finished loading its initial data.
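
A minimal sketch of starting with an explicit StartMode (the path is illustrative):

PathChildrenCache cache = new PathChildrenCache(client, "/example/pathCache", true);

// BUILD_INITIAL_CACHE: start() returns only after the initial data is loaded
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

// alternatively, POST_INITIALIZED_EVENT delivers an INITIALIZED event to
// listeners once the initial load completes:
// cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);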

public void addListener(PathChildrenCacheListener listener) registers a listener for changes to the cache.

The getCurrentData() method returns a List<ChildData>, which can be iterated to visit all child nodes.

Creating/updating and removing nodes is done through the client (CuratorFramework) itself, not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("event type: " + event.getType());
            if (null != event.getData()) {
                System.out.println("node data: " + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null, and the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then the number of listener callbacks will come up short. This is probably related to how PathChildrenCache is implemented internally: events must not be triggered too frequently!


Node Cache

A Node Cache is similar to a Path Cache, except that it watches one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – the node data

Note: as with any cache, you must call its start() method to use it and close() when done.

getCurrentData() returns the node's current state, from which you can read its current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("node data: " + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("node deleted!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep calls in the example can be commented out, but then the number of listener callbacks will come up short. This is probably related to how NodeCache is implemented internally: events must not be triggered too frequently!

Note: a NodeCache can only watch the state changes of a single node.


Tree Cache

A Tree Cache can monitor every node in an entire subtree, something like a combination of PathCache and NodeCache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the event class that is fired
  • ChildData – the node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("event type: " + event.getType() +
                        " | path: " + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the number of events fired is still as expected.

Note: when a TreeCache is initialized (when start() is called), it calls back the TreeCacheListener with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that moment event.getData().getPath() is very likely to throw a NullPointerException; you should anticipate and guard against this case.


Leader Election

In distributed computing, leader election is a very important capability. The election process goes like this: a process is designated as the organizer and distributes tasks to the nodes. Before the task begins, no node knows which one is the leader (or coordinator); once the election algorithm has run, every node ends up recognizing one unique node as the task leader. In addition, elections frequently occur when the leader crashes unexpectedly and a new leader must be elected.

In a ZooKeeper ensemble, the leader handles write operations and then synchronizes the followers via the Zab protocol; both the leader and the followers can serve read operations.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns at being leader in continuous rotation. With the latter, once a leader is elected, leadership is not relinquished unless the leading client goes down and triggers a new election.

 

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with all the other LeaderLatch instances using the same latch path, and eventually one of them is elected leader. Use the hasLeadership method to check whether a LeaderLatch instance is the leader:

leaderLatch.hasLeadership(); // returns true if this instance is currently the leader

Like the JDK's CountDownLatch, LeaderLatch blocks in await() while requesting leadership. Once you no longer need the LeaderLatch, you must call close(). If it is the leader, that releases leadership, and the remaining participants elect a new leader.

public void await() throws InterruptedException, EOFException
/* Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed. */
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Error handling:
A LeaderLatch instance can register a ConnectionStateListener to watch for network connection problems. On SUSPENDED or LOST, the leader must no longer assume it is still the leader. If a LOST connection is later RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must account for connection problems that can cause leadership to be lost.
Using a ConnectionStateListener is strongly recommended.

A LeaderLatch usage example:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the curator-test module as a dependency for convenient testing, with no need to start a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatch instances; after they start, one of them is elected leader.
Because the election takes some time, leadership is not obtained immediately after start().
hasLeadership checks whether this instance is the leader, returning true if it is.
getLeader().getId() returns the ID of the current leader.
Leadership can only be released by calling close().
await() is a blocking method: it waits until this instance acquires leadership, unless the thread is interrupted or the latch is closed.


LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Like LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when the instance acquires leadership, and takeLeadership() should return only when leadership is to be released.
When you no longer need the LeaderSelector instance, call its close() method.

Error handling:
LeaderSelectorListener extends ConnectionStateListener. A LeaderSelector must pay attention to connection state changes. If an instance becomes leader, it should respond to SUSPENDED and LOST. When SUSPENDED occurs, the instance must assume it may no longer be the leader until it reconnects successfully. If LOST occurs, the instance is no longer the leader and its takeLeadership method should return.

Important: the recommended way to handle this is to throw CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the executing takeLeadership method. This matters enough that you should strongly consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

Inside takeLeadership you can distribute tasks and so on, and simply not return; if you want this instance to remain leader indefinitely, you can add an infinite loop. Calling leaderSelector.autoRequeue() ensures that after this instance releases leadership it may acquire it again. Here we use an AtomicInteger to record how many times this client has obtained leadership; leadership is granted "fairly", so every client has an equal chance of obtaining it.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, LeaderLatch must call close() to release leadership, whereas LeaderSelector lets you control leadership through the LeaderSelectorListener and release it at the appropriate moment, so that every node has a chance to obtain it. LeaderSelector is therefore more flexible and controllable; where a leader-election use case exists, LeaderSelector is the recommended first choice.


Distributed Locks

Reminders:

1. It is recommended to monitor connection state with a ConnectionStateListener, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized: at any given point in time, no two clients hold the same lock simultaneously.


Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it. Reentrant means it works like the JDK's ReentrantLock: the same client can acquire the lock repeatedly while already holding it, without being blocked. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock with acquire(), which also comes in a timeout variant:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with release(). InterProcessMutex instances are reusable.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

If you want to revoke the currently held lock, call the attemptRevoke() method; note that the RevocationListener is called back when the lock is released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
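
Putting the two halves together, a minimal cooperative-revocation sketch; the path and the release-on-request policy are illustrative choices, not the only options:

import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.Revoker;

InterProcessMutex lock = new InterProcessMutex(client, "/examples/locks");
lock.makeRevocable(forLock -> {
    try {
        forLock.release(); // cooperate: give the lock up when asked
    } catch (Exception e) {
        e.printStackTrace();
    }
});
lock.acquire();

// elsewhere, another client asks whoever holds the lock to release it
for (String node : lock.getParticipantNodes()) {
    Revoker.attemptRevoke(client, node);
}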

A second reminder on error handling:
Once again, it is strongly recommended to handle connection state changes with a ConnectionStateListener: when the connection is LOST, you no longer hold the lock.

First, let us create a simulated shared resource. This resource is expected to be accessed by only one thread at a time; otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // in a real environment we would access/maintain a shared resource here
        // with the lock held, this example never throws the IllegalStateException below
        // without the lock, the sleep makes the exception very easy to trigger
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Next, create an InterProcessMutexDemo class that performs the complete access cycle: request the lock, use the resource, release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is straightforward: it spawns 5 clients (QTY), and each client repeats the acquire-lock / access-resource / release-lock cycle 50 times (REPETITIONS). Each client runs in its own thread. The output shows the lock being used exclusively, by one instance at a time, in no particular order.

Since the lock is reentrant, you can call acquire() repeatedly within one thread; while the thread holds the lock, it always returns true.

You should not use the same InterProcessMutex from multiple threads. Instead, create a fresh InterProcessMutex instance in each thread, all with the same path; they then share the same underlying lock, as the sketch below shows.
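
A minimal sketch of the one-instance-per-thread pattern (the path and the critical section are placeholders):

// the ZooKeeper path, not the Java object, identifies the lock
Runnable worker = () -> {
    InterProcessMutex threadLock = new InterProcessMutex(client, "/examples/locks");
    try {
        threadLock.acquire();
        try {
            // critical section: only one thread/process at a time gets here
        } finally {
            threadLock.release();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
};
new Thread(worker).start();
new Thread(worker).start();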


Shared Lock (Non-Reentrant)

Compared with the InterProcessMutex above, this lock simply lacks reentrancy, which means it cannot be re-entered from within the same thread. The class is InterProcessSemaphoreMutex, and it is used much like InterProcessMutex:

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Running this shows that exactly one client succeeds in acquiring the first lock (its first acquire() returns true) and then blocks on its second acquire(), which times out; every other client blocks on the first acquire() until it times out and throws an exception.

This confirms that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

 

Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a related pair of locks: one for read operations and one for write operations. The read lock can be held simultaneously by multiple processes as long as the write lock is not in use, while the write lock, when held, permits no readers (they block).

This lock is reentrant. A thread holding the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock, e.g. acquire the write lock -> acquire the read lock -> release the read lock -> release the write lock. Upgrading from a read lock to a write lock is not possible.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or write lock as required; both locks are of type InterProcessMutex:

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // note: acquire the write lock first, then the read lock; never the reverse!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}


Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore. Where the JDK's Semaphore maintains a set of permits, Curator's maintains leases. There are two ways to bound a semaphore's maximum number of leases. The first is for the user to give a path and specify the maximum lease count. The second is for the user to give a path and use a SharedCountReader. Without a SharedCountReader, every instance in every process must use the same (maximum) lease count; otherwise, an instance in process A might hold a maximum of 10 while one in process B holds 20, and the lease limit would become meaningless.

Calling acquire() returns a lease object. Clients must close lease objects in a finally block, or the leases will be lost. However, if the client session is dropped for some reason, such as a crash, the leases held by that client are closed automatically, so other clients can use them again. Leases can also be returned with the following methods:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request multiple leases at once; if the semaphore does not currently have enough, the requesting thread blocks. Timeout overloads are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used with the shared semaphore are:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases. Then we request one more; the semaphore (capped at MAX_LEASE = 10) still has leases available, so the request is satisfied and one more lease is returned, leaving 4 spare. Next we request another 5 leases; only 4 remain, so the request blocks until the 10-second timeout, is still unsatisfied, and returns null (a request that cannot be satisfied blocks until the timeout and then returns null rather than throwing; with no timeout set, it blocks indefinitely). Finally we return the leases to the semaphore.

All the locks discussed above are fair locks. From ZooKeeper's point of view, each client obtains the lock in the order in which it asked, with no unfair preemption.


Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called, all the contained locks are acquired; if that fails, all of them are released. Likewise, calling release() releases every lock (failures are ignored). Essentially, it represents a group of locks: acquire and release operations on it are passed through to all the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take either a collection of locks or a group of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create a new InterProcessMultiLock containing one reentrant lock and one non-reentrant lock. After calling acquire(), you can see that the thread holds both locks at once; after calling release(), both locks are released.

One final repetition: it is strongly recommended to monitor connection state with a ConnectionStateListener. When the connection state is LOST, the lock is gone.


Distributed Counters

As the name suggests, counters count. ZooKeeper can be used to implement a counter shared across a cluster: any client using the same path can obtain the current counter value, which is guaranteed by ZooKeeper's consistency. Curator has two counters: one counting with an int (SharedCount) and one counting with a long (DistributedAtomicLong).

Distributed int Counter: SharedCount

This class counts with the int type. Three classes are mainly involved:

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter. You can add a SharedCountListener to it, which is notified whenever the counter changes, while SharedCountReader reads the latest value, both the plain value and the version-carrying VersionedValue.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to listen for changes to the counter value (addListener adds the SharedCountListener). Any SharedCount using the same path can obtain this counter value. We then use 5 threads, each adding a random number under 10 to the counter. When a SharedCount on the same path changes the value, baseCount's SharedCountListener receives the callback.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter. The first parameter supplies the current VersionedValue; if another client updates the counter in the meantime, your update may fail, but your client will by then have fetched the latest value, so after a failure you can simply try again. setCount, by contrast, forcibly overwrites the counter value.
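
A typical optimistic-update loop might look like the following sketch (the +1 increment is an illustrative choice):

// compare-and-set loop: re-read the versioned value and retry until we win
while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // nobody changed the counter between our read and our write
    }
    // another client updated the counter first; loop and try again
}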

Note: the counter must be started with start(), and closed with close() after use.

Using a ConnectionStateListener is strongly recommended; in this example, SharedCountListener itself extends ConnectionStateListener.

Distributed long Counter: DistributedAtomicLong

Next, a counter of type long. Beyond having a larger counting range than SharedCount, it first attempts to set the counter optimistically; if that does not succeed (for instance, the counter was updated by another client in the meantime), it uses an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

This counter offers a family of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a given value
  • subtract(): subtract a given value
  • trySet(): attempt to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result: it reports whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() is the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Distributed Queues

Curator also simplifies working with ephemeral nodes, and it provides the ZK recipe distributed-queue implementations. Using ZooKeeper's PERSISTENT_SEQUENTIAL nodes guarantees that items put into the queue are ordered by insertion. If a single consumer takes data from the queue, items come out first-in, first-out, which is exactly the defining property of a queue. If you require strict ordering, you must use a single consumer; leader election can be used to make the leader the sole consumer.

However, according to Curator's author at Netflix, ZooKeeper really is a poor fit for queues; or rather, ZK does not make a good queue. See Tech Note 4 for details. There are five reasons:

  1. ZK has a 1 MB transport limit. In practice ZNodes must stay relatively small, while a queue may hold many messages, possibly very large ones.
  2. With many nodes, ZK starts up noticeably slowly, and using a queue creates many ZNodes. You may need to raise initLimit and syncLimit substantially.
  3. A very large ZNode is hard to clean up. Netflix had to build a dedicated program just for this.
  4. ZK's performance degrades badly when a ZNode has a very large number of children.
  5. ZK's database lives entirely in memory, so a large queue means a lot of memory consumed.

Even so, Curator does provide implementations of the various queues. If the amount of queued data is not too large, they can still be used at your discretion.

Distributed Queue: DistributedQueue

DistributedQueue is the most ordinary kind of queue. It involves the following four classes:

  • QueueBuilder – creates queues; it is also the creation class for the other queue types
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer – the serialization/deserialization interface for queue messages, converting queued objects to and from bytes
  • DistributedQueue – the queue implementation class

QueueConsumer is the consumer; it receives data from the queue. The logic for processing queued data belongs in QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer, but those are two steps and not atomic. You can call lockPath() on the Builder so that a consumer holds a lock while it consumes a message; other consumers then cannot consume that message, and if consumption fails or the process dies, the message can be handed to another process. This costs some performance. If possible, prefer the single-consumer pattern for the queue. A lockPath sketch follows.
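
A minimal sketch of building a queue with a consumer lock; the lock path "/example/queueLock" is an illustrative choice, and consumer and serializer are assumed to be created as in the example below:

QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, "/example/queue");
// with lockPath set, a message stays locked while consumeMessage() runs and is
// removed only afterwards, so a crashed consumer does not lose the message
DistributedQueue<String> queue = builder.lockPath("/example/queueLock").buildQueue();
queue.start();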

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because the PATH is the same, the consumers compete to consume the messages.

Distributed Queue with IDs: DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that every element in the queue can be given an ID, and any element can then be removed from the queue by its ID. It involves these classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

Create it with:

builder.buildIdQueue()

Put an element:

queue.put(aMessage, messageId);

Remove an element:

int numberRemoved = queue.remove(messageId);

In the example below, some elements are removed before any consumer has had a chance to consume them, so the consumers never receive the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Priority distributed queue—DistributedPriorityQueue

A priority queue sorts its elements by priority: the smaller the priority value, the closer the element sits to the head and the earlier it is consumed. It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

Create it with builder.buildPriorityQueue(minItemsBeforeRefresh).
When the priority queue is notified that elements have been added or removed, it pauses processing the currently active element queue and then refreshes it. minItemsBeforeRefresh specifies the minimum number of items in the active queue before a refresh happens;
in effect it sets the minimum amount of unsorted backlog your program can tolerate, as the sketch below shows.
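A minimal sketch of the trade-off (a hypothetical fragment, reusing client, createQueueConsumer(), createQueueSerializer() and PATH from the demo below): a larger minItemsBeforeRefresh means fewer refresh pauses, but more items that may be delivered out of priority order.

// Hypothetical: tolerate up to 10 already-active items before pausing to re-sort on churn.
QueueBuilder<String> builder = QueueBuilder.builder(client, createQueueConsumer(), createQueueSerializer(), PATH);
DistributedPriorityQueue<String> queue = builder.buildPriorityQueue(10);
queue.start();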

Specify a priority when putting an element into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // don't swallow failures silently
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting has no effect. That is because priority only matters for elements backlogged in the queue: if consumption is fast enough, one element may be consumed before the next is even enqueued, in which case DistributedPriorityQueue degenerates into DistributedQueue. (This is also why the demo consumer above sleeps for a second per message.)

Distributed delay queue—DistributedDelayQueue

The JDK has a DelayQueue too; you may already be familiar with it.
DistributedDelayQueue provides similar functionality: each element carries a delay value,
and the consumer only receives it once the delay has elapsed. It involves the following four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

Create it with:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

Specify delayUntilEpoch when putting an element:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now (such as 20 milliseconds)
but an absolute timestamp in the future, e.g. System.currentTimeMillis() plus 10 seconds.
If delayUntilEpoch is already in the past, the message is delivered to the consumer immediately.
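A small sketch of the timestamp arithmetic (assuming the queue from the demo below and a java.util.concurrent.TimeUnit import):

// delayUntilEpoch is an absolute epoch time in milliseconds, not a relative delay.
long delayUntilEpoch = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10);
queue.put("a-message", delayUntilEpoch);                   // delivered roughly 10s from now
queue.put("late-message", System.currentTimeMillis() - 1); // timestamp already past: delivered immediately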

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // don't swallow failures silently
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

The previous sections implemented various kinds of queues, but notice that none of them implements a JDK-style interface.
SimpleDistributedQueue provides an interface essentially consistent with the JDK's (although it does not implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Take an element:

public byte[] take() throws Exception

It also provides other methods:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method; a take method is provided instead.

take() blocks until it successfully returns the head of the queue.
poll() returns null immediately when the queue is empty; the sketch below shows these semantics.
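A short sketch of these semantics (assuming a started CuratorFramework client and an initially empty queue node):

SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/example/queue");
queue.offer("hello".getBytes());                 // enqueue one element
byte[] head = queue.take();                      // returns "hello"; would block if the queue were empty
byte[] empty = queue.poll();                     // queue is now empty -> returns null immediately
byte[] timed = queue.poll(3, TimeUnit.SECONDS);  // waits up to 3s for an element, then returns null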

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {
            ex.printStackTrace(); // don't swallow failures silently
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, though, 100 messages are sent but after the first is consumed, the rest never are; the root cause was not tracked down (note, however, that Consumer.run() above calls take() only once, so that thread would consume at most one message in any case). Checking the demo recommended by the official documentation, it uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

In actual use, however, consumption still blocks; a possible workaround is sketched below.
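One possible workaround (an assumption on my part, not from the official docs) is to drive consumption with poll(timeout) in a loop rather than a single blocking take(), so the thread keeps pulling messages and can be shut down cleanly. A sketch that could stand in for the Consumer above:

public static class PollingConsumer implements Runnable {

    private final SimpleDistributedQueue queue;

    public PollingConsumer(SimpleDistributedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Wait up to 3s for the next element instead of blocking indefinitely.
                byte[] data = queue.poll(3, TimeUnit.SECONDS);
                if (data == null) {
                    continue; // nothing arrived within the timeout; poll again
                }
                System.out.println("message consumed successfully: " + new String(data, "UTF-8"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}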

Distributed barrier—Barrier

A distributed Barrier is a class that blocks the waiting processes on all nodes until a condition is satisfied,
at which point all nodes proceed together.

It is like a horse race: the horses arrive one after another at the starting line,
and at the starting signal they all dash off at once.

DistributedBarrier

The DistributedBarrier class implements the barrier. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First, set the barrier; it will block the threads that wait on it:

setBarrier();

Then the threads that need to block call this method to wait for the release condition:

public void waitOnBarrier()

When the condition is met, remove the barrier and all waiting threads will continue:

removeBarrier();

Error handling: DistributedBarrier monitors the connection state; when the connection breaks, the waitOnBarrier() method throws an exception. See the sketch below.
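A guarded sketch (client and PATH assumed from the demo below):

DistributedBarrier barrier = new DistributedBarrier(client, PATH);
try {
    barrier.waitOnBarrier(); // blocks until the barrier node is removed
    // barrier released: proceed with the protected work
} catch (Exception e) {
    // thrown if the ZooKeeper connection breaks while waiting
    e.printStackTrace();
}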

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier.
We create 5 threads that wait on this barrier.
Only after the barrier is removed at the end do all the threads continue.

If you never set the barrier at the start, none of the threads will block.

Double barrier—DistributedDoubleBarrier

A double barrier lets clients synchronize at both the start and the end of a computation. When enough processes have entered the double barrier, the computation starts;
when it finishes, they leave the barrier. The double barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the caller blocks until all members have called enter().
When leave() is called, it blocks the calling thread until all members have called leave().
It is like a 100-meter sprint: the starting gun fires,
all runners take off, and the race only ends once every runner has crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection breaks, the enter() and leave() methods throw exceptions, as the guarded sketch below shows.
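A guarded sketch (client, PATH and QTY as in the demo below; doComputation() is a hypothetical placeholder for the synchronized work):

DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
try {
    barrier.enter();  // blocks until all QTY members have entered
    doComputation();  // hypothetical work performed between enter() and leave()
    barrier.leave();  // blocks until all QTY members have left
} catch (Exception e) {
    e.printStackTrace(); // connection loss while blocked surfaces here
}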

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

References:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 (From Paxos to ZooKeeper: Distributed Consistency Principles and Practice)
The blog series 《跟着实例学习ZooKeeper的用法》 (Learning ZooKeeper by Example)

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually includes a [toc] navigation table, which makes it convenient to jump to each section, but Jianshu does not support it. The original MD file lives in the project's /resources/md directory; help yourself. The article was written with Typora, and opening it with Typora is recommended.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, hustling in Guangzhou: working by day, with occasional overtime on evenings and weekends, and writing this blog in my free evenings.
I hope my articles bring you something of value; let's keep improving together.