+ ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+ .datastoreContextFactory(datastoreContextFactory)
+ .waitTillReadyCountdownLatch(waitTillReadyCountDownLatch)
+ .primaryShardInfoCache(primaryShardInfoCache)
+ .restoreFromSnapshot(restoreFromSnapshot);
+
+ actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
+ primaryShardInfoCache);
+
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);
+
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;