Merge "Make Raft messages serializable"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 0ff2341c708431d8da540d625638c9d70bb3dd11..15239795a8abc2aab0019439e2a64bbab2701afc 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -141,14 +142,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
+
         } else if(message instanceof ApplySnapshot ) {
             applySnapshot(((ApplySnapshot) message).getSnapshot());
+
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(
                     context.getPeerAddress(currentBehavior.getLeaderId())),
                 getSelf()
             );
+
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
 
@@ -158,7 +162,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
 
             // TODO: Handle failure in saving the snapshot
-            // Maybe do retries on failure
+
+        } else if (message instanceof FindLeader){
+
+            getSender().tell(new FindLeaderReply(
+                context.getPeerAddress(currentBehavior.getLeaderId())),
+                getSelf());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -193,7 +202,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @param data
      */
     protected void persistData(ActorRef clientActor, String identifier,
-        Object data) {
+        Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -520,39 +529,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
-        Serializable {
-
-        private final long index;
-        private final long term;
-        private final Object payload;
-
-        public ReplicatedLogImplEntry(long index, long term, Object payload) {
-
-            this.index = index;
-            this.term = term;
-            this.payload = payload;
-        }
 
-        @Override public Object getData() {
-            return payload;
-        }
-
-        @Override public long getTerm() {
-            return term;
-        }
-
-        @Override public long getIndex() {
-            return index;
-        }
-
-        @Override public String toString() {
-            return "Entry{" +
-                "index=" + index +
-                ", term=" + term +
-                '}';
-        }
-    }
 
     private static class DeleteEntries implements Serializable {
         private final int fromIndex;