+
+ updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor,
+ distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+ LOG.debug("{} - Starting prefix configuration shards", memberName);
+ createPrefixConfigShard(distributedConfigDatastore);
+ createPrefixConfigShard(distributedOperDatastore);
+ }
+
+ private void createPrefixConfigShard(final AbstractDataStore dataStore) {
+ Configuration configuration = dataStore.getActorContext().getConfiguration();
+ Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
+ CreateShard createShardMessage =
+ new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(),
+ "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME,
+ memberNames),
+ Shard.builder(), dataStore.getActorContext().getDatastoreContext());
+
+ dataStore.getActorContext().getShardManager().tell(createShardMessage, noSender());
+ }
+
+ /**
+ * This will try to initialize prefix configuration shards upon their
+ * successful start. We need to create writers to these shards, so we can
+ * satisfy future {@link #createDistributedShard} and
+ * {@link #resolveShardAdditions} requests and update prefix configuration
+ * shards accordingly.
+ *
+ * <p>
+ * We also need to initialize listeners on these shards, so we can react
+ * on changes made on them by other cluster members or even by ourselves.
+ *
+ * <p>
+ * Finally, we need to be sure that default shards for both operational and
+ * configuration data stores are up and running and we have distributed
+ * shards frontend created for them.
+ *
+ * <p>
+ * This is intended to be invoked by blueprint as initialization method.
+ */
+ public void init() {
+ // create our writers to the configuration
+ try {
+ LOG.debug("{} - starting config shard lookup.",
+ distributedConfigDatastore.getActorContext().getCurrentMemberName());
+
+ // We have to wait for prefix config shards to be up and running
+ // so we can create datastore clients for them
+ handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit());
+
+ LOG.debug("Prefix configuration shards ready - creating clients");
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new IllegalStateException("Prefix config shards not found", e);
+ }
+
+ try {
+ LOG.debug("Prefix configuration shards ready - creating clients");
+ configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
+ createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+ distributedConfigDatastore.getActorContext()));
+ } catch (final DOMDataTreeShardCreationFailedException e) {
+ throw new IllegalStateException(
+ "Unable to create datastoreClient for config DS prefix configuration shard.", e);
+ }
+
+ try {
+ configurationShardMap.put(LogicalDatastoreType.OPERATIONAL,
+ createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
+ distributedOperDatastore.getActorContext()));
+
+ } catch (final DOMDataTreeShardCreationFailedException e) {
+ throw new IllegalStateException(
+ "Unable to create datastoreClient for oper DS prefix configuration shard.", e);
+ }
+
+ writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter(
+ configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey()));
+
+ writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter(
+ configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey()));
+
+ updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION);
+ updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL);
+
+ distributedConfigDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+ distributedOperDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
+
+
+ //create shard registration for DEFAULT_SHARD
+ try {
+ defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
+ initDefaultShard(LogicalDatastoreType.CONFIGURATION));
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("Unable to create default shard frontend for config shard", e);
+ }
+
+ try {
+ defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
+ initDefaultShard(LogicalDatastoreType.OPERATIONAL));
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("Unable to create default shard frontend for operational shard", e);
+ }
+ }
+
+ private ListenableFuture<List<Void>> handleConfigShardLookup() {
+
+ final ListenableFuture<Void> configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION);
+ final ListenableFuture<Void> operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL);
+
+ return Futures.allAsList(configFuture, operFuture);
+ }
+
+ private ListenableFuture<Void> lookupConfigShard(final LogicalDatastoreType type) {
+ final SettableFuture<Void> future = SettableFuture.create();
+
+ final Future<Object> ask =
+ Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
+
+ ask.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Object result) throws Throwable {
+ if (throwable != null) {
+ future.setException(throwable);
+ } else {
+ future.set(null);
+ }
+ }
+ }, actorSystem.dispatcher());
+
+ return future;