Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state... 33/12933/7
authorKamal Rameshan <kramesha@cisco.com>
Tue, 18 Nov 2014 23:26:51 +0000 (15:26 -0800)
committerKamal Rameshan <kramesha@cisco.com>
Wed, 26 Nov 2014 20:49:28 +0000 (20:49 +0000)
A notifier actor is spawned from the RaftActor inherited implementation, in our case Shard or ExampleActor .

Its injected into the RaftActor.RaftActor notifies the notifier with the role changes. The Notification message is local to Shard or Example.

The idea is for any implementation to create a notifier, inject it and issue a notification from that notifier on a role change.

A sample example notifier, listener and notification is provided to show how a listener can register with the notifier and get notifications form the notifier.

Notifier and Notifications are assembled in commons, to be shared along with other apps who might need similar logic. It can be override by specific implementations.

Has been tested with the TestDriver, with separate actorsystems for listner and notifier

Tests have been added.

Change-Id: I23f16d4e76bb7dae640c544df282293274d9a1cb
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
16 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java

index 6dfa4afd6b6951a351790d0fb87679b40d19dc90..6c65021d86158b22483ada73704f68ba9f83d84c 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
  */
 public class ExampleActor extends RaftActor {
 
-    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, String> state = new HashMap();
     private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
+    private Optional<ActorRef> roleChangeNotifier;
 
 
-    public ExampleActor(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams) {
+    public ExampleActor(String id, Map<String, String> peerAddresses,
+        Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
         this.dataPersistenceProvider = new PersistentDataProvider();
+        roleChangeNotifier = createRoleChangeNotifier(id);
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
         final Optional<ConfigParams> configParams){
         return Props.create(new Creator<ExampleActor>(){
-            private static final long serialVersionUID = 1L;
 
             @Override public ExampleActor create() throws Exception {
                 return new ExampleActor(id, peerAddresses, configParams);
@@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor {
         });
     }
 
-    @Override public void onReceiveCommand(final Object message) throws Exception{
+    @Override public void onReceiveCommand(Object message) throws Exception{
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor {
                 String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
-                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
-                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet());
                 }
 
 
@@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+    }
+
+    public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+        ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+        return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     @Override protected void applyState(final ActorRef clientActor, final String identifier,
         final Object data) {
         if(data instanceof KeyValue){
@@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor {
         getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(final ByteString snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
         try {
-            state.putAll((Map<String, String>) toObject(snapshot));
+            state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
            LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
-    private ByteString fromObject(final Object snapshot) throws Exception {
+    private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
@@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
@@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override public void onReceiveRecover(final Object message)throws Exception {
+    @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
 
@@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(final int maxBatchSize) {
+    protected void startLogRecoveryBatch(int maxBatchSize) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
+    protected void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
@@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(ByteString snapshot) {
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java
new file mode 100644 (file)
index 0000000..c0ee095
--- /dev/null
@@ -0,0 +1,146 @@
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.japi.Creator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.example.messages.RegisterListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * <p/>
+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ * <p/>
+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
+ * <p/>
+ * If all the notifiers have been regsitered with, then it cancels the scheduler.
+ * It starts the scheduler again when it receives a new registration
+ *
+ */
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+    // the akka url should be set to the notifiers actor-system and domain.
+    private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+
+    private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+    private Cancellable registrationSchedule = null;
+    private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+    private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
+    private final String memberName;
+    private static final String[] shardsToMonitor = new String[] {"example"};
+
+    public ExampleRoleChangeListener(String memberName) {
+        super();
+        scheduleRegistrationListener(schedulerDuration);
+        this.memberName = memberName;
+        populateRegistry(memberName);
+    }
+
+    public static Props getProps(final String memberName) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new ExampleRoleChangeListener(memberName);
+            }
+        });
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterListener) {
+            // called by the scheduler at intervals to register any unregistered notifiers
+            sendRegistrationRequests();
+
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            // called by the Notifier
+            handleRegisterRoleChangeListenerReply(getSender().path().toString());
+
+        } else if (message instanceof RoleChangeNotification) {
+            // called by the Notifier
+            RoleChangeNotification notification = (RoleChangeNotification) message;
+
+            LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
+                notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
+
+            // the apps dependent on such notifications can be called here
+            //TODO: add implementation here
+
+        }
+    }
+
+    private void scheduleRegistrationListener(FiniteDuration interval) {
+        LOG.debug("--->scheduleRegistrationListener called.");
+        registrationSchedule = getContext().system().scheduler().schedule(
+            interval, interval, getSelf(), new RegisterListener(),
+            getContext().system().dispatcher(), getSelf());
+
+    }
+
+    private void populateRegistry(String memberName) {
+
+        for (String shard: shardsToMonitor) {
+            String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+                .append("/").append(memberName).append("-notifier").toString();
+
+            if (!notifierRegistrationStatus.containsKey(notifier)) {
+                notifierRegistrationStatus.put(notifier, false);
+            }
+        }
+
+        if (!registrationSchedule.isCancelled()) {
+            scheduleRegistrationListener(schedulerDuration);
+        }
+    }
+
+    private void sendRegistrationRequests() {
+        for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
+            if (!entry.getValue()) {
+                try {
+                    LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
+                    ActorRef notifier = Await.result(
+                        getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+
+                    notifier.tell(new RegisterRoleChangeListener(), getSelf());
+
+                } catch (Exception e) {
+                    LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
+                }
+            }
+        }
+    }
+
+    private void handleRegisterRoleChangeListenerReply(String senderId) {
+        if (notifierRegistrationStatus.containsKey(senderId)) {
+            notifierRegistrationStatus.put(senderId, true);
+
+            //cancel the schedule when listener is registered with all notifiers
+            if (!registrationSchedule.isCancelled()) {
+                boolean cancelScheduler = true;
+                for (Boolean value : notifierRegistrationStatus.values()) {
+                    cancelScheduler = cancelScheduler & value;
+                }
+                if (cancelScheduler) {
+                    registrationSchedule.cancel();
+                }
+            }
+        } else {
+            LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+                senderId);
+        }
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        registrationSchedule.cancel();
+    }
+}
index de6169791ed4cb4405e6f47e85e0ed3535155fd1..cd2e4a506ce5365bc45cc89f160695516846b4f2 100644 (file)
@@ -3,15 +3,17 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 
 /**
  * This is a test driver for testing akka-raft implementation
@@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class TestDriver {
 
-    private static final ActorSystem actorSystem = ActorSystem.create();
+
     private static Map<String, String> allPeers = new HashMap<>();
     private static Map<String, ActorRef> clientActorRefs  = new HashMap<String, ActorRef>();
     private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
@@ -29,6 +31,9 @@ public class TestDriver {
     private int nameCounter = 0;
     private static ConfigParams configParams = new ExampleConfigParamsImpl();
 
+    private static ActorSystem actorSystem;
+    private static ActorSystem listenerActorSystem;
+
     /**
      * Create nodes, add clients and start logging.
      * Commands
@@ -53,6 +58,13 @@ public class TestDriver {
      * @throws Exception
      */
     public static void main(String[] args) throws Exception {
+
+        actorSystem = ActorSystem.create("raft-test", ConfigFactory
+            .load().getConfig("raft-test"));
+
+        listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+            .load().getConfig("raft-test-listener"));
+
         TestDriver td = new TestDriver();
 
         System.out.println("Enter command (type bye to exit):");
@@ -113,6 +125,16 @@ public class TestDriver {
         }
     }
 
+    // create the listener using a separate actor system for each example actor
+    private void createClusterRoleChangeListener(List<String> memberIds) {
+        System.out.println("memberIds="+memberIds);
+        for (String memberId : memberIds) {
+            ActorRef listenerActor = listenerActorSystem.actorOf(
+                ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+            System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+        }
+    }
+
     public static ActorRef createExampleActor(String name) {
         return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
             Optional.of(configParams)), name);
@@ -121,7 +143,7 @@ public class TestDriver {
     public void createNodes(int num) {
         for (int i=0; i < num; i++)  {
             nameCounter = nameCounter + 1;
-            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+            allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
         }
 
         for (String s : allPeers.keySet())  {
@@ -130,6 +152,8 @@ public class TestDriver {
             System.out.println("Created node:"+s);
 
         }
+
+        createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
     }
 
     // add num clients to all nodes in the system
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java
new file mode 100644 (file)
index 0000000..4507f43
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers
+ *
+ * This message is sent by the scheduler
+ */
+public class RegisterListener {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java
new file mode 100644 (file)
index 0000000..8adc0da
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.util.List;
+
+/**
+ * Created by kramesha on 11/18/14.
+ */
+public class SetNotifiers {
+    private List<String> notifierList;
+
+    public SetNotifiers(List<String> notifierList) {
+        this.notifierList = notifierList;
+    }
+
+    public List<String> getNotifierList() {
+        return notifierList;
+    }
+}
index 042b9fb56995d259d99a1559d4a3e3d5389fdba3..d647475e4d53319e1638c0e4c03fe4dfcd58a891 100644 (file)
@@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import java.io.Serializable;
-import java.util.Map;
-
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
@@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
 
                 onRecoveryComplete();
+
+                RaftActorBehavior oldBehavior = currentBehavior;
                 currentBehavior = new Follower(context);
-                onStateChanged();
+                handleBehaviorChange(oldBehavior, currentBehavior);
             }
         }
     }
@@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
+        RaftActorBehavior oldBehavior = currentBehavior;
         currentBehavior = new Follower(context);
-        onStateChanged();
+        handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            if(oldBehavior != currentBehavior){
-                onStateChanged();
-            }
-
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+            handleBehaviorChange(oldBehavior, currentBehavior);
         }
     }
 
-    public java.util.Set<String> getPeers() {
-
-        return context.getPeerAddresses().keySet();
-    }
+    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+        if (oldBehavior != currentBehavior){
+            onStateChanged();
+        }
+        if (oldBehavior != null) {
+            // it can happen that the state has not changed but the leader has changed.
+            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
 
-    protected String getReplicatedLogState() {
-        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
-            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
-            + ", im-mem journal size=" + context.getReplicatedLog().size();
+            if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+                // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+                getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+                    currentBehavior.state().name()), getSelf());
+            }
+        }
     }
 
-
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
@@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected abstract DataPersistenceProvider persistence();
 
+    /**
+     * Notifier Actor for this RaftActor to notify when a role change happens
+     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+     */
+    protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
@@ -843,4 +852,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected RaftActorBehavior getCurrentBehavior() {
         return currentBehavior;
     }
+
 }
index 9e42a13c6a0b1d948ff546c28f2f41bf894ff9d2..b2132b88b0c4f062414cf57a42fc8efdc3eb88dc 100644 (file)
@@ -13,8 +13,67 @@ akka {
 
         serialization-bindings {
             "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+            "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
             "com.google.protobuf.Message" = proto
             "com.google.protobuf.GeneratedMessage" = proto
         }
     }
 }
+
+raft-test {
+    akka {
+
+        loglevel = "DEBUG"
+
+        actor {
+            # enable to test serialization only.
+            # serialize-messages = on
+
+            provider = "akka.remote.RemoteActorRefProvider"
+
+            serializers {
+              java  = "akka.serialization.JavaSerializer"
+              proto = "akka.remote.serialization.ProtobufSerializer"
+            }
+
+            serialization-bindings {
+                "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+                "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
+                "com.google.protobuf.Message" = proto
+                "com.google.protobuf.GeneratedMessage" = proto
+            }
+        }
+
+        remote {
+                        log-remote-lifecycle-events = off
+                        netty.tcp {
+                            hostname = "127.0.0.1"
+                            port = 2550
+                        }
+                    }
+    }
+}
+
+raft-test-listener {
+
+  akka {
+    loglevel = "DEBUG"
+
+    actor {
+        provider = "akka.remote.RemoteActorRefProvider"
+    }
+
+    remote {
+        log-remote-lifecycle-events = off
+        netty.tcp {
+            hostname = "127.0.0.1"
+            port = 2554
+        }
+    }
+
+    member-id = "member-1"
+  }
+}
+
+
+
index 9eb2fb757bd2da0c6b2ce48ebbfeb72471ef48c1..c833a86e9b825c926002ce5fc5b163438a6a4ce0 100644 (file)
@@ -21,11 +21,25 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -36,27 +50,13 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -83,6 +83,9 @@ public class RaftActorTest extends AbstractActorTest {
 
         private final DataPersistenceProvider dataPersistenceProvider;
         private final RaftActor delegate;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+        private ActorRef roleChangeNotifier;
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -90,25 +93,27 @@ public class RaftActorTest extends AbstractActorTest {
             private final String id;
             private final Optional<ConfigParams> config;
             private final DataPersistenceProvider dataPersistenceProvider;
+            private final ActorRef roleChangeNotifier;
 
             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                    Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+                ActorRef roleChangeNotifier) {
                 this.peerAddresses = peerAddresses;
                 this.id = id;
                 this.config = config;
                 this.dataPersistenceProvider = dataPersistenceProvider;
+                this.roleChangeNotifier = roleChangeNotifier;
             }
 
             @Override
             public MockRaftActor create() throws Exception {
-                return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
+                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                    dataPersistenceProvider);
+                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+                return mockRaftActor;
             }
         }
 
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        private final List<Object> state;
-
         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
@@ -134,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest {
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                 Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+        }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
             delegate.applyState(clientActor, identifier, data);
             LOG.info("applyState called");
         }
 
-
-
-
         @Override
         protected void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -201,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest {
             return this.dataPersistenceProvider;
         }
 
+        @Override
+        protected Optional<ActorRef> getRoleChangeNotifier() {
+            return Optional.fromNullable(roleChangeNotifier);
+        }
+
         @Override public String persistenceId() {
             return this.getId();
         }
@@ -862,6 +873,40 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
+    @Test
+    public void testRaftRoleChangeNotifier() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            String id = "testRaftRoleChangeNotifier";
+
+            TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
+
+            MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+            mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
+
+            // sleeping for a minimum of 2 seconds, if it spans more its fine.
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+            assertNotNull(matches);
+            assertEquals(2, matches.size());
+
+            // check if the notifier got a role change from Follower to Candidate
+            RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+            // check if the notifier got a role change from Candidate to Leader
+            raftRoleChanged = (RoleChanged) matches.get(1);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+        }};
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java
new file mode 100644 (file)
index 0000000..1faa341
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RegisterRoleChangeListener implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java
new file mode 100644 (file)
index 0000000..f1d13e3
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent from a RoleChangeNotifier to the Role Change Listener
+ *
+ * Can be sent to a separate actor system and hence should be made serializable.
+ */
+public class RegisterRoleChangeListenerReply implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java
new file mode 100644 (file)
index 0000000..de2733f
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Notification message representing a Role change of a cluster member
+ *
+ * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RoleChangeNotification implements Serializable {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChangeNotification(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
new file mode 100644 (file)
index 0000000..a9aa561
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import akka.actor.Actor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.serialization.Serialization;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
+/**
+ * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * the listeners (within the same node), which are registered with it.
+ * <p/>
+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
+ */
+public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
+
+    private String memberId;
+    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    private RoleChangeNotification latestRoleChangeNotification = null;
+
+    public RoleChangeNotifier(String memberId) {
+        this.memberId = memberId;
+    }
+
+    public static Props getProps(final String memberId) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new RoleChangeNotifier(memberId);
+            }
+        });
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+        LOG.info("RoleChangeNotifier:{} created and ready for shard:{}",
+            Serialization.serializedActorPath(getSelf()), memberId);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterRoleChangeListener) {
+            // register listeners for this shard
+
+            ActorRef curRef = registeredListeners.get(getSender().path());
+            if (curRef != null) {
+                // ActorPaths would pass equal even if the unique id of the actors are different
+                // if a listener actor is re-registering after reincarnation, then removing the existing
+                // entry so the actor path with correct unique id is registered.
+                registeredListeners.remove(getSender().path());
+            }
+            registeredListeners.put(getSender().path(), getSender());
+
+            LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId,
+                getSender().path().toString());
+
+            getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+
+            if (latestRoleChangeNotification != null) {
+                getSender().tell(latestRoleChangeNotification, getSelf());
+            }
+
+
+        } else if (message instanceof RoleChanged) {
+            // this message is sent by RaftActor. Notify registered listeners when this message is received.
+            RoleChanged roleChanged = (RoleChanged) message;
+
+            LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            latestRoleChangeNotification =
+                new RoleChangeNotification(roleChanged.getMemberId(),
+                    roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestRoleChangeNotification, getSelf());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        registeredListeners.clear();
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java
new file mode 100644 (file)
index 0000000..f315bfd
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * Role Change message initiated internally from the  Raft Actor when a the behavior/role changes.
+ *
+ * Since its internal , need not be serialized
+ *
+ */
+public class RoleChanged {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChanged(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
index d53cb48e501c75c25e3532ce2c71e481717612ad..7073ea758b2b751d93708914bfc3935779076908 100644 (file)
@@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -131,6 +132,8 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
+    private Optional<ActorRef> roleChangeNotifier;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -171,6 +174,9 @@ public class Shard extends RaftActor {
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+        // create a notifier actor for each cluster member
+        roleChangeNotifier = createRoleChangeNotifier(name.toString());
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -196,6 +202,12 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
+    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+    }
+
     @Override
     public void postStop() {
         super.postStop();
@@ -259,6 +271,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java
new file mode 100644 (file)
index 0000000..4e61260
--- /dev/null
@@ -0,0 +1,79 @@
+package org.opendaylight.controller.cluster.datastore;
+
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RoleChangeNotifierTest extends AbstractActorTest  {
+
+    @Test
+    public void testHandleRegisterRoleChangeListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListener";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNull(notification);
+        }};
+
+    }
+
+    @Test
+    public void testHandleRaftRoleChanged() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActor =  getTestActor();
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
+
+            notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
+
+            // no notification should be sent as listener has not yet registered
+            assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class));
+
+            // listener registers after role has been changed, ensure we sent the latest role change after a reply
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNotNull(notification);
+            assertEquals(RaftState.Candidate.name(), notification.getOldRole());
+            assertEquals(RaftState.Leader.name(), notification.getNewRole());
+
+        }};
+
+    }
+}
+
+
index f75aa5445bc34de451c900311c3c91b3a4866f4e..4bd0ad818fff437ba8fbddac035ba9b4075b60b0 100644 (file)
@@ -8,10 +8,19 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * MessageCollectorActor collects messages as it receives them. It can send
@@ -27,10 +36,55 @@ public class MessageCollectorActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
             if("messages".equals(message)){
-                getSender().tell(messages, getSelf());
+                getSender().tell(new ArrayList(messages), getSelf());
             }
         } else {
             messages.add(message);
         }
     }
+
+    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = Patterns.ask(actor, "messages", operationTimeout);
+
+        try {
+            return (List<Object>) Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Get the first message that matches the specified class
+     * @param actor
+     * @param clazz
+     * @return
+     */
+    public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return message;
+            }
+        }
+
+        return null;
+    }
+
+    public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        List<Object> output = Lists.newArrayList();
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                output.add(message);
+            }
+        }
+
+        return output;
+    }
+
 }