【干货】程序员常访问的海外技术交换网站集中

[TOC]

搞技术的,倘若想更高提高自我技术水平,芬兰语那关是逃不了的。

Zookeeper客户端Curator使用详解

                                                                       
   ——某位不愿表露姓名的四级loser

简介

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了多如牛毛Zookeeper客户端极度底层的底细开发工作,包蕴连日来重连、反复注册Watcher和NodeExistsException格外等等。Patrixck
Hunt(Zookeeper)以一句“Guava is to Java that Curator to
Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的原因是比较有趣的,下边的一些摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与履行》一书:
Zookeeper最早源点于雅虎的研商院的一个探讨小组。在当时,切磋人口发现,在雅虎内部很多大型的系统需求借助一个近乎的连串开展分布式协调,但是这几个系统往往存在分布式单点难点。所以雅虎的开发人员就准备开发一个通用的无单点难题的分布式协调框架。在立项初期,考虑到众多体系都是用动物的名字来命名的(例如盛名的Pig项目),雅虎的工程师希望给这几个种类也取一个动物的名字。时任商量院的首席数学家Raghu
Ramakrishnan开玩笑说:再这么下来,大家那时候就改成动物园了。此话一出,大家纷纭表示就叫动物园管理员吧——因为各样以动物命名的分布式组件放在一块儿,雅虎的万事分布式系统看上去似乎一个巨型的动物园了,而Zookeeper正好用来拓展分布式环境的调和——于是,Zookeeper的名字因此诞生了。

Curator无疑是Zookeeper客户端中的瑞士军刀,它译作”馆长”或者”管理者”,不精通是还是不是付出小组有意而为之,小编推断有可能这么命名的因由是申明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包括了多少个包:
curator-framework:对zookeeper的平底api的局地包裹
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:包装了有些高等特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的本子为:3.4.x,倘若跨版本会有包容性难点,很有可能引致节点操作退步):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

技术人士平日会在各类技术交流社区闲逛,我们相互学习、交换、分享、扶助。网络拉近了地球人的相距,让满世界的技术人士可以凑合在一起享受互换。当然因为多地点原因,常常最新最上流的技巧知识传播国内设有一定“时差”。本文将给大家享用技术人士经常访问的海外技术沟通社区网站。

Curator的基本Api

1.stackoverflow

创制会话

https://stackoverflow.com/

1.采取静态工程措施创造客户端

一个例证如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient静态工厂方法包蕴多个主要参数:

参数名 说明
connectionString 服务器列表,格式host1:port1,host2:port2,…
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms

众目睽睽,stackoverflow可以说是中外最为活跃的程序员技术问答调换社区。

2.选取Fluent风格的Api创设会话

宗旨参数变为流式设置,一个列子如下:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

2.Google+

3.创设包蕴隔离命名空间的对话

为了落到实处分歧的Zookeeper业务之间的隔断,须求为每个事情分配一个独门的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下边的例子)当客户端指定了独自命名空间为“/base”,那么该客户端对Zookeeper上的多寡节点的操作都是根据该目录进行的。通过安装Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多少个使用共用一个Zookeeper集群的风貌下,那对于贯彻差异接纳之间的交互隔离万分有意义。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

https://plus.google.com/

起头客户端

当创制会话成功,得到client的实例然后可以直接调用其start( )方法:

client.start();

谷歌+是谷歌公司推出的交际网站,虽不是专为编程技术人员制作,但在此地您可以搜索并投入到很多技术社群,结交群内好友。

数据节点操作

3.DZone

创设数量节点

Zookeeper的节点成立方式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带连串号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带系列号

**创设一个节点,伊始内容为空 **

client.create().forPath("path");

专注:如若没有安装节点属性,节点创制方式默许为持久化节点,内容默许为空

始建一个节点,附带伊始化内容

client.create().forPath("path","init".getBytes());

