+ /**
+ * Task for handling the lookup of the backend for the configuration shard.
+ */
+ private static class ConfigShardLookupTask extends LookupTask {
+
+ private final ActorSystem system;
+ private final ActorRef replyTo;
+ private final ActorContext context;
+
+ ConfigShardLookupTask(final ActorSystem system,
+ final ActorRef replyTo,
+ final ActorContext context,
+ final StartConfigShardLookup message,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
+ this.system = system;
+ this.replyTo = replyTo;
+ this.context = context;
+ }
+
+ @Override
+ void reschedule(int retries) {
+ LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
+ system.scheduler().scheduleOnce(
+ SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
+ }
+
+ @Override
+ public void run() {
+ final Optional<ActorRef> localShard =
+ context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
+
+ if (!localShard.isPresent()) {
+ tryReschedule(null);
+ } else {
+ LOG.debug("Local backend for prefix configuration shard lookup successful");
+ replyTo.tell(new Status.Success(null), ActorRef.noSender());
+ }
+ }
+ }
+
+ /**
+ * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
+ */
+ private static class ConfigShardReadinessTask extends LookupTask {
+
+ private final ActorSystem system;
+ private final ActorRef replyTo;
+ private final ActorContext context;
+ private final ClusterWrapper clusterWrapper;
+ private final ActorRef shard;
+
+ ConfigShardReadinessTask(final ActorSystem system,
+ final ActorRef replyTo,
+ final ActorContext context,
+ final ClusterWrapper clusterWrapper,
+ final ActorRef shard,
+ final int lookupMaxRetries) {
+ super(replyTo, lookupMaxRetries);
+ this.system = system;
+ this.replyTo = replyTo;
+ this.context = context;
+ this.clusterWrapper = clusterWrapper;
+ this.shard = shard;
+ }
+
+ @Override
+ void reschedule(int retries) {
+ LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
+ clusterWrapper.getCurrentMemberName(), retries);
+ system.scheduler().scheduleOnce(
+ SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
+ }
+
+ @Override
+ public void run() {
+ final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
+
+ ask.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Object findLeaderReply) {
+ if (throwable != null) {
+ tryReschedule(throwable);
+ } else {
+ final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
+ final java.util.Optional<String> leaderActor = findLeader.getLeaderActor();
+ if (leaderActor.isPresent()) {
+ // leader is found, backend seems ready, check if the frontend is ready
+ LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
+ clusterWrapper.getCurrentMemberName());
+ replyTo.tell(new Status.Success(null), ActorRef.noSender());
+ } else {
+ tryReschedule(null);
+ }
+ }
+ }
+ }, system.dispatcher());
+ }
+ }
+