Fix license header violations in sal-akka-raft
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleRoleChangeListener.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import akka.actor.Props;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.controller.cluster.example.messages.RegisterListener;
19 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
20 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
21 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
22 import scala.concurrent.Await;
23 import scala.concurrent.duration.FiniteDuration;
24
25 /**
26  * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
27  * <p/>
28  * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
29  * <p/>
30  * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
31  * <p/>
32  * If all the notifiers have been regsitered with, then it cancels the scheduler.
33  * It starts the scheduler again when it receives a new registration
34  *
35  */
36 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
37     // the akka url should be set to the notifiers actor-system and domain.
38     private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
39
40     private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
41     private Cancellable registrationSchedule = null;
42     private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
43     private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
44     private final String memberName;
45     private static final String[] shardsToMonitor = new String[] {"example"};
46
47     public ExampleRoleChangeListener(String memberName) {
48         super();
49         scheduleRegistrationListener(schedulerDuration);
50         this.memberName = memberName;
51         populateRegistry(memberName);
52     }
53
54     public static Props getProps(final String memberName) {
55         return Props.create(ExampleRoleChangeListener.class, memberName);
56     }
57
58     @Override
59     protected void handleReceive(Object message) throws Exception {
60         if (message instanceof RegisterListener) {
61             // called by the scheduler at intervals to register any unregistered notifiers
62             sendRegistrationRequests();
63
64         } else if (message instanceof RegisterRoleChangeListenerReply) {
65             // called by the Notifier
66             handleRegisterRoleChangeListenerReply(getSender().path().toString());
67
68         } else if (message instanceof RoleChangeNotification) {
69             // called by the Notifier
70             RoleChangeNotification notification = (RoleChangeNotification) message;
71
72             LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
73                 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
74
75             // the apps dependent on such notifications can be called here
76             //TODO: add implementation here
77
78         }
79     }
80
81     private void scheduleRegistrationListener(FiniteDuration interval) {
82         LOG.debug("--->scheduleRegistrationListener called.");
83         registrationSchedule = getContext().system().scheduler().schedule(
84             interval, interval, getSelf(), new RegisterListener(),
85             getContext().system().dispatcher(), getSelf());
86
87     }
88
89     private void populateRegistry(String memberName) {
90
91         for (String shard: shardsToMonitor) {
92             String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
93                 .append("/").append(memberName).append("-notifier").toString();
94
95             if (!notifierRegistrationStatus.containsKey(notifier)) {
96                 notifierRegistrationStatus.put(notifier, false);
97             }
98         }
99
100         if (!registrationSchedule.isCancelled()) {
101             scheduleRegistrationListener(schedulerDuration);
102         }
103     }
104
105     private void sendRegistrationRequests() {
106         for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
107             if (!entry.getValue()) {
108                 try {
109                     LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
110                     ActorRef notifier = Await.result(
111                         getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
112
113                     notifier.tell(new RegisterRoleChangeListener(), getSelf());
114
115                 } catch (Exception e) {
116                     LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
117                 }
118             }
119         }
120     }
121
122     private void handleRegisterRoleChangeListenerReply(String senderId) {
123         if (notifierRegistrationStatus.containsKey(senderId)) {
124             notifierRegistrationStatus.put(senderId, true);
125
126             //cancel the schedule when listener is registered with all notifiers
127             if (!registrationSchedule.isCancelled()) {
128                 boolean cancelScheduler = true;
129                 for (Boolean value : notifierRegistrationStatus.values()) {
130                     cancelScheduler = cancelScheduler & value;
131                 }
132                 if (cancelScheduler) {
133                     registrationSchedule.cancel();
134                 }
135             }
136         } else {
137             LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
138                 senderId);
139         }
140     }
141
142
143     @Override
144     public void close() throws Exception {
145         registrationSchedule.cancel();
146     }
147 }