Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleRoleChangeListener.java
1 package org.opendaylight.controller.cluster.example;
2
3 import akka.actor.Actor;
4 import akka.actor.ActorRef;
5 import akka.actor.Cancellable;
6 import akka.actor.Props;
7 import akka.japi.Creator;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.concurrent.TimeUnit;
11 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
12 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
13 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
14 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
15 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
16 import scala.concurrent.Await;
17 import scala.concurrent.duration.FiniteDuration;
18
19 /**
20  * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
21  * <p/>
22  * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
23  * <p/>
24  * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
25  * <p/>
26  * If all the notifiers have been regsitered with, then it cancels the scheduler.
27  * It starts the scheduler again when it receives a new registration
28  *
29  */
30 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
31     // the akka url should be set to the notifiers actor-system and domain.
32     private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
33
34     private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
35     private Cancellable registrationSchedule = null;
36     private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
37     private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
38     private final String memberName;
39     private static final String[] shardsToMonitor = new String[] {"example"};
40
41     public ExampleRoleChangeListener(String memberName) {
42         super();
43         scheduleRegistrationListener(schedulerDuration);
44         this.memberName = memberName;
45         populateRegistry(memberName);
46     }
47
48     public static Props getProps(final String memberName) {
49         return Props.create(new Creator<Actor>() {
50             @Override
51             public Actor create() throws Exception {
52                 return new ExampleRoleChangeListener(memberName);
53             }
54         });
55     }
56
57     @Override
58     protected void handleReceive(Object message) throws Exception {
59         if (message instanceof RegisterListener) {
60             // called by the scheduler at intervals to register any unregistered notifiers
61             sendRegistrationRequests();
62
63         } else if (message instanceof RegisterRoleChangeListenerReply) {
64             // called by the Notifier
65             handleRegisterRoleChangeListenerReply(getSender().path().toString());
66
67         } else if (message instanceof RoleChangeNotification) {
68             // called by the Notifier
69             RoleChangeNotification notification = (RoleChangeNotification) message;
70
71             LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
72                 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
73
74             // the apps dependent on such notifications can be called here
75             //TODO: add implementation here
76
77         }
78     }
79
80     private void scheduleRegistrationListener(FiniteDuration interval) {
81         LOG.debug("--->scheduleRegistrationListener called.");
82         registrationSchedule = getContext().system().scheduler().schedule(
83             interval, interval, getSelf(), new RegisterListener(),
84             getContext().system().dispatcher(), getSelf());
85
86     }
87
88     private void populateRegistry(String memberName) {
89
90         for (String shard: shardsToMonitor) {
91             String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
92                 .append("/").append(memberName).append("-notifier").toString();
93
94             if (!notifierRegistrationStatus.containsKey(notifier)) {
95                 notifierRegistrationStatus.put(notifier, false);
96             }
97         }
98
99         if (!registrationSchedule.isCancelled()) {
100             scheduleRegistrationListener(schedulerDuration);
101         }
102     }
103
104     private void sendRegistrationRequests() {
105         for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
106             if (!entry.getValue()) {
107                 try {
108                     LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
109                     ActorRef notifier = Await.result(
110                         getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
111
112                     notifier.tell(new RegisterRoleChangeListener(), getSelf());
113
114                 } catch (Exception e) {
115                     LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
116                 }
117             }
118         }
119     }
120
121     private void handleRegisterRoleChangeListenerReply(String senderId) {
122         if (notifierRegistrationStatus.containsKey(senderId)) {
123             notifierRegistrationStatus.put(senderId, true);
124
125             //cancel the schedule when listener is registered with all notifiers
126             if (!registrationSchedule.isCancelled()) {
127                 boolean cancelScheduler = true;
128                 for (Boolean value : notifierRegistrationStatus.values()) {
129                     cancelScheduler = cancelScheduler & value;
130                 }
131                 if (cancelScheduler) {
132                     registrationSchedule.cancel();
133                 }
134             }
135         } else {
136             LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
137                 senderId);
138         }
139     }
140
141
142     @Override
143     public void close() throws Exception {
144         registrationSchedule.cancel();
145     }
146 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.