Merge "BUG 2954 : Thread local DocumentBuilder's for XmlUtil"
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleRoleChangeListener.java
1 package org.opendaylight.controller.cluster.example;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Cancellable;
5 import akka.actor.Props;
6 import java.util.HashMap;
7 import java.util.Map;
8 import java.util.concurrent.TimeUnit;
9 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
10 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
11 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
12 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
13 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
14 import scala.concurrent.Await;
15 import scala.concurrent.duration.FiniteDuration;
16
17 /**
18  * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
19  * <p/>
20  * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
21  * <p/>
22  * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
23  * <p/>
24  * If all the notifiers have been regsitered with, then it cancels the scheduler.
25  * It starts the scheduler again when it receives a new registration
26  *
27  */
28 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
29     // the akka url should be set to the notifiers actor-system and domain.
30     private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
31
32     private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
33     private Cancellable registrationSchedule = null;
34     private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
35     private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
36     private final String memberName;
37     private static final String[] shardsToMonitor = new String[] {"example"};
38
39     public ExampleRoleChangeListener(String memberName) {
40         super();
41         scheduleRegistrationListener(schedulerDuration);
42         this.memberName = memberName;
43         populateRegistry(memberName);
44     }
45
46     public static Props getProps(final String memberName) {
47         return Props.create(ExampleRoleChangeListener.class, memberName);
48     }
49
50     @Override
51     protected void handleReceive(Object message) throws Exception {
52         if (message instanceof RegisterListener) {
53             // called by the scheduler at intervals to register any unregistered notifiers
54             sendRegistrationRequests();
55
56         } else if (message instanceof RegisterRoleChangeListenerReply) {
57             // called by the Notifier
58             handleRegisterRoleChangeListenerReply(getSender().path().toString());
59
60         } else if (message instanceof RoleChangeNotification) {
61             // called by the Notifier
62             RoleChangeNotification notification = (RoleChangeNotification) message;
63
64             LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
65                 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
66
67             // the apps dependent on such notifications can be called here
68             //TODO: add implementation here
69
70         }
71     }
72
73     private void scheduleRegistrationListener(FiniteDuration interval) {
74         LOG.debug("--->scheduleRegistrationListener called.");
75         registrationSchedule = getContext().system().scheduler().schedule(
76             interval, interval, getSelf(), new RegisterListener(),
77             getContext().system().dispatcher(), getSelf());
78
79     }
80
81     private void populateRegistry(String memberName) {
82
83         for (String shard: shardsToMonitor) {
84             String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
85                 .append("/").append(memberName).append("-notifier").toString();
86
87             if (!notifierRegistrationStatus.containsKey(notifier)) {
88                 notifierRegistrationStatus.put(notifier, false);
89             }
90         }
91
92         if (!registrationSchedule.isCancelled()) {
93             scheduleRegistrationListener(schedulerDuration);
94         }
95     }
96
97     private void sendRegistrationRequests() {
98         for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
99             if (!entry.getValue()) {
100                 try {
101                     LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
102                     ActorRef notifier = Await.result(
103                         getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
104
105                     notifier.tell(new RegisterRoleChangeListener(), getSelf());
106
107                 } catch (Exception e) {
108                     LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
109                 }
110             }
111         }
112     }
113
114     private void handleRegisterRoleChangeListenerReply(String senderId) {
115         if (notifierRegistrationStatus.containsKey(senderId)) {
116             notifierRegistrationStatus.put(senderId, true);
117
118             //cancel the schedule when listener is registered with all notifiers
119             if (!registrationSchedule.isCancelled()) {
120                 boolean cancelScheduler = true;
121                 for (Boolean value : notifierRegistrationStatus.values()) {
122                     cancelScheduler = cancelScheduler & value;
123                 }
124                 if (cancelScheduler) {
125                     registrationSchedule.cancel();
126                 }
127             }
128         } else {
129             LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
130                 senderId);
131         }
132     }
133
134
135     @Override
136     public void close() throws Exception {
137         registrationSchedule.cancel();
138     }
139 }