import scala.concurrent.duration.FiniteDuration;
/**
- * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself
+ * to the ClusterRoleChangeNotifier.
+ *
* <p>
* The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ *
* <p>
- * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
+ * It kicks of a scheduler which sends registration messages to the notifiers, till it gets a
+ * RegisterRoleChangeListenerReply.
+ *
* <p>
* If all the notifiers have been regsitered with, then it cancels the scheduler.
* It starts the scheduler again when it receives a new registration
- *
*/
-public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable {
// the akka url should be set to the notifiers actor-system and domain.
private static final String NOTIFIER_AKKA_URL = "akka://raft-test@127.0.0.1:2550/user/";
- private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+ private final Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
private Cancellable registrationSchedule = null;
- private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
- private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
- private final String memberName;
- private static final String[] shardsToMonitor = new String[] {"example"};
+ private static final FiniteDuration DURATION = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ private static final FiniteDuration SCHEDULER_DURATION = new FiniteDuration(1, TimeUnit.SECONDS);
+ private static final String[] SHARDS_TO_MONITOR = new String[] {"example"};
public ExampleRoleChangeListener(String memberName) {
- scheduleRegistrationListener(schedulerDuration);
- this.memberName = memberName;
+ scheduleRegistrationListener(SCHEDULER_DURATION);
populateRegistry(memberName);
}
}
private void populateRegistry(String memberName) {
-
- for (String shard: shardsToMonitor) {
- String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+ String notifier = new StringBuilder().append(NOTIFIER_AKKA_URL).append(memberName)
.append("/").append(memberName).append("-notifier").toString();
- if (!notifierRegistrationStatus.containsKey(notifier)) {
- notifierRegistrationStatus.put(notifier, false);
- }
+ if (!notifierRegistrationStatus.containsKey(notifier)) {
+ notifierRegistrationStatus.put(notifier, false);
}
if (!registrationSchedule.isCancelled()) {
- scheduleRegistrationListener(schedulerDuration);
+ scheduleRegistrationListener(SCHEDULER_DURATION);
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void sendRegistrationRequests() {
for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
if (!entry.getValue()) {
try {
LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
ActorRef notifier = Await.result(
- getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+ getContext().actorSelection(entry.getKey()).resolveOne(DURATION), DURATION);
notifier.tell(new RegisterRoleChangeListener(), getSelf());
}
}
} else {
- LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+ LOG.info(
+ "Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
senderId);
}
}