Merge "BUG-628 Allow configuration to override module based capabilities from remote...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 7814bad00b4d1423cea9847b06882d36138e0238..b8e9653bc5db92ffa4cb15afd391c6a9ec05cd16 100644 (file)
@@ -19,6 +19,9 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -26,10 +29,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -99,7 +100,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
-        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(),
@@ -142,14 +142,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
+
         } else if(message instanceof ApplySnapshot ) {
             applySnapshot(((ApplySnapshot) message).getSnapshot());
+
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(
                     context.getPeerAddress(currentBehavior.getLeaderId())),
                 getSelf()
             );
+
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
 
@@ -157,20 +160,31 @@ public abstract class RaftActor extends UntypedPersistentActor {
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+
             // TODO: Handle failure in saving the snapshot
+
         } else if (message instanceof FindLeader){
+
             getSender().tell(new FindLeaderReply(
                 context.getPeerAddress(currentBehavior.getLeaderId())),
                 getSelf());
 
         } else if (message instanceof AddRaftPeer){
+
+            // FIXME : Do not add raft peers like this.
+            // When adding a new Peer we have to ensure that the a majority of
+            // the peers know about the new Peer. Doing it this way may cause
+            // a situation where multiple Leaders may emerge
             AddRaftPeer arp = (AddRaftPeer)message;
            context.addToPeers(arp.getName(), arp.getAddress());
 
         } else if (message instanceof RemoveRaftPeer){
+
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
+
         } else {
+
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
             currentBehavior = switchBehavior(state);
@@ -188,7 +202,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @param data
      */
     protected void persistData(ActorRef clientActor, String identifier,
-        Object data) {
+        Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -236,6 +250,24 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return currentBehavior.state();
     }
 
+    /**
+     * setPeerAddress sets the address of a known peer at a later time.
+     * <p>
+     * This is to account for situations where a we know that a peer
+     * exists but we do not know an address up-front. This may also be used in
+     * situations where a known peer starts off in a different location and we
+     * need to change it's address
+     * <p>
+     * Note that if the peerId does not match the list of peers passed to
+     * this actor during construction an IllegalStateException will be thrown.
+     *
+     * @param peerId
+     * @param peerAddress
+     */
+    protected void setPeerAddress(String peerId, String peerAddress){
+        context.setPeerAddress(peerId, peerAddress);
+    }
+
 
 
     /**
@@ -437,7 +469,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
             context.getLogger().debug(
-                "Append log entry and persist " + replicatedLogEntry);
+                "Append log entry and persist {} ", replicatedLogEntry);
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
@@ -515,39 +547,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
-        Serializable {
-
-        private final long index;
-        private final long term;
-        private final Object payload;
-
-        public ReplicatedLogImplEntry(long index, long term, Object payload) {
-
-            this.index = index;
-            this.term = term;
-            this.payload = payload;
-        }
-
-        @Override public Object getData() {
-            return payload;
-        }
 
-        @Override public long getTerm() {
-            return term;
-        }
-
-        @Override public long getIndex() {
-            return index;
-        }
-
-        @Override public String toString() {
-            return "Entry{" +
-                "index=" + index +
-                ", term=" + term +
-                '}';
-        }
-    }
 
     private static class DeleteEntries implements Serializable {
         private final int fromIndex;