Merge "BUG-868: Migrate to SchemaContextListener"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index f38ef18973e8867b3825673cd07536b17a9293fd..b8e9653bc5db92ffa4cb15afd391c6a9ec05cd16 100644 (file)
@@ -19,15 +19,18 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -99,7 +102,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     public RaftActor(String id, Map<String, String> peerAddresses) {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
-            id, new ElectionTermImpl(getSelf().path().toString()),
+            id, new ElectionTermImpl(),
             -1, -1, replicatedLog, peerAddresses, LOG);
     }
 
@@ -118,6 +121,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "Last index in log : " + replicatedLog.lastIndex());
@@ -126,23 +133,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     @Override public void onReceiveCommand(Object message) {
-        if (message instanceof ApplyState) {
-
+        if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
-            LOG.debug("Applying state for log index {}",
-                applyState.getReplicatedLogEntry().getIndex());
+            LOG.debug("Applying state for log index {} data {}",
+                applyState.getReplicatedLogEntry().getIndex(),
+                applyState.getReplicatedLogEntry().getData());
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
+
         } else if(message instanceof ApplySnapshot ) {
             applySnapshot(((ApplySnapshot) message).getSnapshot());
+
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(
                     context.getPeerAddress(currentBehavior.getLeaderId())),
                 getSelf()
             );
+
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
 
@@ -150,8 +160,31 @@ public abstract class RaftActor extends UntypedPersistentActor {
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+
             // TODO: Handle failure in saving the snapshot
+
+        } else if (message instanceof FindLeader){
+
+            getSender().tell(new FindLeaderReply(
+                context.getPeerAddress(currentBehavior.getLeaderId())),
+                getSelf());
+
+        } else if (message instanceof AddRaftPeer){
+
+            // FIXME : Do not add raft peers like this.
+            // When adding a new Peer we have to ensure that the a majority of
+            // the peers know about the new Peer. Doing it this way may cause
+            // a situation where multiple Leaders may emerge
+            AddRaftPeer arp = (AddRaftPeer)message;
+           context.addToPeers(arp.getName(), arp.getAddress());
+
+        } else if (message instanceof RemoveRaftPeer){
+
+            RemoveRaftPeer rrp = (RemoveRaftPeer)message;
+            context.removePeer(rrp.getName());
+
         } else {
+
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
             currentBehavior = switchBehavior(state);
@@ -169,12 +202,14 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @param data
      */
     protected void persistData(ActorRef clientActor, String identifier,
-        Object data) {
-        LOG.debug("Persist data " + identifier);
+        Payload data) {
+
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
+        LOG.debug("Persist data {}", replicatedLogEntry);
+
         replicatedLog
             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
     }
@@ -200,7 +235,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    protected ActorSelection getLeader() {
+    protected ActorSelection getLeader(){
         String leaderId = currentBehavior.getLeaderId();
         if (leaderId == null) {
             return null;
@@ -211,6 +246,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return context.actorSelection(peerAddress);
     }
 
+    protected RaftState getRaftState() {
+        return currentBehavior.state();
+    }
+
+    /**
+     * setPeerAddress sets the address of a known peer at a later time.
+     * <p>
+     * This is to account for situations where a we know that a peer
+     * exists but we do not know an address up-front. This may also be used in
+     * situations where a known peer starts off in a different location and we
+     * need to change it's address
+     * <p>
+     * Note that if the peerId does not match the list of peers passed to
+     * this actor during construction an IllegalStateException will be thrown.
+     *
+     * @param peerId
+     * @param peerAddress
+     */
+    protected void setPeerAddress(String peerId, String peerAddress){
+        context.setPeerAddress(peerId, peerAddress);
+    }
+
+
+
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -294,6 +353,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         deleteMessages(sequenceNumber);
     }
 
+
     private class ReplicatedLogImpl implements ReplicatedLog {
         private final List<ReplicatedLogEntry> journal;
         private final Object snapshot;
@@ -353,11 +413,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return;
             }
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                deleteMessage(i);
-                journal.remove(i);
+
+            journal.subList(adjustedIndex , journal.size()).clear();
+        }
+
+
+        @Override public void removeFromAndPersist(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+                return;
             }
+
+            // FIXME: Maybe this should be done after the command is saved
+            journal.subList(adjustedIndex , journal.size()).clear();
+
+            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+                @Override public void apply(DeleteEntries param)
+                    throws Exception {
+                    //FIXME : Doing nothing for now
+                }
+            });
+
+
         }
 
         @Override public void append(
@@ -372,6 +451,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return entries;
             }
+
+
             for (int i = adjustedIndex;
                  i < journal.size(); i++) {
                 entries.add(journal.get(i));
@@ -388,7 +469,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
             context.getLogger().debug(
-                "Append log entry and persist " + replicatedLogEntry.getIndex());
+                "Append log entry and persist {} ", replicatedLogEntry);
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
@@ -429,7 +510,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public long size() {
-            return journal.size() + snapshotIndex;
+            return journal.size() + snapshotIndex + 1;
         }
 
         @Override public boolean isPresent(long index) {
@@ -466,30 +547,18 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
 
-    private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
-        Serializable {
-
-        private final long index;
-        private final long term;
-        private final Object payload;
 
-        public ReplicatedLogImplEntry(long index, long term, Object payload) {
 
-            this.index = index;
-            this.term = term;
-            this.payload = payload;
-        }
+    private static class DeleteEntries implements Serializable {
+        private final int fromIndex;
 
-        @Override public Object getData() {
-            return payload;
-        }
 
-        @Override public long getTerm() {
-            return term;
+        public DeleteEntries(int fromIndex) {
+            this.fromIndex = fromIndex;
         }
 
-        @Override public long getIndex() {
-            return index;
+        public int getFromIndex() {
+            return fromIndex;
         }
     }
 
@@ -542,5 +611,58 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private class ElectionTermImpl implements ElectionTerm {
+        /**
+         * Identifier of the actor whose election term information this is
+         */
+        private long currentTerm = 0;
+        private String votedFor = null;
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+
+        @Override public void update(long currentTerm, String votedFor) {
+            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        @Override
+        public void updateAndPersist(long currentTerm, String votedFor){
+            update(currentTerm, votedFor);
+            // FIXME : Maybe first persist then update the state
+            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+                @Override public void apply(UpdateElectionTerm param)
+                    throws Exception {
+
+                }
+            });
+        }
+    }
+
+    private static class UpdateElectionTerm implements Serializable {
+        private final long currentTerm;
+        private final String votedFor;
+
+        public UpdateElectionTerm(long currentTerm, String votedFor) {
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+    }
 
 }