Merge "Snapshot changes"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 532201b26e8e6b64915c2a187d37b60a397fe2ca..c8cd41dfa1883a1c7d1d43d2f7697b0ff988ab38 100644 (file)
@@ -12,8 +12,8 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -40,6 +40,11 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+            context.getLogger()
+                .info("Follower: Received {}", appendEntries.toString());
+        }
+
         // TODO : Refactor this method into a bunch of smaller methods
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
@@ -121,15 +126,10 @@ public class Follower extends AbstractRaftActorBehavior {
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
 
-                // Find the entry up until which the one that is not in the
-                // follower's log
-                for (int i = 0;
-                     i < appendEntries.getEntries()
-                         .size(); i++, addEntriesFrom++) {
-                    ReplicatedLogEntry matchEntry =
-                        appendEntries.getEntries().get(i);
-                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
-                        .get(matchEntry.getIndex());
+                // Find the entry up until which the one that is not in the follower's log
+                for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
 
                     if (newEntry == null) {
                         //newEntry not found in the log
@@ -162,8 +162,9 @@ public class Follower extends AbstractRaftActorBehavior {
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
 
-                context.getLogger().debug(
-                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+                context.getLogger().info(
+                    "Append entry to log " + appendEntries.getEntries().get(
+                        i).getData()
                         .toString()
                 );
                 context.getReplicatedLog()
@@ -214,7 +215,10 @@ public class Follower extends AbstractRaftActorBehavior {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -227,9 +231,10 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
+
         } else if (message instanceof InstallSnapshot) {
-            InstallSnapshot snapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(snapshot), actor());
+            InstallSnapshot installSnapshot = (InstallSnapshot) message;
+            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
         }
 
         scheduleElection(electionDuration());