【Remoting】Implementing a simple online upgrade with .NET Remoting (Part 2: restarting the exe)

   
Through the previous post on Cluster-Singleton we looked at the programming support Akka provides for distributed applications: the message-driven computation model is a natural fit for distributed programming, and no special effort is required, since we can build clustered distributed programs in the ordinary Actor style. Cluster-Singleton guarantees that a single instance of a given Actor runs safely and stably in the cluster: whatever goes wrong on individual nodes, the computation can continue as long as at least one node remains online. A related scenario arises when thousands of resource-hungry Actors must run at the same time, and together they need far more resources than a single server can supply. In that case we have to spread those Actors over multiple servers, that is, over a cluster, and this is the problem the Cluster-Sharding pattern helps to solve.

I. Preface

Having worked through Cluster-Sharding, I would like to share some of what I learned, so that we can analyze together how it handles the distribution of Actors across the nodes of a cluster:

The previous post used .NET Remoting to compare the local version against the server's and to download the update package.

First, suppose I have an Actor identified by a self-assigned id and constructed by Cluster-Sharding on some node in the cluster. In a cluster environment I do not know which node that Actor actually runs on or what its concrete address is; I only need its id to communicate with it. If I have many such resource-consuming Actors, each with its own id, I can use the shard number embedded in the id to decide in which shard each Actor is built. Akka-Cluster can also rebalance: as nodes join and leave, it moves shards between nodes according to the current cluster load, including rebuilding all the Actors of a node on the remaining online nodes when that node drops out of the cluster. Seen this way, the Actor's self-assigned id is the core element of a Cluster-Sharding application. As usual, we will demonstrate the use of Cluster-Sharding with an example. The Actor we want to shard is the Calculator discussed in the previous few posts:

This post mainly uses Process to implement restarting the program.

package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command



  sealed trait Event
  case class SetResult(d: Any) extends Event

  def getResult(res: Double, cmd: Command) = cmd match {
    case Num(x) => x
    case Add(x) => res + x
    case Sub(x) => res - x
    case Mul(x) => res * x
    case Div(x) => {
      val _ = res.toInt / x.toInt //force an ArithmeticException on division by zero
      res / x
    }
    case _ => throw new ArithmeticException("Invalid Operation!")
  }

  case class State(result: Double) {

    def updateState(evt: Event): State = evt match {
      case SetResult(n) => copy(result = n.asInstanceOf[Double])
    }
  }

  case object Disconnect extends Command    //exit cluster

  def props = Props(new Calculator)

}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._
  val cluster = Cluster(context.system)

  var state: State = State(0)

  override def persistenceId: String = self.path.parent.name+"-"+self.path.name

  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_,st: State) => state = state.copy(result =  st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetResult(getResult(state.result,Num(n))))(evt => state = state.updateState(evt))
    case Add(n) => persist(SetResult(getResult(state.result,Add(n))))(evt => state = state.updateState(evt))
    case Sub(n) => persist(SetResult(getResult(state.result,Sub(n))))(evt => state = state.updateState(evt))
    case Mul(n) => persist(SetResult(getResult(state.result,Mul(n))))(evt => state = state.updateState(evt))
    case Div(n) => persist(SetResult(getResult(state.result,Div(n))))(evt => state = state.updateState(evt))
    case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave (cluster.selfAddress)

  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }
  val calcActor = context.actorOf(Calculator.props,"calculator")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}

      The scenario:

We can see that Calculator is an ordinary PersistentActor: its internal state is persisted, and the state can be restored when the Actor restarts. CalcSupervisor is Calculator's supervisor; we introduce it in order to install a new SupervisorStrategy.

      Revit2016 has loaded a dll with version 1.0.0.0; the updated dll on the server has version 1.0.0.10.

Calculator is the entity we intend to shard across the cluster. Shards of an Actor type are created in the cluster by Akka Cluster-Sharding's ClusterSharding.start method. We need to run this method on every node that is to host shards in order to deploy them:

     
After the download completes, Revit2016 is shut down, the old dll is deleted, the new dll is copied to the old dll's location, and Revit2016 is restarted.

/**
   * Register a named entity type by defining the [[akka.actor.Props]] of the entity actor and
   * functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
   * for this type can later be retrieved with the [[#shardRegion]] method.
   *
   * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
   * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
   *
   * Some settings can be configured as described in the `akka.cluster.sharding` section
   * of the `reference.conf`.
   *
   * @param typeName the name of the entity type
   * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
   * @param settings configuration settings, see [[ClusterShardingSettings]]
   * @param extractEntityId partial function to extract the entity id and the message to send to the
   *   entity from the incoming message, if the partial function does not match the message will
   *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
   * @param extractShardId function to determine the shard id for an incoming message, only messages
   *   that passed the `extractEntityId` will be used
   * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
   */
  def start(
    typeName:        String,
    entityProps:     Props,
    settings:        ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId:  ShardRegion.ExtractShardId): ActorRef = {

    val allocationStrategy = new LeastShardAllocationStrategy(
      settings.tuningParameters.leastShardAllocationRebalanceThreshold,
      settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)

    start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
  }

II. Code

start returns a ShardRegion, which is an ActorRef. A ShardRegion is a special Actor that manages the Actor instances, called entities, living inside one or more shards. These shards may be deployed on different cluster nodes, and the outside world communicates with the entities through their ShardRegion. The entityProps parameter of start shows that each shard hosts only one type of Actor. The concrete entity instances are built by another internal Actor, the shard, and a shard can build many entity instances. This multi-shard, multi-entity structure is reflected in the two functions extractShardId and extractEntityId. As noted above, the Actor's self-assigned id, the entity-id, is the core element of Cluster-Sharding. The entity-id also embeds a shard-id, so through the entity-id encoding scheme the user can design the whole sharding layout, including how many shards and entities sit under each ShardRegion. When a ShardRegion receives an entity-id, it first extracts the shard-id; if that shard does not yet exist in the cluster, a new shard is created on one of the nodes, chosen according to cluster load. It then looks up the entity-id within that shard, and if the entity does not exist, a new instance is built. This whole construction process is driven by the user-supplied functions extractShardId and extractEntityId; Cluster-Sharding uses them to build and locate shards and entities according to the user's scheme. The ids need not follow any particular order; they only have to be unique. Below is an encoding example:

      Insert the following code before lines 79–80 of the last code block in the previous post:

object CalculatorShard {
  import Calculator._

  case class CalcCommands(eid: String, msg: Command)  //user should use it to talk to shardregion
  val shardName = "calcShard"
  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id,msg) => (id,msg)
  }
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id,_) => id.head.toString
  }
  def entityProps = Props(new CalcSupervisor)
}
                bgk_Update.ReportProgress(100, "Updating");
                Thread t = new Thread(new ThreadStart(Update));
                t.Start();

Users talk to the ShardRegion through CalcCommands, a wrapper message type defined specifically for communicating with the sharding system. Besides a Command message that Calculator normally supports, it carries the id, eid, of the target entity instance. The first character of this eid represents the shard-id, so we can address a specific shard directly, or pick one at random with something like Random.nextInt(9).toString. Since each shard hosts only one type of Actor, different entity-ids represent coexisting instances of the same Actor type, much like the Router discussed earlier: all instances perform the same kind of computation on different inputs. Typically a user generates entity-ids with some algorithm that spreads entities evenly across the shards; Cluster-Sharding then automatically adjusts the placement of the shards on cluster nodes according to the actual cluster load.
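To make the encoding scheme concrete, here is a minimal sketch of generating entity-ids whose first character doubles as the shard-id, following the same convention as getShardId. The helper names are hypothetical and not part of the demo code:

```scala
import scala.util.Random

object EidScheme {
  // Hypothetical helper: pick one of 9 shards at random and append the
  // entity's serial number, so the first character is the shard-id.
  def randomEid(serial: Int): String = s"${Random.nextInt(9)}$serial"

  // Mirror of CalculatorShard.getShardId: the shard-id is the first character.
  def shardIdOf(eid: String): String = eid.head.toString
}
```

With this scheme an id like "1012" lands in shard "1", while ids produced by randomEid scatter entities over shards 0 through 8.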

       Execute the Update function:

The following code demonstrates how to deploy shards on a cluster node:

        public void Update()
        {
            string exepath = string.Empty;
            Process[] ps = Process.GetProcessesByName("Revit");
            foreach (Process p in ps)
            {
                exepath = p.MainModule.FileName;            // capture the exe path of the process
                p.Kill();                                   // kill the process
            }
            foreach (var module in serverupdatefiles.Keys)  // delete the old modules
            {
                System.Threading.Thread.Sleep(1 * 1000);    // short delay: if the process has not fully exited, the delete may fail with "access denied" or "path not found"
                System.IO.File.Delete(Path.Combine(Config.Dir, module + ".dll"));
            }
            CopyDir(Config.TempDir, Config.Dir);            // copy the new files into the old files' folder

            System.Diagnostics.Process.Start(exepath);      // restart the exe
        }
package clustersharding.shard
import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)

    startupSharding(port,system)

  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {

    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )

  }

}

         Copy the contents of one folder into another folder:

The actual deployment code is in the startupSharding method. The following code demonstrates how to use the entities inside the shards:

public static void CopyDir(string srcPath, string aimPath)
        {
            try
            {
                // make sure the target path ends with a directory separator
                if (aimPath[aimPath.Length - 1] != Path.DirectorySeparatorChar)
                    aimPath += Path.DirectorySeparatorChar;
                // create the target directory if it does not exist
                if (!Directory.Exists(aimPath))
                    Directory.CreateDirectory(aimPath);
                // list both files and subdirectories of the source;
                // Directory.GetFiles(srcPath) would return files only,
                // and the recursive branch below would never run
                string[] fileList = Directory.GetFileSystemEntries(srcPath);
                // walk all files and directories
                foreach (string file in fileList)
                {
                    // if the entry is a directory, recursively copy its contents
                    if (Directory.Exists(file))
                        CopyDir(file, aimPath + Path.GetFileName(file));
                    // otherwise copy the file directly
                    else
                        File.Copy(file, aimPath + Path.GetFileName(file), true);
                }
            }
            catch
            {
                Console.WriteLine("Copy failed!");
            }
        }
package clustersharding.demo
import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {

  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem",ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0,shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1012",Calculator.Num(13.0))   //shard 1, entity 1012
  calcRegion ! CalcCommands("1012",Calculator.Add(12.0))
  calcRegion ! CalcCommands("1012",Calculator.ShowResult)  //shows address too
  calcRegion ! CalcCommands("1012",Calculator.Disconnect)   //disengage cluster

  calcRegion ! CalcCommands("2012",Calculator.Num(10.0))   //shard 2, entity 2012
  calcRegion ! CalcCommands("2012",Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2012",Calculator.Div(2.0))
  calcRegion ! CalcCommands("2012",Calculator.Div(0.0))   //divide by zero


  Thread.sleep(15000)
  calcRegion ! CalcCommands("1012",Calculator.ShowResult)   //check if restore result on another node
  calcRegion ! CalcCommands("2012",Calculator.ShowResult)
}

         

The code above picks the shard and entity-id by hand, and also includes making one node leave the cluster. The run produces the following output:

 

[INFO] [07/15/2017 09:32:49.414] [ShardingSystem-akka.actor.default-dispatcher-20] [akka.tcp://ShardingSystem@127.0.0.1:50456/system/sharding/calcShard/1/1012/calculator] Result on ShardingSystem@127.0.0.1:50456 is: 25.0
[INFO] [07/15/2017 09:32:49.414] [ShardingSystem-akka.actor.default-dispatcher-20] [akka.tcp://ShardingSystem@127.0.0.1:50456/system/sharding/calcShard/1/1012/calculator] akka.tcp://ShardingSystem@127.0.0.1:50456 is leaving cluster!!!
[WARN] [07/15/2017 09:32:49.431] [ShardingSystem-akka.actor.default-dispatcher-18] [akka://ShardingSystem/system/sharding/calcShard/2/2012/calculator] / by zero
[INFO] [07/15/2017 09:33:01.320] [ShardingSystem-akka.actor.default-dispatcher-4] [akka.tcp://ShardingSystem@127.0.0.1:50464/system/sharding/calcShard/2/2012/calculator] Result on ShardingSystem@127.0.0.1:50464 is: 15.0
[INFO] [07/15/2017 09:33:01.330] [ShardingSystem-akka.actor.default-dispatcher-18] [akka.tcp://ShardingSystem@127.0.0.1:50457/system/sharding/calcShard/1/1012/calculator] Result on ShardingSystem@127.0.0.1:50457 is: 25.0

 

The results show that after node 50456 left the cluster, entity 1012 was rebuilt on node 50457 with its state preserved.

Below is the complete source code for this demonstration:

build.sbt

name := "cluster-sharding"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")

resources/sharding.conf 

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ShardingSystem@127.0.0.1:2551"]
    log-info = off
  }

  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}

Calculator.scala

package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command



  sealed trait Event
  case class SetResult(d: Any) extends Event

  def getResult(res: Double, cmd: Command) = cmd match {
    case Num(x) => x
    case Add(x) => res + x
    case Sub(x) => res - x
    case Mul(x) => res * x
    case Div(x) => {
      val _ = res.toInt / x.toInt //force an ArithmeticException on division by zero
      res / x
    }
    case _ => throw new ArithmeticException("Invalid Operation!")
  }

  case class State(result: Double) {

    def updateState(evt: Event): State = evt match {
      case SetResult(n) => copy(result = n.asInstanceOf[Double])
    }
  }

  case object Disconnect extends Command    //exit cluster

  def props = Props(new Calculator)

}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._
  val cluster = Cluster(context.system)

  var state: State = State(0)

  override def persistenceId: String = self.path.parent.name+"-"+self.path.name

  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_,st: State) => state = state.copy(result =  st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetResult(getResult(state.result,Num(n))))(evt => state = state.updateState(evt))
    case Add(n) => persist(SetResult(getResult(state.result,Add(n))))(evt => state = state.updateState(evt))
    case Sub(n) => persist(SetResult(getResult(state.result,Sub(n))))(evt => state = state.updateState(evt))
    case Mul(n) => persist(SetResult(getResult(state.result,Mul(n))))(evt => state = state.updateState(evt))
    case Div(n) => persist(SetResult(getResult(state.result,Div(n))))(evt => state = state.updateState(evt))
    case ShowResult => log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave (cluster.selfAddress)

  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }
  val calcActor = context.actorOf(Calculator.props,"calculator")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}

object CalculatorShard {
  import Calculator._

  case class CalcCommands(eid: String, msg: Command)  //user should use it to talk to shardregion
  val shardName = "calcShard"
  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id,msg) => (id,msg)
  }
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id,_) => id.head.toString
  }
  def entityProps = Props(new CalcSupervisor)
}

CalcShard.scala

package clustersharding.shard
import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)

    startupSharding(port,system)

  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) =>
        SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {

    startupSharedJournal(system, startStore = (port == 2551), path =
      ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )

  }

}

ClusterShardingDemo.scala

package clustersharding.demo
import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {

  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem",ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0,shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1012",Calculator.Num(13.0))   //shard 1, entity 1012
  calcRegion ! CalcCommands("1012",Calculator.Add(12.0))
  calcRegion ! CalcCommands("1012",Calculator.ShowResult)  //shows address too
  calcRegion ! CalcCommands("1012",Calculator.Disconnect)   //disengage cluster

  calcRegion ! CalcCommands("2012",Calculator.Num(10.0))   //shard 2, entity 2012
  calcRegion ! CalcCommands("2012",Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2012",Calculator.Div(2.0))
  calcRegion ! CalcCommands("2012",Calculator.Div(0.0))   //divide by zero


  Thread.sleep(15000)
  calcRegion ! CalcCommands("1012",Calculator.ShowResult)   //check if restore result on another node
  calcRegion ! CalcCommands("2012",Calculator.ShowResult)
}