开创一个节点,指定创立方式(临时节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创制一个节点,指定创造格局(临时节点),附带发轫化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

创制一个节点,指定创制格局(临时节点),附带起先化内容,并且自动递归创造父节点

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

本条creatingParentContainersIfNeeded()接口极度有用,因为相似情形开发人士在开创一个子节点必须认清它的父节点是不是留存,固然不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator可以自动递归创立所有所需的父节点。

https://dzone.com/

剔除数据节点

去除一个节点

client.delete().forPath("path");

留意,此办法只可以去除叶子节点,否则会抛出相当。

删去一个节点,并且递归删除其具备的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

剔除一个节点,强制指定版本举办删减

client.delete().withVersion(10086).forPath("path");

除去一个节点,强制有限支撑删除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一个维持措施,只要客户端会话有效,那么Curator会在后台持续拓展删减操作,直到删除节点成功。

注意:上面的多少个流式接口是足以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

DZone.com是世界上最大的在线社区之一,环球的开发技术人士经过共享文化来精晓最新的技术可行性,了然新技巧,方法和特级实践。

读取数据节点数据

读取一个节点的数码内容

client.getData().forPath("path");

专注,此格局返的重返值是byte[ ];

读取一个节点的数据内容,同时获获得该节点的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

4.Bytes

立异数据节点数据

更新一个节点的数额内容

client.setData().forPath("path","data".getBytes());

只顾:该接口会回到一个Stat实例

立异一个节点的数目内容,强制指定版本举行立异

client.setData().withVersion(10086).forPath("path","data".getBytes());

https://bytes.com/

反省节点是还是不是存在

client.checkExists().forPath("path");

只顾:该格局返回一个Stat实例,用于检查ZNode是还是不是存在的操作.
可以调用额外的办法(监控或者后台处理)并在最后调用forPath(
)指定要操作的ZNode

Bytes是面向开发人士和IT专业人士的沟通社区。涵盖了软件开发,Web开发,数据库开发+优化,数据库管理,系统管理,互联网管理等世界。在Bytes里你可以咨询、回答难点,并享受技术作品。

获得某个节点的所有子节点路径

client.getChildren().forPath("path");

注意:该方法的再次回到值为List<String>,获得ZNode的子节点Path列表。
可以调用额外的艺术(监控、后台处理仍然取得状态watch, background or get
stat) 并在结尾调用forPath()指定要操作的父ZNode

5.github

事务

CuratorFramework的实例包涵inTransaction(
)接口方法,调用此办法开启一个ZooKeeper事务. 可以复合create, setData,
check, and/or delete
等操作然后调用commit()作为一个原子操作提交。一个事例如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

https://github.com/

异步接口

上面提到的始建、删除、更新、读取等格局都是一头的,Curator提供异步接口,引入了BackgroundCallback接口用于拍卖异步接口调用之后服务端重回的结果新闻。BackgroundCallback接口中一个至关主要的回调值为Curator伊芙nt,里面包涵事件类型、响应吗和节点的详细新闻。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

响应码(#getResultCode())

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期

一个异步创制节点的事例如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

注意:如果#inBackground()方法不指定executor,那么会默许使用Curator的伊夫ntThread去开展异步处理。

可以核查代码、管理项目,与数百万用户一起交换、开发软件。

Curator食谱(高级特性)

唤醒:首先你必须添加curator-recipes看重,下文仅仅对recipes一些表征的应用举行表达和举例,不打算举办源码级其他切磋

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

根本提醒:强烈推荐使用ConnectionStateListener监控连接的情景,当连接情状为LOST,curator-recipes下的保有Api将会失效或者逾期,即使前面所有的事例都尚未运用到ConnectionStateListener。

6.Hacker News

缓存

Zookeeper原生协助通过注册沃特cher来举行事件监听,可是开发者须要反复注册(沃特cher只可以单次注册单次使用)。Cache是Curator中对事件监听的包装,可以用作是对事件监听的当地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种沃特cher(Cache)来监听结点的变迁。

https://news.ycombinator.com/news

Path Cache

Path Cache用来监督一个ZNode的子节点. 当一个子节点扩展, 更新,删除时,
Path Cache会改变它的事态, 会包涵最新的子节点,
子节点的多寡和状态,而事态的更变将透过PathChildrenCacheListener公告。

实质上行使时会涉及到多个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

因此下边的构造函数创立Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想利用cache,必须调用它的start方式,使用完后调用close办法。
可以设置StartMode来完结启动的情势,

StartMode有下边二种:

  1. NORMAL:正常开始化。
  2. BUILD_INITIAL_CACHE:在调用start()前边会调用rebuild()
  3. POST_INITIALIZED_EVENT:
    当Cache初叶化数据后发送一个PathChildrenCache伊夫nt.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)能够追加listener监听缓存的变型。

getCurrentData()方法重临一个List<ChildData>对象,可以遍历所有的子节点。

安装/更新、移除其实是行使client (CuratorFramework)来操作,
不通过PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:要是new PathChildrenCache(client, PATH,
true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将再次来到null,cache将不会缓存节点数据。

注意:以身作则中的Thread.sleep(10)可以注释掉,不过注释后事件监听的触发次数会不全,那也许与PathCache的贯彻原理有关,无法太过频仍的触及事件!

HackerNews也是深受广大技术人员欢迎的技术分享沟通网站。即便HackerNews被广大人吐槽网站界面很low,但终究访客的须要就是那般的分明,不难。所以部分时候浏览网站内容,能提供一个RSS订阅地址足矣。

Node Cache

Node Cache与Path Cache类似,Node
Cache只是监听某一个一定的节点。它涉及到下边的多个类:

  • NodeCache – Node Cache实现类
  • NodeCacheListener – 节点监听器
  • ChildData – 节点数据

注意:动用cache,照旧要调用它的start()艺术,使用完后调用close()方法。

getCurrentData()将得到节点当前的景况,通过它的景况可以赢得当前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:演示中的Thread.sleep(10)可以注释,可是注释后事件监听的触发次数会不全,那或许与NodeCache的兑现原理有关,无法太过数十次的触发事件!

注意:NodeCache只可以监听一个节点的气象变化。

7.hongkiat

Tree Cache

Tree
Cache可以监控所有树上的有所节点,类似于PathCache和NodeCache的结合,主要涉嫌到上边七个类:

  • TreeCache – Tree Cache实现类
  • TreeCacheListener – 监听器类
  • TreeCache伊夫nt – 触发的风云类
  • ChildData – 节点数据

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中从不使用Thread.sleep(10),可是事件触发次数也是常规的。

注意:TreeCache在初阶化(调用start()措施)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCache伊夫nt对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能引致空指针极度,那里应该主动处理并幸免那种场馆。

https://www.hongkiat.com/blog/

Leader选举

在分布式总计中, leader elections是很重大的一个意义,
这几个选举进度是那样子的: 指派一个经过作为协会者,将职分分发给各节点。
在职分先导前,
哪个节点都不知底何人是leader(领导者)或者coordinator(协调者).
当选举算法开首实施后, 每个节点最后会得到一个唯一的节点作为职分leader.
除此之外,
选举还时时会爆发在leader意外宕机的情景下,新的leader要被选举出来。

在zookeeper集群中,leader负责写操作,然后经过Zab商讨落到实处follower的一头,leader或者follower都足以拍卖读操作。

Curator
有两种leader选举的recipe,分别是LeaderSelectorLeaderLatch

前端是颇具存活的客户端不间断的轮流做Leader,日照社会。后者是只要选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。某党?

hongkiat是与技术、设计领域有关的站点之一,我们可以在此间分享技术文章。

LeaderLatch

LeaderLatch有四个构造函数:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的启动:

leaderLatch.start( );

假若启动,LeaderLatch会和别的使用相同latch
path的其他LeaderLatch交涉,然后中间一个最后会被推选为leader,能够经过hasLeadership形式查看LeaderLatch实例是不是leader:

leaderLatch.hasLeadership( ); //重临true表明当前实例是leader

恍如JDK的CountDownLatch,
LeaderLatch在呼吁成为leadership会block(阻塞),一旦不选用LeaderLatch了,必须调用close主意。
如若它是leader,会释放leadership, 此外的加入者将会选举一个leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

可怜处理:
LeaderLatch实例能够追加ConnectionStateListener来监听互连网连接难点。 当
SUSPENDED 或 LOST 时,
leader不再认为自己仍旧leader。当LOST后总是重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后再一次成立一个。LeaderLatch用户必须考虑导致leadership丢失的总是难题。
强烈推荐你选用ConnectionStateListener。

一个LeaderLatch的选拔例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以添加test module的借助方便进行测试,不须求启动真实的zookeeper服务端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

率先大家创设了10个LeaderLatch,启动后它们中的一个会被公推为leader。
因为选举会开销一些光阴,start后并不能够立时就收获leader。
通过hasLeadership翻开自己是还是不是是leader, 即使是的话再次来到true。
能够通过.getLeader().getId()可以收获当前的leader的ID。
只好通过close获释当前的领导权。
await是一个梗阻方法, 尝试获取leader地位,可是未必能上位。

8.reddit

LeaderSelector

LeaderSelector使用的时候根本涉及上边几个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

宗旨类是LeaderSelector,它的构造函数如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,LeaderSelector必须start: leaderSelector.start();
一旦启动,当实例取得领导权时您的listener的takeLeadership()方法被调用。而takeLeadership()方法唯有领导权被假释时才回到。
当你不再选择LeaderSelector实例时,应该调用它的close方法。

老大处理
LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接处境的更改。如若实例成为leader,
它应该响应SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时,
实例必须假定在再次连接成功以前它恐怕不再是leader了。 借使LOST状态出现,
实例不再是leader, takeLeadership方法重回。

重要: 推荐处理格局是当接过SUSPENDED 或
LOST时抛出CancelLeadershipException很是.。那会导致LeaderSelector实例中断并打消执行takeLeadership方法的非常.。那丰盛重大,
你必须考虑增添LeaderSelectorListenerAdapter.
LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

上面的一个事例摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership进行职务的分配等等,并且永不回来,若是你想要要此实例一向是leader的话可以加一个死循环。调用
leaderSelector.autoRequeue();管教在此实例释放领导权之后还可能获得领导权。
在那边大家运用AtomicInteger来记录此client获得领导权的次数, 它是”fair”,
每个client有同样的空子取得领导权。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

比较能够,LeaderLatch必须调用close()格局才会放出领导权,而对此LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制,
在适度的时候释放领导权,那样各类节点都有可能赢得领导权。从而,LeaderSelector具有更好的灵活性和可控性,提议有LeaderElection应用场景下优先利用LeaderSelector。

https://www.reddit.com/r/programming/

分布式锁

提醒:

1.引进应用ConnectionStateListener监控连接的情事,因为当连接LOST时你不再具有锁

2.分布式的锁全局同步,
那意味着任何一个日子点不会有多少个客户端都拥有相同的锁。

reddit是一个大型的交际信息媒体网站,涵盖放模块格外多,所以自己那里享用的地址是编程技术模块。用户可以并行分享,交换、学习。

可重入共享锁—Shared Reentrant Lock

Shared意味着锁是大局可知的, 客户端都足以请求锁。
Reentrant和JDK的ReentrantLock类似,即可重入,
意味着同一个客户端在享有锁的同时,可以屡屡得到,不会被卡住。
它是由类InterProcessMutex来兑现。 它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()赢得锁,并提供超时机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通过release()方式释放锁。 InterProcessMutex 实例可以引用。

Revoking ZooKeeper recipes wiki定义了可商讨的吊销机制。
为了废除mutex, 调用下边的方法:

public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener

假如您请求取消当前的锁,
调用attemptRevoke()主意,注意锁释放时RevocationListener将会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

二次提醒:错误处理
仍然强烈推荐你利用ConnectionStateListener拍卖连接景况的变更。
当连接LOST时您不再持有锁。

首先让我们创造一个效仿的共享资源,
那些资源期望只可以单线程的拜访,否则会有出现难题。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

接下来制造一个InterProcessMutexDemo类, 它承担请求锁,
使用资源,释放锁那样一个完好的造访进度。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

代码也很简短,生成10个client, 每个client重复执行10次
请求锁–访问资源–释放锁的长河。每个client都在独立的线程中。
结果可以看看,锁是自由的被每个实例排他性的行使。

既然如此是可选取的,你可以在一个线程中频仍调用acquire(),在线程拥有锁时它连接回到true。

你不应有在多个线程中用同一个InterProcessMutex
你可以在各样线程中都生成一个新的InterProcessMutex实例,它们的path都一致,那样它们可以共享同一个锁。

9.InfoQ

不足重入共享锁—Shared Lock

其一锁和上面的InterProcessMutex相比,就是少了Reentrant的职能,也就表示它无法在同一个线程中重入。那一个类是InterProcessSemaphoreMutex,使用办法和InterProcessMutex类似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 获取锁几次 释放锁也要几次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运转后发觉,有且只有一个client成功得到第三个锁(第三个acquire()办法重回true),然后它和谐过不去在第三个acquire()艺术,获取首个锁超时;别的具有的客户端都阻塞在首先个acquire()主意超时并且抛出极度。

诸如此类也就印证了InterProcessSemaphoreMutex心想事成的锁是不可重入的。

https://www.infoq.com/

可重入读写锁—Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个担当读操作,其它一个承担写操作。读操作在写锁没被运用时可同时由八个经过使用,而写锁在拔取时不允许读(阻塞)。

此锁是可重入的。一个有所写锁的线程可重入读锁,但是读锁却不可能跻身写锁。那也代表写锁能够降级成读锁,
比如请求写锁 —>请求读锁—>释放读锁
—->释放写锁
。从读锁升级成写锁是至极的。

可重入读写锁主要由多个类完结:InterProcessReadWriteLockInterProcessMutex。使用时首先创设一个InterProcessReadWriteLock实例,然后再根据你的急需获得读锁或者写锁,读写锁的门类是InterProcessMutex

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // 注意只能先得到写锁再得到读锁,不能反过来!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到写锁");
        }
        System.out.println(clientName + " 已得到写锁");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到读锁");
        }
        System.out.println(clientName + " 已得到读锁");
        try {
            resource.use(); // 使用资源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 释放读写锁");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

InfoQ是一家满世界性在线新闻/社区网站,百折不挠促进软件开发领域知识与立异的扩散要旨。当然InfoQ也有汉语站。

信号量—Shared Semaphore

一个计数的信号量类似JDK的Semaphore。
JDK中Semaphore维护的一组认同(permits),而Curator中称之为租约(Lease)。
有两种格局得以控制semaphore的最大租约数。第一种方法是用户给定path并且指定最大LeaseSize。第二种办法用户给定path并且采纳SharedCountReader类。若果不利用SharedCountReader,
必须保险拥有实例在多进度中选择相同的(最大)租约数量,否则有可能出现A进度中的实例持有最大租约数量为10,然则在B进度中有着的最大租约数量为20,此时租约的意义就失效了。

本次调用acquire()会回到一个租约对象。
客户端必须在finally中close那一个租约对象,否则那么些租约会丢失掉。 不过,
不过,若是客户端session由于某种原因比如crash丢掉,
那么这个客户端持有的租约会自动close,
那样任何客户端能够继续运用那几个租约。 租约还足以透过上边的点子返还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

专注你能够一遍性请求七个租约,假设Semaphore当前的租约不够,则呼吁线程会被打断。
同时还提供了晚点的重载方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的最主要类包涵上面多少个:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

率先我们先得到了5个租约, 最终大家把它还给了semaphore。
接着请求了一个租约,因为semaphore还有5个租约,所以恳请可以满足,重返一个租约,还剩4个租约。
然后再请求一个租约,因为租约不够,卡住到过期,依然没能知足,重返结果为null(租约不足会阻塞到过期,然后回来null,不会主动抛出很是;如若不安装超时时间,会一如既往阻塞)。

上边说讲的锁都是正义锁(fair)。 总ZooKeeper的角度看,
每个客户端都按照请求的逐一获得锁,不设有非公平的侵夺的情形。

10.daniweb

多共享锁对象 —Multi Shared Lock

Multi Shared Lock是一个锁的容器。 当调用acquire()
所有的锁都会被acquire(),假如请求败北,所有的锁都会被release。
同样调用release时拥有的锁都被release(战败被忽视)。
基本上,它就是组锁的代表,在它下边的请求释放操作都会传递给它含有的拥有的锁。

关键涉及三个类:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数须要包蕴的锁的会聚,或者一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建一个InterProcessMultiLock, 包括一个重入锁和一个非重入锁。
调用acquire()后方可看到线程同时负有了那三个锁。
调用release()探望那五个锁都被保释了。

最后再反复几回,
强烈推荐使用ConnectionStateListener监控连接的场馆,当连接意况为LOST,锁将会丢掉。

https://www.daniweb.com/

分布式计数器

顾名思义,计数器是用来计数的,
利用ZooKeeper可以达成一个集群共享的计数器。
只要利用同样的path就足以获取最新的计数器值,
那是由ZooKeeper的一致性有限帮忙的。Curator有五个计数器,
一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

daniweb为普遍开发技术人士提供了一个规范的就学、分享文化的社区平台。包蕴软件、硬件、程序设计、UI/UX设计等领域。

分布式int计数器—SharedCount

这几个类应用int类型来计数。 紧要涉嫌多个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount表示计数器,
可以为它伸张一个SharedCountListener,当计数器改变时此Listener可以监听到改变的风浪,而SharedCountReader可以读取到最新的值,
包蕴字面值和带版本音讯的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在这一个事例中,大家使用baseCount来监听计数值(addListener方式来添加SharedCountListener
)。 任意的SharedCount, 只要使用相同的path,都得以博得这几个计数值。
然后我们应用5个线程为计数值增添一个10以内的随机数。相同的path的SharedCount对计数值进行改动,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

那边我们使用trySetCount去设置计数器。
率先个参数提供当前的VersionedValue,如果时期此外client更新了此计数值,
你的换代可能不成事,
不过此时你的client更新了时尚的值,所以败北了您可以品味再更新三次。
setCount是强制更新计数器的值

专注计数器必须start,使用完之后必须调用close关闭它。

强烈推荐使用ConnectionStateListener
在本例中SharedCountListener扩展ConnectionStateListener

11.sitepoint

分布式long计数器—DistributedAtomicLong

再看一个Long类型的计数器。 除了计数的限定比SharedCount大了之外,
它首先尝试使用乐观锁的法门设置计数器,
倘诺不成事(比如时期计数器已经被其余client更新了),
它利用InterProcessMutex艺术来更新计数值。

可以从它的里边贯彻DistributedAtomicValue.trySet()中看出:

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此计数器有一各个的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增添一定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须反省重临结果的succeeded(), 它象征此操作是或不是中标。
借使操作成功, preValue()意味着操作前的值,
postValue()代表操作后的值。

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

https://www.sitepoint.com

分布式队列

采取Curator也得以简化Ephemeral Node
(暂时节点)的操作。Curator也提供ZK Recipe的分布式队列完成。 利用ZK的
PERSISTENTS_EQUENTIAL节点,
能够确保放入到行列中的项目是依据顺序排队的。
如若单纯的买主从队列中取数据, 那么它是先入先出的,那也是队列的风味。
要是你严刻要求顺序,你就的应用单一的主顾,可以应用Leader选举只让Leader作为唯一的买主。

但是, 根据Netflix的Curator作者所说,
ZooKeeper真心不合乎做Queue,或者说ZK没有落到实处一个好的Queue,详细内容可以看
Tech Note
4

原因有五:

  1. ZK有1MB 的传导限制。
    实践中ZNode必须相对较小,而队列包蕴众多的信息,十分的大。
  2. 假诺有不少节点,ZK启动时非凡的慢。 而使用queue会导致众多ZNode.
    你要求明确增大 initLimit 和 syncLimit.
  3. ZNode很大的时候很难清理。Netflix不得不创立了一个特其他程序做那事。
  4. 当很大气的带有众多的子节点的ZNode时, ZK的习性变得不得了
  5. ZK的数据库完全放在内存中。 大批量的Queue意味着会占有很多的内存空间。

即使, Curator仍然创立了各类Queue的兑现。
如若Queue的数据量不太多,数据量不太大的动静下,酌情考虑,如故得以拔取的。

SitePoint是一个面向WEB开发领域的调换社区。

分布式队列—DistributedQueue

DistributedQueue是最平日的一种队列。 它布署以下三个类:

  • QueueBuilder – 创设队列使用QueueBuilder,它也是其余队列的创办类
  • QueueConsumer – 队列中的新闻消费者接口
  • QueueSerializer –
    队列信息种类化和反体系化接口,提供了对队列中的对象的连串化和反系列化
  • DistributedQueue – 队列已毕类

QueueConsumer是主顾,它可以接收队列的数额。处理队列中的数据的代码逻辑能够置身QueueConsumer.consumeMessage()中。

例行情况下先将信息从队列中移除,再提交消费者消费。但那是五个步骤,不是原子的。能够调用Builder的lockPath()消费者加锁,当顾客消费数量时持有锁,那样任何消费者不可能消费此信息。借使消费失利或者经过死掉,新闻可以交到其余进度。这会带来或多或少性质的损失。最好或者单消费者方式采纳队列。

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消费完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * 队列消息序列化实现类
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * 定义队列消费者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

事例中定义了四个分布式队列和五个买主,因为PATH是同等的,会存在消费者抢占消费音讯的场所。

12.tutorialspoint

带Id的分布式队列—DistributedIdQueue

DistributedIdQueue和地点的系列类似,只是足以为队列中的每一个要素设置一个ID
可以通过ID把队列中擅自的要素移除。 它事关多少个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

透过上面方法创立:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

在那一个事例中,
有些元素还并未被消费者消费前就移除了,那样顾客不会收下删除的新闻。

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

https://www.tutorialspoint.com/

先期级分布式队列—DistributedPriorityQueue

事先级队列对队列中的元素依照优先级举办排序。 Priority越小,
元素越靠前, 越先被消费掉
。 它事关上边多少个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

由此builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。
当优先级队列获得元素增删信息时,它会暂停处理当下的元素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前当前移动的队列的细小数量。
主要安装你的程序可以忍受的不排序的细微值。

放入队列时索要指定优先级:

queue.put(aMessage, priority);

例子:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

奇迹你恐怕会有错觉,优先级设置并不曾起效。那是因为事先级是对此队列积压的元素而言,倘若消费速度过快有可能出现在后一个要素入队操作从前前一个要素已经被消费,那种情景下DistributedPriorityQueue会退化为DistributedQueue。

tutorialspoint为大面积还学习的恋人提供了累累在线免费课程内容。用户也足以团结编排分享新的知识库。

分布式延迟队列—DistributedDelayQueue

JDK中也有DelayQueue,不了解您是还是不是熟稔。
DistributedDelayQueue也提供了接近的作用, 元素有个delay值,
消费者隔一段时间才能接受元素。 涉及到上边三个类。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

通过下边的语句成立:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时方可指定delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离现在的一个年华距离,
比如20微秒,而是未来的一个岁月戳,如 System.current提姆eMillis() + 10秒。
即使delayUntilEpoch的年华已经归西,音信会立时被消费者收到。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

葡京在线开户,13.google developers

SimpleDistributedQueue

后面固然完结了各样队列,可是你放在心上到没有,这一个队列并从未落实类似JDK一样的接口。
SimpleDistributedQueue提供了和JDK基本一致的接口(不过从未达成Queue接口)。
创立很简单:

public SimpleDistributedQueue(CuratorFramework client,String path)

增添元素:

public boolean offer(byte[] data) throws Exception

剔除元素:

public byte[] take() throws Exception

除此以外还提供了任何格局:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

没有add方法, 多了take方法。

take办法在功成名就再次来到在此以前会被打断。
poll措施在队列为空时直接回到null。

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

而是实际上发送了100条音信,消费完第一条之后,后边的新闻不可以消费,如今没找到原因。查看一下合法文档推荐的demo使用下边多少个Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

唯独其实使用发现依然存在消费阻塞难点。

https://developers.google.com/

分布式屏障—Barrier

分布式Barrier是这么一个类:
它会阻塞所有节点上的等候进程,直到某一个被满意,
然后所有的节点继续拓展。

比如说赛马竞技中, 等赛马陆续赶来起跑线前。
一声令下,所有的赛马都飞奔而出。

google提供的开发技术、产品和种类资源交换社区,无论你的出品是针对性国内依旧面向国际市场,
您都可以在此处找到最相关的 谷歌(Google) 产品和技艺资源。

DistributedBarrier

DistributedBarrier类完毕了栅栏的法力。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你需求设置栅栏,它将封堵在它上边等待的线程:

setBarrier();

下一场须要阻塞的线程调用方法等待放行条件:

public void waitOnBarrier()

当规则满意时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

不行处理 DistributedBarrier
会监控连接情状,当连接断掉时waitOnBarrier()方法会抛出极度。

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

其一例子创建了controlBarrier来安装栅栏和移除栅栏。
大家创制了5个线程,在此巴里r上等待。
最终移除栅栏后有着的线程才继续执行。

即使你从头不安装栅栏,所有的线程就不会阻塞住。

14.DEV

双栅栏—DistributedDoubleBarrier

双栅栏允许客户端在计算的起来和终止时协同。当足够的进度进入到双栅栏时,进程初步推测,
当计算完结时,离开栅栏。 双栅栏类是DistributedDoubleBarrier
构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter()措施被调用时,成员被封堵,直到所有的积极分子都调用了enter()
leave()办法被调用时,它也打断调用线程,直到所有的积极分子都调用了leave()
就好像百米赛跑竞技, 发令枪响,
所有的运动员早先跑,等富有的选手跑过终点线,竞赛才为止。

DistributedDoubleBarrier会监控连接情形,当连接断掉时enter()leave()方法会抛出分外。

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

参考资料:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》
《 跟着实例学习ZooKeeper的用法》博客连串

品类仓库:
https://github.com/zjcscut/curator-seed
(其实本文的MD是带导航目录[toc]的,相比便于导航到种种章节,只是简书不辅助,本文的MD原文放在项目标/resources/md目录下,有爱自取,小说用Typora编写,指出用Typora打开)

End on 2017-5-13 13:10.
Help yourselves!
本人是throwable,在都柏林努力,白天上班,清晨和双休不定时加班,早晨空余持之以恒写下博客。
指望我的文章可以给你带来收获,共勉。

https://dev.to/

那是二〇一六年创建的一个年轻平台,支持广大技术开发人士相互分享、调换。

15.codeproject

https://www.codeproject.com/

CodeProject为开发人员提供有关技能资源,并帮忙大家探听、明白最新的技能。

16.zentaoPM

http://zentao.pm

禅道项目管理软件是研发公司必备的序列管理利器。并逐步受到普遍国外用户的关怀。

正文和豪门一同享受了过多海外技术沟通门户社区,我们只要还有更加多好高的站点,欢迎推荐分享。

万一地方的某些网站由于一些原因不能开拓的,就无须出去说您是搞技术的了~