Merge topic 'archetype'
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index f02b52beb9aa2c6aa14bbd5cc71dae725690f817..132e09d6de41e689ad0aef5f60de977f3783b196 100644 (file)
@@ -22,8 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -35,17 +38,12 @@ import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapsho
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import java.io.Serializable;
-import java.util.Map;
-
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
@@ -115,6 +113,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private int currentRecoveryBatchCount;
 
+
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -171,8 +171,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
 
                 onRecoveryComplete();
+
+                RaftActorBehavior oldBehavior = currentBehavior;
                 currentBehavior = new Follower(context);
-                onStateChanged();
+                handleBehaviorChange(oldBehavior, currentBehavior);
             }
         }
     }
@@ -271,8 +273,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
+        RaftActorBehavior oldBehavior = currentBehavior;
         currentBehavior = new Follower(context);
-        onStateChanged();
+        handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -343,20 +346,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
-        } else if (message instanceof AddRaftPeer){
-
-            // FIXME : Do not add raft peers like this.
-            // When adding a new Peer we have to ensure that the a majority of
-            // the peers know about the new Peer. Doing it this way may cause
-            // a situation where multiple Leaders may emerge
-            AddRaftPeer arp = (AddRaftPeer)message;
-           context.addToPeers(arp.getName(), arp.getAddress());
-
-        } else if (message instanceof RemoveRaftPeer){
-
-            RemoveRaftPeer rrp = (RemoveRaftPeer)message;
-            context.removePeer(rrp.getName());
-
         } else if (message instanceof CaptureSnapshot) {
             LOG.info("CaptureSnapshot received by actor");
             CaptureSnapshot cs = (CaptureSnapshot)message;
@@ -382,25 +371,28 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            if(oldBehavior != currentBehavior){
-                onStateChanged();
-            }
-
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+            handleBehaviorChange(oldBehavior, currentBehavior);
         }
     }
 
-    public java.util.Set<String> getPeers() {
+    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+        if (oldBehavior != currentBehavior){
+            onStateChanged();
+        }
 
-        return context.getPeerAddresses().keySet();
-    }
+        String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
+        String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
 
-    protected String getReplicatedLogState() {
-        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
-            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
-            + ", im-mem journal size=" + context.getReplicatedLog().size();
-    }
+        // it can happen that the state has not changed but the leader has changed.
+        onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
 
+        if (getRoleChangeNotifier().isPresent() &&
+                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+            getRoleChangeNotifier().get().tell(
+                    new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
+                    getSelf());
+        }
+    }
 
     /**
      * When a derived RaftActor needs to persist something it must call
@@ -410,8 +402,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(ActorRef clientActor, String identifier,
-        Payload data) {
+    protected void persistData(final ActorRef clientActor, final String identifier,
+        final Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -421,9 +413,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("Persist data {}", replicatedLogEntry);
         }
 
+        final RaftActorContext raftContext = getRaftActorContext();
+
         replicatedLog
-            .appendAndPersist(clientActor, identifier, replicatedLogEntry);
-    }
+                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+                    @Override
+                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+                        if(!hasFollowers()){
+                            // Increment the Commit Index and the Last Applied values
+                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
+
+                            // Apply the state immediately
+                            applyState(clientActor, identifier, data);
+
+                            // Send a ApplyLogEntries message so that we write the fact that we applied
+                            // the state to durable storage
+                            self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+
+                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
+                            if(!hasSnapshotCaptureInitiated){
+                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
+                                        raftContext.getTermInformation().getCurrentTerm());
+                                raftContext.getReplicatedLog().snapshotCommit();
+                            } else {
+                                LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+                            }
+                        } else if (clientActor != null) {
+                            // Send message for replication
+                            currentBehavior.handleMessage(getSelf(),
+                                    new Replicate(clientActor, identifier,
+                                            replicatedLogEntry)
+                            );
+                        }
+
+                    }
+                });    }
 
     protected String getId() {
         return context.getId();
@@ -594,6 +619,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected abstract DataPersistenceProvider persistence();
 
+    /**
+     * Notifier Actor for this RaftActor to notify when a role change happens
+     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+     */
+    protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
@@ -657,8 +688,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         hasSnapshotCaptureInitiated = false;
     }
 
+    protected boolean hasFollowers(){
+        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
+    }
+
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
+        private static final int DATA_SIZE_DIVIDER = 5;
+        private long dataSizeSinceLastSnapshot = 0;
+
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
@@ -683,18 +721,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
                     //FIXME : Doing nothing for now
+                    dataSize = 0;
+                    for(ReplicatedLogEntry entry : journal){
+                        dataSize += entry.size();
+                    }
                 }
             });
         }
 
         @Override public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(null, null, replicatedLogEntry);
+            appendAndPersist(replicatedLogEntry, null);
         }
 
-        public void appendAndPersist(final ActorRef clientActor,
-            final String identifier,
-            final ReplicatedLogEntry replicatedLogEntry) {
+        @Override
+        public int dataSize() {
+            return dataSize;
+        }
+
+        public void appendAndPersist(
+            final ReplicatedLogEntry replicatedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
@@ -712,16 +759,48 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
+                        int logEntrySize = replicatedLogEntry.size();
+
+                        dataSize += logEntrySize;
+                        long dataSizeForCheck = dataSize;
+
+                        dataSizeSinceLastSnapshot += logEntrySize;
+                        long journalSize = lastIndex()+1;
+
+                        if(!hasFollowers()) {
+                            // When we do not have followers we do not maintain an in-memory log
+                            // due to this the journalSize will never become anything close to the
+                            // snapshot batch count. In fact will mostly be 1.
+                            // Similarly since the journal's dataSize depends on the entries in the
+                            // journal the journal's dataSize will never reach a value close to the
+                            // memory threshold.
+                            // By maintaining the dataSize outside the journal we are tracking essentially
+                            // what we have written to the disk however since we no longer are in
+                            // need of doing a snapshot just for the sake of freeing up memory we adjust
+                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+                            // as if we were maintaining a real snapshot
+                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+                        }
+
+                        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
-                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+                                        dataSizeForCheck > dataThreshold)) {
+
+                            dataSizeSinceLastSnapshot = 0;
 
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
                             ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (lastAppliedEntry != null) {
+                            if (!hasFollowers()) {
+                                lastAppliedIndex = replicatedLogEntry.getIndex();
+                                lastAppliedTerm = replicatedLogEntry.getTerm();
+                            } else if (lastAppliedEntry != null) {
                                 lastAppliedIndex = lastAppliedEntry.getIndex();
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
@@ -740,12 +819,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 null);
                             hasSnapshotCaptureInitiated = true;
                         }
-                        // Send message for replication
-                        if (clientActor != null) {
-                            currentBehavior.handleMessage(getSelf(),
-                                new Replicate(clientActor, identifier,
-                                    replicatedLogEntry)
-                            );
+                        if(callback != null){
+                            callback.apply(replicatedLogEntry);
                         }
                     }
                 }
@@ -755,9 +830,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     static class DeleteEntries implements Serializable {
+        private static final long serialVersionUID = 1L;
         private final int fromIndex;
 
-
         public DeleteEntries(int fromIndex) {
             this.fromIndex = fromIndex;
         }
@@ -808,6 +883,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     static class UpdateElectionTerm implements Serializable {
+        private static final long serialVersionUID = 1L;
         private final long currentTerm;
         private final String votedFor;
 
@@ -858,4 +934,5 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected RaftActorBehavior getCurrentBehavior() {
         return currentBehavior;
     }
+
 }