2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.example;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.example.messages.RegisterListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
26 * This is a sample implementation of a Role Change Listener which is an actor, which registers itself
27 * to the ClusterRoleChangeNotifier.
30 * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
33 * It kicks of a scheduler which sends registration messages to the notifiers, till it gets a
34 * RegisterRoleChangeListenerReply.
37 * If all the notifiers have been regsitered with, then it cancels the scheduler.
38 * It starts the scheduler again when it receives a new registration
40 public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable {
41 // the akka url should be set to the notifiers actor-system and domain.
42 private static final String NOTIFIER_AKKA_URL = "akka://raft-test@127.0.0.1:2550/user/";
44 private final Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
45 private Cancellable registrationSchedule = null;
46 private static final FiniteDuration DURATION = new FiniteDuration(100, TimeUnit.MILLISECONDS);
47 private static final FiniteDuration SCHEDULER_DURATION = new FiniteDuration(1, TimeUnit.SECONDS);
48 private static final String[] SHARDS_TO_MONITOR = new String[] {"example"};
50 public ExampleRoleChangeListener(final String memberName) {
51 scheduleRegistrationListener(SCHEDULER_DURATION);
52 populateRegistry(memberName);
55 public static Props getProps(final String memberName) {
56 return Props.create(ExampleRoleChangeListener.class, memberName);
60 protected void handleReceive(final Object message) {
61 if (message instanceof RegisterListener) {
62 // called by the scheduler at intervals to register any unregistered notifiers
63 sendRegistrationRequests();
65 } else if (message instanceof RegisterRoleChangeListenerReply) {
66 // called by the Notifier
67 handleRegisterRoleChangeListenerReply(getSender().path().toString());
69 } else if (message instanceof RoleChangeNotification) {
70 // called by the Notifier
71 RoleChangeNotification notification = (RoleChangeNotification) message;
73 LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
74 notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
76 // the apps dependent on such notifications can be called here
77 //TODO: add implementation here
82 private void scheduleRegistrationListener(final FiniteDuration interval) {
83 LOG.debug("--->scheduleRegistrationListener called.");
84 registrationSchedule = getContext().system().scheduler().schedule(
85 interval, interval, getSelf(), new RegisterListener(),
86 getContext().system().dispatcher(), getSelf());
90 private void populateRegistry(final String memberName) {
91 String notifier = new StringBuilder().append(NOTIFIER_AKKA_URL).append(memberName)
92 .append("/").append(memberName).append("-notifier").toString();
94 if (!notifierRegistrationStatus.containsKey(notifier)) {
95 notifierRegistrationStatus.put(notifier, false);
98 if (!registrationSchedule.isCancelled()) {
99 scheduleRegistrationListener(SCHEDULER_DURATION);
103 @SuppressWarnings("checkstyle:IllegalCatch")
104 @SuppressFBWarnings("REC_CATCH_EXCEPTION")
105 private void sendRegistrationRequests() {
106 for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
107 if (!entry.getValue()) {
109 LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
110 ActorRef notifier = Await.result(
111 getContext().actorSelection(entry.getKey()).resolveOne(DURATION), DURATION);
113 notifier.tell(new RegisterRoleChangeListener(), getSelf());
115 } catch (Exception e) {
116 LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey(), e);
122 private void handleRegisterRoleChangeListenerReply(final String senderId) {
123 if (notifierRegistrationStatus.containsKey(senderId)) {
124 notifierRegistrationStatus.put(senderId, true);
126 //cancel the schedule when listener is registered with all notifiers
127 if (!registrationSchedule.isCancelled()) {
128 boolean cancelScheduler = true;
129 for (Boolean value : notifierRegistrationStatus.values()) {
130 cancelScheduler = cancelScheduler & value;
132 if (cancelScheduler) {
133 registrationSchedule.cancel();
138 "Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
145 public void close() {
146 registrationSchedule.cancel();