Remove explicit default super-constructor calls
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleRoleChangeListener.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import akka.actor.Props;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
19 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
20 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
21 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
22 import scala.concurrent.Await;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
27  * <p>
28  * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
29  * <p>
30  * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
31  * <p>
32  * If all the notifiers have been regsitered with, then it cancels the scheduler.
33  * It starts the scheduler again when it receives a new registration
34  *
35  */
36 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
37     // the akka url should be set to the notifiers actor-system and domain.
38     private static final String NOTIFIER_AKKA_URL = "akka://raft-test@127.0.0.1:2550/user/";
39
40     private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
41     private Cancellable registrationSchedule = null;
42     private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
43     private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
44     private final String memberName;
45     private static final String[] shardsToMonitor = new String[] {"example"};
46
47     public ExampleRoleChangeListener(String memberName) {
48         scheduleRegistrationListener(schedulerDuration);
49         this.memberName = memberName;
50         populateRegistry(memberName);
51     }
52
53     public static Props getProps(final String memberName) {
54         return Props.create(ExampleRoleChangeListener.class, memberName);
55     }
56
57     @Override
58     protected void handleReceive(Object message) throws Exception {
59         if (message instanceof RegisterListener) {
60             // called by the scheduler at intervals to register any unregistered notifiers
61             sendRegistrationRequests();
62
63         } else if (message instanceof RegisterRoleChangeListenerReply) {
64             // called by the Notifier
65             handleRegisterRoleChangeListenerReply(getSender().path().toString());
66
67         } else if (message instanceof RoleChangeNotification) {
68             // called by the Notifier
69             RoleChangeNotification notification = (RoleChangeNotification) message;
70
71             LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
72                 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
73
74             // the apps dependent on such notifications can be called here
75             //TODO: add implementation here
76
77         }
78     }
79
80     private void scheduleRegistrationListener(FiniteDuration interval) {
81         LOG.debug("--->scheduleRegistrationListener called.");
82         registrationSchedule = getContext().system().scheduler().schedule(
83             interval, interval, getSelf(), new RegisterListener(),
84             getContext().system().dispatcher(), getSelf());
85
86     }
87
88     private void populateRegistry(String memberName) {
89
90         for (String shard: shardsToMonitor) {
91             String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
92                 .append("/").append(memberName).append("-notifier").toString();
93
94             if (!notifierRegistrationStatus.containsKey(notifier)) {
95                 notifierRegistrationStatus.put(notifier, false);
96             }
97         }
98
99         if (!registrationSchedule.isCancelled()) {
100             scheduleRegistrationListener(schedulerDuration);
101         }
102     }
103
104     private void sendRegistrationRequests() {
105         for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
106             if (!entry.getValue()) {
107                 try {
108                     LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
109                     ActorRef notifier = Await.result(
110                         getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
111
112                     notifier.tell(new RegisterRoleChangeListener(), getSelf());
113
114                 } catch (Exception e) {
115                     LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
116                 }
117             }
118         }
119     }
120
121     private void handleRegisterRoleChangeListenerReply(String senderId) {
122         if (notifierRegistrationStatus.containsKey(senderId)) {
123             notifierRegistrationStatus.put(senderId, true);
124
125             //cancel the schedule when listener is registered with all notifiers
126             if (!registrationSchedule.isCancelled()) {
127                 boolean cancelScheduler = true;
128                 for (Boolean value : notifierRegistrationStatus.values()) {
129                     cancelScheduler = cancelScheduler & value;
130                 }
131                 if (cancelScheduler) {
132                     registrationSchedule.cancel();
133                 }
134             }
135         } else {
136             LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
137                 senderId);
138         }
139     }
140
141
142     @Override
143     public void close() throws Exception {
144         registrationSchedule.cancel();
145     }
146 }