+
+ shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges(
+ this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES))
+ .map(reply -> (Registration)reply, ExecutionContexts.global());
+
+ shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
+ @Override
+ public void onComplete(Throwable failure, Registration reply) {
+ if (failure != null) {
+ LOG.error("RegisterForShardAvailabilityChanges failed", failure);
+ }
+ }
+ }, ExecutionContexts.global());
+ }
+
+ private void onShardAvailabilityChange(String shardName) {
+ LOG.debug("onShardAvailabilityChange for {}", shardName);
+
+ Long cookie = shards.get(shardName);
+ if (cookie == null) {
+ LOG.debug("No shard cookie found for {}", shardName);
+ return;
+ }
+
+ notifyStaleBackendInfoCallbacks(cookie);