0d11d7201d7cbd18efa2909128c5ea2b1d3482a9
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleRoleChangeListener.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import akka.actor.Props;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
19 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
20 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
21 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
22 import scala.concurrent.Await;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * This is a sample implementation of a Role Change Listener which is an actor, which registers itself
27  * to the ClusterRoleChangeNotifier.
28  *
29  * <p>
30  * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
31  *
32  * <p>
33  * It kicks of a scheduler which sends registration messages to the notifiers, till it gets a
34  *  RegisterRoleChangeListenerReply.
35  *
36  * <p>
37  * If all the notifiers have been regsitered with, then it cancels the scheduler.
38  * It starts the scheduler again when it receives a new registration
39  */
40 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable {
41     // the akka url should be set to the notifiers actor-system and domain.
42     private static final String NOTIFIER_AKKA_URL = "akka://raft-test@127.0.0.1:2550/user/";
43
44     private final Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
45     private Cancellable registrationSchedule = null;
46     private static final FiniteDuration DURATION = new FiniteDuration(100, TimeUnit.MILLISECONDS);
47     private static final FiniteDuration SCHEDULER_DURATION = new FiniteDuration(1, TimeUnit.SECONDS);
48     private static final String[] SHARDS_TO_MONITOR = new String[] {"example"};
49
50     public ExampleRoleChangeListener(String memberName) {
51         scheduleRegistrationListener(SCHEDULER_DURATION);
52         populateRegistry(memberName);
53     }
54
55     public static Props getProps(final String memberName) {
56         return Props.create(ExampleRoleChangeListener.class, memberName);
57     }
58
59     @Override
60     protected void handleReceive(Object message) throws Exception {
61         if (message instanceof RegisterListener) {
62             // called by the scheduler at intervals to register any unregistered notifiers
63             sendRegistrationRequests();
64
65         } else if (message instanceof RegisterRoleChangeListenerReply) {
66             // called by the Notifier
67             handleRegisterRoleChangeListenerReply(getSender().path().toString());
68
69         } else if (message instanceof RoleChangeNotification) {
70             // called by the Notifier
71             RoleChangeNotification notification = (RoleChangeNotification) message;
72
73             LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
74                 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
75
76             // the apps dependent on such notifications can be called here
77             //TODO: add implementation here
78
79         }
80     }
81
82     private void scheduleRegistrationListener(FiniteDuration interval) {
83         LOG.debug("--->scheduleRegistrationListener called.");
84         registrationSchedule = getContext().system().scheduler().schedule(
85             interval, interval, getSelf(), new RegisterListener(),
86             getContext().system().dispatcher(), getSelf());
87
88     }
89
90     private void populateRegistry(String memberName) {
91         for (String shard: SHARDS_TO_MONITOR) {
92             String notifier = new StringBuilder().append(NOTIFIER_AKKA_URL).append(memberName)
93                 .append("/").append(memberName).append("-notifier").toString();
94
95             if (!notifierRegistrationStatus.containsKey(notifier)) {
96                 notifierRegistrationStatus.put(notifier, false);
97             }
98         }
99
100         if (!registrationSchedule.isCancelled()) {
101             scheduleRegistrationListener(SCHEDULER_DURATION);
102         }
103     }
104
105     @SuppressWarnings("checkstyle:IllegalCatch")
106     private void sendRegistrationRequests() {
107         for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
108             if (!entry.getValue()) {
109                 try {
110                     LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
111                     ActorRef notifier = Await.result(
112                         getContext().actorSelection(entry.getKey()).resolveOne(DURATION), DURATION);
113
114                     notifier.tell(new RegisterRoleChangeListener(), getSelf());
115
116                 } catch (Exception e) {
117                     LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
118                 }
119             }
120         }
121     }
122
123     private void handleRegisterRoleChangeListenerReply(String senderId) {
124         if (notifierRegistrationStatus.containsKey(senderId)) {
125             notifierRegistrationStatus.put(senderId, true);
126
127             //cancel the schedule when listener is registered with all notifiers
128             if (!registrationSchedule.isCancelled()) {
129                 boolean cancelScheduler = true;
130                 for (Boolean value : notifierRegistrationStatus.values()) {
131                     cancelScheduler = cancelScheduler & value;
132                 }
133                 if (cancelScheduler) {
134                     registrationSchedule.cancel();
135                 }
136             }
137         } else {
138             LOG.info(
139                 "Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
140                 senderId);
141         }
142     }
143
144
145     @Override
146     public void close() throws Exception {
147         registrationSchedule.cancel();
148     }
149 }