+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+ final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+ final DatastoreSnapshot restoreFromSnapshot) {
+ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+ Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+
+ String shardManagerId = ShardManagerIdentifier.builder()
+ .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
+
+ LOG.info("Creating ShardManager : {}", shardManagerId);
+
+ String shardDispatcher =
+ new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
+ PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
+ ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+ .datastoreContextFactory(datastoreContextFactory)
+ .waitTillReadyCountdownLatch(waitTillReadyCountDownLatch)
+ .primaryShardInfoCache(primaryShardInfoCache)
+ .restoreFromSnapshot(restoreFromSnapshot);
+
+ actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
+ primaryShardInfoCache);
+
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }