Merge "bug 2266 : added more types of schema nodes to increase code coverage"
authorMoiz Raja <moraja@cisco.com>
Mon, 1 Dec 2014 21:59:11 +0000 (21:59 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 1 Dec 2014 21:59:11 +0000 (21:59 +0000)
26 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cluster/raft/InstallSnapshotMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/InstallSnapshot.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java

index 6dfa4af..6c65021 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
  */
 public class ExampleActor extends RaftActor {
 
-    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, String> state = new HashMap();
     private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
+    private Optional<ActorRef> roleChangeNotifier;
 
 
-    public ExampleActor(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams) {
+    public ExampleActor(String id, Map<String, String> peerAddresses,
+        Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
         this.dataPersistenceProvider = new PersistentDataProvider();
+        roleChangeNotifier = createRoleChangeNotifier(id);
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
         final Optional<ConfigParams> configParams){
         return Props.create(new Creator<ExampleActor>(){
-            private static final long serialVersionUID = 1L;
 
             @Override public ExampleActor create() throws Exception {
                 return new ExampleActor(id, peerAddresses, configParams);
@@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor {
         });
     }
 
-    @Override public void onReceiveCommand(final Object message) throws Exception{
+    @Override public void onReceiveCommand(Object message) throws Exception{
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor {
                 String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
-                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
-                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet());
                 }
 
 
@@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+    }
+
+    public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+        ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+        return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     @Override protected void applyState(final ActorRef clientActor, final String identifier,
         final Object data) {
         if(data instanceof KeyValue){
@@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor {
         getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
 
-    @Override protected void applySnapshot(final ByteString snapshot) {
+    @Override protected void applySnapshot(ByteString snapshot) {
         state.clear();
         try {
-            state.putAll((Map<String, String>) toObject(snapshot));
+            state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
            LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
-    private ByteString fromObject(final Object snapshot) throws Exception {
+    private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
@@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+    private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
@@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override public void onReceiveRecover(final Object message)throws Exception {
+    @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
 
@@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(final int maxBatchSize) {
+    protected void startLogRecoveryBatch(int maxBatchSize) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
+    protected void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
@@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(ByteString snapshot) {
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleRoleChangeListener.java
new file mode 100644 (file)
index 0000000..c0ee095
--- /dev/null
@@ -0,0 +1,146 @@
+package org.opendaylight.controller.cluster.example;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.japi.Creator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.example.messages.RegisterListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This is a sample implementation of a Role Change Listener which is an actor, which registers itself to the ClusterRoleChangeNotifier
+ * <p/>
+ * The Role Change listener receives a SetNotifiers message with the notifiers to register itself with.
+ * <p/>
+ * It kicks of a scheduler which sents registration messages to the notifiers, till it gets a RegisterRoleChangeListenerReply
+ * <p/>
+ * If all the notifiers have been regsitered with, then it cancels the scheduler.
+ * It starts the scheduler again when it receives a new registration
+ *
+ */
+public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
+    // the akka url should be set to the notifiers actor-system and domain.
+    private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+
+    private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
+    private Cancellable registrationSchedule = null;
+    private static final FiniteDuration duration = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+    private static final FiniteDuration schedulerDuration = new FiniteDuration(1, TimeUnit.SECONDS);
+    private final String memberName;
+    private static final String[] shardsToMonitor = new String[] {"example"};
+
+    public ExampleRoleChangeListener(String memberName) {
+        super();
+        scheduleRegistrationListener(schedulerDuration);
+        this.memberName = memberName;
+        populateRegistry(memberName);
+    }
+
+    public static Props getProps(final String memberName) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new ExampleRoleChangeListener(memberName);
+            }
+        });
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterListener) {
+            // called by the scheduler at intervals to register any unregistered notifiers
+            sendRegistrationRequests();
+
+        } else if (message instanceof RegisterRoleChangeListenerReply) {
+            // called by the Notifier
+            handleRegisterRoleChangeListenerReply(getSender().path().toString());
+
+        } else if (message instanceof RoleChangeNotification) {
+            // called by the Notifier
+            RoleChangeNotification notification = (RoleChangeNotification) message;
+
+            LOG.info("Role Change Notification received for member:{}, old role:{}, new role:{}",
+                notification.getMemberId(), notification.getOldRole(), notification.getNewRole());
+
+            // the apps dependent on such notifications can be called here
+            //TODO: add implementation here
+
+        }
+    }
+
+    private void scheduleRegistrationListener(FiniteDuration interval) {
+        LOG.debug("--->scheduleRegistrationListener called.");
+        registrationSchedule = getContext().system().scheduler().schedule(
+            interval, interval, getSelf(), new RegisterListener(),
+            getContext().system().dispatcher(), getSelf());
+
+    }
+
+    private void populateRegistry(String memberName) {
+
+        for (String shard: shardsToMonitor) {
+            String notifier =(new StringBuilder()).append(NOTIFIER_AKKA_URL).append(memberName)
+                .append("/").append(memberName).append("-notifier").toString();
+
+            if (!notifierRegistrationStatus.containsKey(notifier)) {
+                notifierRegistrationStatus.put(notifier, false);
+            }
+        }
+
+        if (!registrationSchedule.isCancelled()) {
+            scheduleRegistrationListener(schedulerDuration);
+        }
+    }
+
+    private void sendRegistrationRequests() {
+        for (Map.Entry<String, Boolean> entry : notifierRegistrationStatus.entrySet()) {
+            if (!entry.getValue()) {
+                try {
+                    LOG.debug("{} registering with {}", getSelf().path().toString(), entry.getKey());
+                    ActorRef notifier = Await.result(
+                        getContext().actorSelection(entry.getKey()).resolveOne(duration), duration);
+
+                    notifier.tell(new RegisterRoleChangeListener(), getSelf());
+
+                } catch (Exception e) {
+                    LOG.error("ERROR!! Unable to send registration request to notifier {}", entry.getKey());
+                }
+            }
+        }
+    }
+
+    private void handleRegisterRoleChangeListenerReply(String senderId) {
+        if (notifierRegistrationStatus.containsKey(senderId)) {
+            notifierRegistrationStatus.put(senderId, true);
+
+            //cancel the schedule when listener is registered with all notifiers
+            if (!registrationSchedule.isCancelled()) {
+                boolean cancelScheduler = true;
+                for (Boolean value : notifierRegistrationStatus.values()) {
+                    cancelScheduler = cancelScheduler & value;
+                }
+                if (cancelScheduler) {
+                    registrationSchedule.cancel();
+                }
+            }
+        } else {
+            LOG.info("Unexpected, RegisterRoleChangeListenerReply received from notifier which is not known to Listener:{}",
+                senderId);
+        }
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        registrationSchedule.cancel();
+    }
+}
index de61697..cd2e4a5 100644 (file)
@@ -3,15 +3,17 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Optional;
-import org.opendaylight.controller.cluster.example.messages.PrintRole;
-import org.opendaylight.controller.cluster.example.messages.PrintState;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
-
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 
 /**
  * This is a test driver for testing akka-raft implementation
@@ -21,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class TestDriver {
 
-    private static final ActorSystem actorSystem = ActorSystem.create();
+
     private static Map<String, String> allPeers = new HashMap<>();
     private static Map<String, ActorRef> clientActorRefs  = new HashMap<String, ActorRef>();
     private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
@@ -29,6 +31,9 @@ public class TestDriver {
     private int nameCounter = 0;
     private static ConfigParams configParams = new ExampleConfigParamsImpl();
 
+    private static ActorSystem actorSystem;
+    private static ActorSystem listenerActorSystem;
+
     /**
      * Create nodes, add clients and start logging.
      * Commands
@@ -53,6 +58,13 @@ public class TestDriver {
      * @throws Exception
      */
     public static void main(String[] args) throws Exception {
+
+        actorSystem = ActorSystem.create("raft-test", ConfigFactory
+            .load().getConfig("raft-test"));
+
+        listenerActorSystem = ActorSystem.create("raft-test-listener", ConfigFactory
+            .load().getConfig("raft-test-listener"));
+
         TestDriver td = new TestDriver();
 
         System.out.println("Enter command (type bye to exit):");
@@ -113,6 +125,16 @@ public class TestDriver {
         }
     }
 
+    // create the listener using a separate actor system for each example actor
+    private void createClusterRoleChangeListener(List<String> memberIds) {
+        System.out.println("memberIds="+memberIds);
+        for (String memberId : memberIds) {
+            ActorRef listenerActor = listenerActorSystem.actorOf(
+                ExampleRoleChangeListener.getProps(memberId), memberId + "-role-change-listener");
+            System.out.println("Role Change Listener created:" + listenerActor.path().toString());
+        }
+    }
+
     public static ActorRef createExampleActor(String name) {
         return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
             Optional.of(configParams)), name);
@@ -121,7 +143,7 @@ public class TestDriver {
     public void createNodes(int num) {
         for (int i=0; i < num; i++)  {
             nameCounter = nameCounter + 1;
-            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+            allPeers.put("example-"+nameCounter, "akka://raft-test/user/example-"+nameCounter);
         }
 
         for (String s : allPeers.keySet())  {
@@ -130,6 +152,8 @@ public class TestDriver {
             System.out.println("Created node:"+s);
 
         }
+
+        createClusterRoleChangeListener(Lists.newArrayList(allPeers.keySet()));
     }
 
     // add num clients to all nodes in the system
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/RegisterListener.java
new file mode 100644 (file)
index 0000000..4507f43
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+/**
+ * Message sent by the Example Role Change Listener to itself for registering itself with the notifiers
+ *
+ * This message is sent by the scheduler
+ */
+public class RegisterListener {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/SetNotifiers.java
new file mode 100644 (file)
index 0000000..8adc0da
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.cluster.example.messages;
+
+import java.util.List;
+
+/**
+ * Created by kramesha on 11/18/14.
+ */
+public class SetNotifiers {
+    private List<String> notifierList;
+
+    public SetNotifiers(List<String> notifierList) {
+        this.notifierList = notifierList;
+    }
+
+    public List<String> getNotifierList() {
+        return notifierList;
+    }
+}
index 042b9fb..d647475 100644 (file)
@@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -41,9 +44,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import java.io.Serializable;
-import java.util.Map;
-
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
@@ -169,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
 
                 onRecoveryComplete();
+
+                RaftActorBehavior oldBehavior = currentBehavior;
                 currentBehavior = new Follower(context);
-                onStateChanged();
+                handleBehaviorChange(oldBehavior, currentBehavior);
             }
         }
     }
@@ -269,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
+        RaftActorBehavior oldBehavior = currentBehavior;
         currentBehavior = new Follower(context);
-        onStateChanged();
+        handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -366,26 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            if(oldBehavior != currentBehavior){
-                onStateChanged();
-            }
-
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+            handleBehaviorChange(oldBehavior, currentBehavior);
         }
     }
 
-    public java.util.Set<String> getPeers() {
-
-        return context.getPeerAddresses().keySet();
-    }
+    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+        if (oldBehavior != currentBehavior){
+            onStateChanged();
+        }
+        if (oldBehavior != null) {
+            // it can happen that the state has not changed but the leader has changed.
+            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
 
-    protected String getReplicatedLogState() {
-        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
-            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
-            + ", im-mem journal size=" + context.getReplicatedLog().size();
+            if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+                // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+                getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+                    currentBehavior.state().name()), getSelf());
+            }
+        }
     }
 
-
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
@@ -578,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected abstract DataPersistenceProvider persistence();
 
+    /**
+     * Notifier Actor for this RaftActor to notify when a role change happens
+     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+     */
+    protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
@@ -843,4 +852,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected RaftActorBehavior getCurrentBehavior() {
         return currentBehavior;
     }
+
 }
index d85ac8e..e5c5dc7 100644 (file)
@@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+    // The index of the first chunk that is sent when installing a snapshot
+    public static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    public static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
     protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
@@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         "sending snapshot chunk failed, Will retry, Chunk:{}",
                     reply.getChunkIndex()
                 );
+
                 followerToSnapshot.markSendStatus(false);
             }
 
@@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
+
+            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+                // Since the Follower did not find this index to be valid we should reset the follower snapshot
+                // so that Installing the snapshot can resume from the beginning
+                followerToSnapshot.reset();
+            }
         }
     }
 
@@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotTerm(),
                         getNextSnapshotChunk(followerId,snapshot.get()),
                         mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
@@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         private boolean replyStatus = false;
         private int chunkIndex;
         private int totalChunks;
+        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
@@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
                     size, totalChunks);
             }
+            replyReceivedForOffset = -1;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
         }
 
         public ByteString getSnapshotBytes() {
@@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if the chunk sent was successful
                 replyReceivedForOffset = offset;
                 replyStatus = true;
+                lastChunkHashCode = nextChunkHashCode;
             } else {
                 // if the chunk sent was failure
                 replyReceivedForOffset = offset;
@@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("length={}, offset={},size={}",
                     snapshotLength, start, size);
             }
-            return getSnapshotBytes().substring(start, start + size);
+            ByteString substring = getSnapshotBytes().substring(start, start + size);
+            nextChunkHashCode = substring.hashCode();
+            return substring;
+        }
+
+        /**
+         * reset should be called when the Follower needs to be sent the snapshot from the beginning
+         */
+        public void reset(){
+            offset = 0;
+            replyStatus = false;
+            replyReceivedForOffset = offset;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        }
 
+        public int getLastChunkHashCode() {
+            return lastChunkHashCode;
         }
     }
 
index 7ada8b3..b1c73f6 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.ArrayList;
-
 /**
  * The behavior of a RaftActor in the Follower state
  * <p/>
@@ -36,7 +36,8 @@ import java.util.ArrayList;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-    private ByteString snapshotChunksCollected = ByteString.EMPTY;
+
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
         super(context);
@@ -280,48 +281,54 @@ public class Follower extends AbstractRaftActorBehavior {
             );
         }
 
-        try {
-            if (installSnapshot.getChunkIndex() == installSnapshot.getTotalChunks()) {
-                // this is the last chunk, create a snapshot object and apply
-
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
-                            snapshotChunksCollected.size());
-                }
+        if(snapshotTracker == null){
+            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+        }
 
-                Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
-                    new ArrayList<ReplicatedLogEntry>(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm(),
-                    installSnapshot.getLastIncludedIndex(),
-                    installSnapshot.getLastIncludedTerm());
+        try {
+            if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
+                    installSnapshot.getLastChunkHashCode())){
+                Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
+                        new ArrayList<ReplicatedLogEntry>(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm(),
+                        installSnapshot.getLastIncludedIndex(),
+                        installSnapshot.getLastIncludedTerm());
 
                 actor().tell(new ApplySnapshot(snapshot), actor());
 
-            } else {
-                // we have more to go
-                snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
+                snapshotTracker = null;
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Chunk={},snapshotChunksCollected.size:{}",
-                        installSnapshot.getChunkIndex(), snapshotChunksCollected.size());
-                }
             }
 
             sender.tell(new InstallSnapshotReply(
-                currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                true), actor());
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
+                    true), actor());
+
+        } catch (SnapshotTracker.InvalidChunkException e) {
+
+            sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
+                    -1, false), actor());
+            snapshotTracker = null;
+
+        } catch (Exception e){
 
-        } catch (Exception e) {
             LOG.error(e, "Exception in InstallSnapshot of follower:");
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
-                installSnapshot.getChunkIndex(), false), actor());
+                    installSnapshot.getChunkIndex(), false), actor());
+
         }
     }
 
     @Override public void close() throws Exception {
         stopElection();
     }
+
+    @VisibleForTesting
+    ByteString getSnapshotChunksCollected(){
+        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    }
+
+
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
new file mode 100644 (file)
index 0000000..26fbde0
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+
+/**
+ * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
+ */
+public class SnapshotTracker {
+    private final LoggingAdapter LOG;
+    private final int totalChunks;
+    private ByteString collectedChunks = ByteString.EMPTY;
+    private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+    private boolean sealed = false;
+    private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+
+    SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+        this.LOG = LOG;
+        this.totalChunks = totalChunks;
+    }
+
+    /**
+     * Adds a chunk to the tracker
+     *
+     * @param chunkIndex
+     * @param chunk
+     * @return true when the lastChunk is received
+     * @throws InvalidChunkException
+     */
+    boolean addChunk(int chunkIndex, ByteString chunk, Optional<Integer> lastChunkHashCode) throws InvalidChunkException{
+        if(sealed){
+            throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex + " all chunks already received");
+        }
+
+        if(lastChunkIndex + 1 != chunkIndex){
+            throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
+        }
+
+        if(lastChunkHashCode.isPresent()){
+            if(lastChunkHashCode.get() != this.lastChunkHashCode){
+                throw new InvalidChunkException("The hash code of the recorded last chunk does not match " +
+                        "the senders hash code expected " + lastChunkHashCode + " was " + lastChunkHashCode.get());
+            }
+        }
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Chunk={},collectedChunks.size:{}",
+                    chunkIndex, collectedChunks.size());
+        }
+
+        sealed = (chunkIndex == totalChunks);
+        lastChunkIndex = chunkIndex;
+        collectedChunks = collectedChunks.concat(chunk);
+        this.lastChunkHashCode = chunk.hashCode();
+        return sealed;
+    }
+
+    byte[] getSnapshot(){
+        if(!sealed) {
+            throw new IllegalStateException("lastChunk not received yet");
+        }
+
+        return collectedChunks.toByteArray();
+    }
+
+    ByteString getCollectedChunks(){
+        return collectedChunks;
+    }
+
+    public static class InvalidChunkException extends Exception {
+        InvalidChunkException(String message){
+            super(message);
+        }
+    }
+
+}
index 3c4e811..6337f8f 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
@@ -22,9 +23,10 @@ public class InstallSnapshot extends AbstractRaftRPC {
     private final ByteString data;
     private final int chunkIndex;
     private final int totalChunks;
+    private final Optional<Integer> lastChunkHashCode;
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -32,8 +34,15 @@ public class InstallSnapshot extends AbstractRaftRPC {
         this.data = data;
         this.chunkIndex = chunkIndex;
         this.totalChunks = totalChunks;
+        this.lastChunkHashCode = lastChunkHashCode;
     }
 
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
+                           long lastIncludedTerm, ByteString data, int chunkIndex, int totalChunks) {
+        this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+    }
+
+
     public String getLeaderId() {
         return leaderId;
     }
@@ -58,25 +67,38 @@ public class InstallSnapshot extends AbstractRaftRPC {
         return totalChunks;
     }
 
-    public <T extends Object> Object toSerializable(){
-        return InstallSnapshotMessages.InstallSnapshot.newBuilder()
-            .setLeaderId(this.getLeaderId())
-            .setChunkIndex(this.getChunkIndex())
-            .setData(this.getData())
-            .setLastIncludedIndex(this.getLastIncludedIndex())
-            .setLastIncludedTerm(this.getLastIncludedTerm())
-            .setTotalChunks(this.getTotalChunks()).build();
+    public Optional<Integer> getLastChunkHashCode() {
+        return lastChunkHashCode;
+    }
 
+    public <T extends Object> Object toSerializable(){
+        InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+                .setLeaderId(this.getLeaderId())
+                .setChunkIndex(this.getChunkIndex())
+                .setData(this.getData())
+                .setLastIncludedIndex(this.getLastIncludedIndex())
+                .setLastIncludedTerm(this.getLastIncludedTerm())
+                .setTotalChunks(this.getTotalChunks());
+
+        if(lastChunkHashCode.isPresent()){
+            builder.setLastChunkHashCode(lastChunkHashCode.get());
+        }
+        return builder.build();
     }
 
     public static InstallSnapshot fromSerializable (Object o) {
         InstallSnapshotMessages.InstallSnapshot from =
             (InstallSnapshotMessages.InstallSnapshot) o;
 
+        Optional<Integer> lastChunkHashCode = Optional.absent();
+        if(from.hasLastChunkHashCode()){
+            lastChunkHashCode = Optional.of(from.getLastChunkHashCode());
+        }
+
         InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
             from.getLeaderId(), from.getLastIncludedIndex(),
             from.getLastIncludedTerm(), from.getData(),
-            from.getChunkIndex(), from.getTotalChunks());
+            from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
 
         return installSnapshot;
     }
index 9e42a13..b2132b8 100644 (file)
@@ -13,8 +13,67 @@ akka {
 
         serialization-bindings {
             "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+            "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
             "com.google.protobuf.Message" = proto
             "com.google.protobuf.GeneratedMessage" = proto
         }
     }
 }
+
+raft-test {
+    akka {
+
+        loglevel = "DEBUG"
+
+        actor {
+            # enable to test serialization only.
+            # serialize-messages = on
+
+            provider = "akka.remote.RemoteActorRefProvider"
+
+            serializers {
+              java  = "akka.serialization.JavaSerializer"
+              proto = "akka.remote.serialization.ProtobufSerializer"
+            }
+
+            serialization-bindings {
+                "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
+                "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
+                "com.google.protobuf.Message" = proto
+                "com.google.protobuf.GeneratedMessage" = proto
+            }
+        }
+
+        remote {
+                        log-remote-lifecycle-events = off
+                        netty.tcp {
+                            hostname = "127.0.0.1"
+                            port = 2550
+                        }
+                    }
+    }
+}
+
+raft-test-listener {
+
+  akka {
+    loglevel = "DEBUG"
+
+    actor {
+        provider = "akka.remote.RemoteActorRefProvider"
+    }
+
+    remote {
+        log-remote-lifecycle-events = off
+        netty.tcp {
+            hostname = "127.0.0.1"
+            port = 2554
+        }
+    }
+
+    member-id = "member-1"
+  }
+}
+
+
+
index 9eb2fb7..c833a86 100644 (file)
@@ -21,11 +21,25 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -36,27 +50,13 @@ import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -83,6 +83,9 @@ public class RaftActorTest extends AbstractActorTest {
 
         private final DataPersistenceProvider dataPersistenceProvider;
         private final RaftActor delegate;
+        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
+        private final List<Object> state;
+        private ActorRef roleChangeNotifier;
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -90,25 +93,27 @@ public class RaftActorTest extends AbstractActorTest {
             private final String id;
             private final Optional<ConfigParams> config;
             private final DataPersistenceProvider dataPersistenceProvider;
+            private final ActorRef roleChangeNotifier;
 
             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
-                    Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+                Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
+                ActorRef roleChangeNotifier) {
                 this.peerAddresses = peerAddresses;
                 this.id = id;
                 this.config = config;
                 this.dataPersistenceProvider = dataPersistenceProvider;
+                this.roleChangeNotifier = roleChangeNotifier;
             }
 
             @Override
             public MockRaftActor create() throws Exception {
-                return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
+                MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
+                    dataPersistenceProvider);
+                mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
+                return mockRaftActor;
             }
         }
 
-        private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        private final List<Object> state;
-
         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
@@ -134,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest {
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                 Optional<ConfigParams> config){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
-            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
         }
 
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+            Optional<ConfigParams> config, ActorRef roleChangeNotifier){
+            return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
+        }
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
             delegate.applyState(clientActor, identifier, data);
             LOG.info("applyState called");
         }
 
-
-
-
         @Override
         protected void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -201,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest {
             return this.dataPersistenceProvider;
         }
 
+        @Override
+        protected Optional<ActorRef> getRoleChangeNotifier() {
+            return Optional.fromNullable(roleChangeNotifier);
+        }
+
         @Override public String persistenceId() {
             return this.getId();
         }
@@ -862,6 +873,40 @@ public class RaftActorTest extends AbstractActorTest {
         };
     }
 
+    @Test
+    public void testRaftRoleChangeNotifier() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            String id = "testRaftRoleChangeNotifier";
+
+            TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
+                Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
+
+            MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+            mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext()));
+
+            // sleeping for a minimum of 2 seconds, if it spans more its fine.
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+            List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+            assertNotNull(matches);
+            assertEquals(2, matches.size());
+
+            // check if the notifier got a role change from Follower to Candidate
+            RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+
+            // check if the notifier got a role change from Candidate to Leader
+            raftRoleChanged = (RoleChanged) matches.get(1);
+            assertEquals(id, raftRoleChanged.getMemberId());
+            assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+        }};
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 83b9ad3..0ee9693 100644 (file)
@@ -1,9 +1,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -20,19 +31,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef followerActor = getSystem().actorOf(Props.create(
@@ -452,18 +450,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             int offset = 0;
             int snapshotLength = bsSnapshot.size();
             int i = 1;
+            int chunkIndex = 1;
 
             do {
                 chunkData = getNextChunk(bsSnapshot, offset);
                 final InstallSnapshot installSnapshot =
                     new InstallSnapshot(1, "leader-1", i, 1,
-                        chunkData, i, 3);
+                        chunkData, chunkIndex, 3);
                 follower.handleMessage(leaderActor, installSnapshot);
                 offset = offset + 50;
                 i++;
+                chunkIndex++;
             } while ((offset+50) < snapshotLength);
 
-            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3);
+            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
             follower.handleMessage(leaderActor, installSnapshot3);
 
             String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
@@ -490,6 +490,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 }
             }.get();
 
+            // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+            assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
             String applySnapshotMatch = "";
             for (String reply: matches) {
                 if (reply.startsWith("applySnapshot")) {
@@ -517,6 +520,52 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor = getSystem().actorOf(Props.create(
+                        MessageCollectorActor.class));
+
+                MockRaftActorContext context = (MockRaftActorContext)
+                        createActorContext(getRef());
+
+                Follower follower = (Follower) createBehavior(context);
+
+                HashMap<String, String> followerSnapshot = new HashMap<>();
+                followerSnapshot.put("1", "A");
+                followerSnapshot.put("2", "B");
+                followerSnapshot.put("3", "C");
+
+                ByteString bsSnapshot = toByteString(followerSnapshot);
+
+                final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+
+                Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+                assertNotNull(messages);
+                assertTrue(messages instanceof List);
+                List<Object> listMessages = (List<Object>) messages;
+
+                int installSnapshotReplyReceivedCount = 0;
+                for (Object message: listMessages) {
+                    if (message instanceof InstallSnapshotReply) {
+                        ++installSnapshotReplyReceivedCount;
+                    }
+                }
+
+                assertEquals(1, installSnapshotReplyReceivedCount);
+                InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+                assertEquals(false, reply.isSuccess());
+                assertEquals(-1, reply.getChunkIndex());
+                assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+            }};
+    }
+
     public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
         return MessageCollectorActor.getAllMessages(actor);
     }
index 0fc0b4c..895fe35 100644 (file)
@@ -1,11 +1,16 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -41,9 +46,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
@@ -125,7 +127,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                            followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
@@ -570,6 +572,156 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
     @Test
     public void testFollowerToSnapshotLogic() {
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
new file mode 100644 (file)
index 0000000..1b3a8f5
--- /dev/null
@@ -0,0 +1,181 @@
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.event.LoggingAdapter;
+import com.google.common.base.Optional;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapshotTrackerTest {
+
+    Map<String, String> data;
+    ByteString byteString;
+    ByteString chunk1;
+    ByteString chunk2;
+    ByteString chunk3;
+
+    @Before
+    public void setup(){
+        data = new HashMap<>();
+        data.put("key1", "value1");
+        data.put("key2", "value2");
+        data.put("key3", "value3");
+
+        byteString = toByteString(data);
+        chunk1 = getNextChunk(byteString, 0, 10);
+        chunk2 = getNextChunk(byteString, 10, 10);
+        chunk3 = getNextChunk(byteString, 20, byteString.size());
+    }
+
+    @Test
+    public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        try {
+            tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+            e.getMessage().startsWith("Invalid chunk");
+        }
+
+        // The first chunk's index must at least be FIRST_CHUNK_INDEX
+        SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        try {
+            tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // Out of sequence chunk indexes won't work
+        SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+
+        try {
+            tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+            Assert.fail();
+        } catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+        // No exceptions will be thrown when invalid chunk is added with the right sequence
+        // If the lastChunkHashCode is missing
+        SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+        // Look I can add the same chunk again
+        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+
+        // An exception will be thrown when an invalid chunk is addedd with the right sequence
+        // when the lastChunkHashCode is present
+        SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+
+        tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+
+        try {
+            // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
+            tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+            Assert.fail();
+        }catch(SnapshotTracker.InvalidChunkException e){
+
+        }
+
+    }
+
+    @Test
+    public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
+
+        // Trying to get a snapshot before all chunks have been received will throw an exception
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        try {
+            tracker1.getSnapshot();
+            Assert.fail();
+        } catch(IllegalStateException e){
+
+        }
+
+        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+
+        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
+        tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
+
+        byte[] snapshot = tracker2.getSnapshot();
+
+        assertEquals(byteString, ByteString.copyFrom(snapshot));
+    }
+
+    @Test
+    public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
+        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+
+        ByteString chunks = chunk1.concat(chunk2);
+
+        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
+        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
+
+        assertEquals(chunks, tracker1.getCollectedChunks());
+    }
+
+    public ByteString getNextChunk (ByteString bs, int offset, int size){
+        int snapshotLength = bs.size();
+        int start = offset;
+        if (size > snapshotLength) {
+            size = snapshotLength;
+        } else {
+            if ((start + size) > snapshotLength) {
+                size = snapshotLength - start;
+            }
+        }
+        return bs.substring(start, start + size);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
+
+
+}
\ No newline at end of file
index 9fbdd45..3469a95 100644 (file)
@@ -13,15 +13,14 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 public class MessageCollectorActor extends UntypedActor {
     private List<Object> messages = new ArrayList<>();
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListener.java
new file mode 100644 (file)
index 0000000..1faa341
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from the listener of Role Change messages to register itself to the Role Change Notifier
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RegisterRoleChangeListener implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RegisterRoleChangeListenerReply.java
new file mode 100644 (file)
index 0000000..f1d13e3
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent from a RoleChangeNotifier to the Role Change Listener
+ *
+ * Can be sent to a separate actor system and hence should be made serializable.
+ */
+public class RegisterRoleChangeListenerReply implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotification.java
new file mode 100644 (file)
index 0000000..de2733f
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * Notification message representing a Role change of a cluster member
+ *
+ * Roles generally are Leader, Follower and Candidate. But can be based on the consensus strategy/implementation
+ *
+ * The Listener could be in a separate ActorSystem and hence this message needs to be Serializable
+ */
+public class RoleChangeNotification implements Serializable {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChangeNotification(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
new file mode 100644 (file)
index 0000000..a9aa561
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.notifications;
+
+import akka.actor.Actor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.serialization.Serialization;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+
+/**
+ * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * the listeners (within the same node), which are registered with it.
+ * <p/>
+ * The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
+ */
+public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
+
+    private String memberId;
+    private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+    private RoleChangeNotification latestRoleChangeNotification = null;
+
+    public RoleChangeNotifier(String memberId) {
+        this.memberId = memberId;
+    }
+
+    public static Props getProps(final String memberId) {
+        return Props.create(new Creator<Actor>() {
+            @Override
+            public Actor create() throws Exception {
+                return new RoleChangeNotifier(memberId);
+            }
+        });
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        super.preStart();
+        LOG.info("RoleChangeNotifier:{} created and ready for shard:{}",
+            Serialization.serializedActorPath(getSelf()), memberId);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof RegisterRoleChangeListener) {
+            // register listeners for this shard
+
+            ActorRef curRef = registeredListeners.get(getSender().path());
+            if (curRef != null) {
+                // ActorPaths would pass equal even if the unique id of the actors are different
+                // if a listener actor is re-registering after reincarnation, then removing the existing
+                // entry so the actor path with correct unique id is registered.
+                registeredListeners.remove(getSender().path());
+            }
+            registeredListeners.put(getSender().path(), getSender());
+
+            LOG.info("RoleChangeNotifier for {} , registered listener {}", memberId,
+                getSender().path().toString());
+
+            getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+
+            if (latestRoleChangeNotification != null) {
+                getSender().tell(latestRoleChangeNotification, getSelf());
+            }
+
+
+        } else if (message instanceof RoleChanged) {
+            // this message is sent by RaftActor. Notify registered listeners when this message is received.
+            RoleChanged roleChanged = (RoleChanged) message;
+
+            LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            latestRoleChangeNotification =
+                new RoleChangeNotification(roleChanged.getMemberId(),
+                    roleChanged.getOldRole(), roleChanged.getNewRole());
+
+            for (ActorRef listener: registeredListeners.values()) {
+                listener.tell(latestRoleChangeNotification, getSelf());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        registeredListeners.clear();
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChanged.java
new file mode 100644 (file)
index 0000000..f315bfd
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * Role Change message initiated internally from the  Raft Actor when a the behavior/role changes.
+ *
+ * Since its internal , need not be serialized
+ *
+ */
+public class RoleChanged {
+    private String memberId;
+    private String oldRole;
+    private String newRole;
+
+    public RoleChanged(String memberId, String oldRole, String newRole) {
+        this.memberId = memberId;
+        this.oldRole = oldRole;
+        this.newRole = newRole;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getOldRole() {
+        return oldRole;
+    }
+
+    public String getNewRole() {
+        return newRole;
+    }
+}
index b93be3e..de7f44e 100644 (file)
@@ -85,6 +85,16 @@ public final class InstallSnapshotMessages {
      * <code>optional int32 totalChunks = 7;</code>
      */
     int getTotalChunks();
+
+    // optional int32 lastChunkHashCode = 8;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    boolean hasLastChunkHashCode();
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    int getLastChunkHashCode();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.cluster.raft.InstallSnapshot}
@@ -172,6 +182,11 @@ public final class InstallSnapshotMessages {
               totalChunks_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              lastChunkHashCode_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -351,6 +366,22 @@ public final class InstallSnapshotMessages {
       return totalChunks_;
     }
 
+    // optional int32 lastChunkHashCode = 8;
+    public static final int LASTCHUNKHASHCODE_FIELD_NUMBER = 8;
+    private int lastChunkHashCode_;
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public boolean hasLastChunkHashCode() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional int32 lastChunkHashCode = 8;</code>
+     */
+    public int getLastChunkHashCode() {
+      return lastChunkHashCode_;
+    }
+
     private void initFields() {
       term_ = 0L;
       leaderId_ = "";
@@ -359,6 +390,7 @@ public final class InstallSnapshotMessages {
       data_ = com.google.protobuf.ByteString.EMPTY;
       chunkIndex_ = 0;
       totalChunks_ = 0;
+      lastChunkHashCode_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -393,6 +425,9 @@ public final class InstallSnapshotMessages {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt32(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeInt32(8, lastChunkHashCode_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -430,6 +465,10 @@ public final class InstallSnapshotMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, totalChunks_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(8, lastChunkHashCode_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -560,6 +599,8 @@ public final class InstallSnapshotMessages {
         bitField0_ = (bitField0_ & ~0x00000020);
         totalChunks_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        lastChunkHashCode_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -616,6 +657,10 @@ public final class InstallSnapshotMessages {
           to_bitField0_ |= 0x00000040;
         }
         result.totalChunks_ = totalChunks_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.lastChunkHashCode_ = lastChunkHashCode_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -655,6 +700,9 @@ public final class InstallSnapshotMessages {
         if (other.hasTotalChunks()) {
           setTotalChunks(other.getTotalChunks());
         }
+        if (other.hasLastChunkHashCode()) {
+          setLastChunkHashCode(other.getLastChunkHashCode());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -957,6 +1005,39 @@ public final class InstallSnapshotMessages {
         return this;
       }
 
+      // optional int32 lastChunkHashCode = 8;
+      private int lastChunkHashCode_ ;
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public boolean hasLastChunkHashCode() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public int getLastChunkHashCode() {
+        return lastChunkHashCode_;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder setLastChunkHashCode(int value) {
+        bitField0_ |= 0x00000080;
+        lastChunkHashCode_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 lastChunkHashCode = 8;</code>
+       */
+      public Builder clearLastChunkHashCode() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        lastChunkHashCode_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.cluster.raft.InstallSnapshot)
     }
 
@@ -983,13 +1064,14 @@ public final class InstallSnapshotMessages {
   static {
     java.lang.String[] descriptorData = {
       "\n\025InstallSnapshot.proto\022(org.opendayligh" +
-      "t.controller.cluster.raft\"\235\001\n\017InstallSna" +
+      "t.controller.cluster.raft\"\270\001\n\017InstallSna" +
       "pshot\022\014\n\004term\030\001 \001(\003\022\020\n\010leaderId\030\002 \001(\t\022\031\n" +
       "\021lastIncludedIndex\030\003 \001(\003\022\030\n\020lastIncluded" +
       "Term\030\004 \001(\003\022\014\n\004data\030\005 \001(\014\022\022\n\nchunkIndex\030\006" +
-      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005BX\n;org.openday" +
-      "light.controller.protobuff.messages.clus" +
-      "ter.raftB\027InstallSnapshotMessagesH\001"
+      " \001(\005\022\023\n\013totalChunks\030\007 \001(\005\022\031\n\021lastChunkHa" +
+      "shCode\030\010 \001(\005BX\n;org.opendaylight.control" +
+      "ler.protobuff.messages.cluster.raftB\027Ins" +
+      "tallSnapshotMessagesH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1001,7 +1083,7 @@ public final class InstallSnapshotMessages {
           internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_cluster_raft_InstallSnapshot_descriptor,
-              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", });
+              new java.lang.String[] { "Term", "LeaderId", "LastIncludedIndex", "LastIncludedTerm", "Data", "ChunkIndex", "TotalChunks", "LastChunkHashCode", });
           return null;
         }
       };
index 4198644..adb58ae 100644 (file)
@@ -12,4 +12,5 @@ message InstallSnapshot {
     optional bytes data = 5;
     optional int32 chunkIndex = 6;
     optional int32 totalChunks = 7;
+    optional int32 lastChunkHashCode = 8;
 }
index d53cb48..7073ea7 100644 (file)
@@ -65,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -131,6 +132,8 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
+    private Optional<ActorRef> roleChangeNotifier;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -171,6 +174,9 @@ public class Shard extends RaftActor {
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+        // create a notifier actor for each cluster member
+        roleChangeNotifier = createRoleChangeNotifier(name.toString());
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -196,6 +202,12 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
+    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+    }
+
     @Override
     public void postStop() {
         super.postStop();
@@ -259,6 +271,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java
new file mode 100644 (file)
index 0000000..4e61260
--- /dev/null
@@ -0,0 +1,79 @@
+package org.opendaylight.controller.cluster.datastore;
+
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class RoleChangeNotifierTest extends AbstractActorTest  {
+
+    @Test
+    public void testHandleRegisterRoleChangeListener() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListener";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNull(notification);
+        }};
+
+    }
+
+    @Test
+    public void testHandleRaftRoleChanged() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "testHandleRegisterRoleChangeListenerWithNotificationSet";
+            ActorRef listenerActor =  getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActor =  getTestActor();
+
+            TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+                getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
+
+            RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
+
+            notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
+
+            // no notification should be sent as listener has not yet registered
+            assertNull(MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class));
+
+            // listener registers after role has been changed, ensure we sent the latest role change after a reply
+            notifierTestActorRef.tell(new RegisterRoleChangeListener(), listenerActor);
+
+            RegisterRoleChangeListenerReply reply = (RegisterRoleChangeListenerReply)
+                MessageCollectorActor.getFirstMatching(listenerActor, RegisterRoleChangeListenerReply.class);
+            assertNotNull(reply);
+
+            RoleChangeNotification notification = (RoleChangeNotification)
+                MessageCollectorActor.getFirstMatching(listenerActor, RoleChangeNotification.class);
+            assertNotNull(notification);
+            assertEquals(RaftState.Candidate.name(), notification.getOldRole());
+            assertEquals(RaftState.Leader.name(), notification.getNewRole());
+
+        }};
+
+    }
+}
+
+
index f75aa54..4bd0ad8 100644 (file)
@@ -8,10 +8,19 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * MessageCollectorActor collects messages as it receives them. It can send
@@ -27,10 +36,55 @@ public class MessageCollectorActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
             if("messages".equals(message)){
-                getSender().tell(messages, getSelf());
+                getSender().tell(new ArrayList(messages), getSelf());
             }
         } else {
             messages.add(message);
         }
     }
+
+    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+        FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
+        Timeout operationTimeout = new Timeout(operationDuration);
+        Future<Object> future = Patterns.ask(actor, "messages", operationTimeout);
+
+        try {
+            return (List<Object>) Await.result(future, operationDuration);
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    /**
+     * Get the first message that matches the specified class
+     * @param actor
+     * @param clazz
+     * @return
+     */
+    public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return message;
+            }
+        }
+
+        return null;
+    }
+
+    public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        List<Object> output = Lists.newArrayList();
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                output.add(message);
+            }
+        }
+
+        return output;
+    }
+
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.