- actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
- .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
- cluster, configuration, datastoreContext);
+ PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
+ ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration).
+ datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
+ primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+
+ actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);