1 package org.opendaylight.controller.cluster.example;
3 import akka.actor.Actor;
4 import akka.actor.ActorRef;
5 import akka.actor.Cancellable;
6 import akka.actor.Props;
7 import akka.japi.Creator;
8 import java.util.HashMap;
10 import java.util.concurrent.TimeUnit;
11 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
12 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
13 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
14 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
15 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
16 import scala.concurrent.Await;
17 import scala.concurrent.duration.FiniteDuration;
 * This is a sample implementation of a Role Change Listener which is an actor, which registers itself with the ClusterRoleChangeNotifier.
 * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
 * It kicks off a scheduler which sends registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply.
 * If all the notifiers have been registered with, then it cancels the scheduler.
 * It starts the scheduler again when it receives a new registration.
30 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
31 // the akka url should be set to the notifiers actor-system and domain.
32 private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
34 private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
35 private Cancellable registrationSchedule = null;
36 private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
37 private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
38 private final String memberName;
39 private static final String[] shardsToMonitor = new String[] {"example"};
41 public ExampleRoleChangeListener(String memberName) {
43 scheduleRegistrationListener(schedulerDuration);
44 this.memberName = memberName;
45 populateRegistry(memberName);
48 public static Props getProps(final String memberName) {
49 return Props.create(new Creator<Actor>() {
51 public Actor create() throws Exception {
52 return new ExampleRoleChangeListener(memberName);
58 protected void handleReceive(Object message) throws Exception {
59 if (message instanceof RegisterListener) {
60 // called by the scheduler at intervals to register any unregistered notifiers
61 sendRegistrationRequests();
63 } else if (message instanceof RegisterRoleChangeListenerReply) {
64 // called by the Notifier
65 handleRegisterRoleChangeListenerReply(getSender().path().toString());
67 } else if (message instanceof RoleChangeNotification) {
68 // called by the Notifier
69 RoleChangeNotification notification = (RoleChangeNotification) message;
71 LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
72 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
74 // the apps dependent on such notifications can be called here
75 //TODO: add implementation here
80 private void scheduleRegistrationListener(FiniteDuration interval) {
81 LOG.debug("--->scheduleRegistrationListener called.");
82 registrationSchedule = getContext().system().scheduler().schedule(
83 interval, interval, getSelf(), new RegisterListener(),
84 getContext().system().dispatcher(), getSelf());
88 private void populateRegistry(String memberName) {
90 for (String shard: shardsToMonitor) {
91 String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
92 .append("/").append(memberName).append("-notifier").toString();
94 if (!notifierRegistrationStatus.containsKey(notifier)) {
95 notifierRegistrationStatus.put(notifier, false);
99 if (!registrationSchedule.isCancelled()) {
100 scheduleRegistrationListener(schedulerDuration);
104 private void sendRegistrationRequests() {
105 for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
106 if (!entry.getValue()) {
108 LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
109 ActorRef notifier = Await.result(
110 getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
112 notifier.tell(new RegisterRoleChangeListener(), getSelf());
114 } catch (Exception e) {
115 LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
121 private void handleRegisterRoleChangeListenerReply(String senderId) {
122 if (notifierRegistrationStatus.containsKey(senderId)) {
123 notifierRegistrationStatus.put(senderId, true);
125 //cancel the schedule when listener is registered with all notifiers
126 if (!registrationSchedule.isCancelled()) {
127 boolean cancelScheduler = true;
128 for (Boolean value : notifierRegistrationStatus.values()) {
129 cancelScheduler = cancelScheduler & value;
131 if (cancelScheduler) {
132 registrationSchedule.cancel();
136 LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
143 public void close() throws Exception {
144 registrationSchedule.cancel();