Bug-2590: Clustering : Minimize usage of in-memory journal
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index dc65869f6df051b01f277676ebb810cff47e419b..766b80e73dd12c890df3ed493e397a7cd144aab4 100644 (file)
@@ -107,8 +107,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -170,16 +168,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                 onRecoveryComplete();
 
-                RaftActorBehavior oldBehavior = currentBehavior;
-                currentBehavior = new Follower(context);
-                handleBehaviorChange(oldBehavior, currentBehavior);
+                initializeBehavior();
             }
         }
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.debug("{}: SnapshotOffer called..", persistenceId());
         }
 
         initRecoveryTimer();
@@ -199,17 +195,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.start();
 
         // Apply the snapshot to the actors state
-        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+        applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
         replicatedLog.append(logEntry);
@@ -217,8 +213,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getLastApplied() + 1, ale.getToIndex());
+            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
         }
 
         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
@@ -268,11 +264,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+
+        initializeBehavior();
+    }
 
+    protected void initializeBehavior(){
+        changeCurrentBehavior(new Follower(context));
+    }
+
+    protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
         RaftActorBehavior oldBehavior = currentBehavior;
-        currentBehavior = new Follower(context);
+        currentBehavior = newBehavior;
         handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
@@ -281,8 +285,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyState applyState = (ApplyState) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Applying state for log index {} data {}",
-                    applyState.getReplicatedLogEntry().getIndex(),
+                LOG.debug("{}: Applying state for log index {} data {}",
+                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
             }
 
@@ -292,7 +296,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
             }
             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
@@ -304,12 +308,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                     snapshot.getLastAppliedTerm()
                 );
             }
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
             replicatedLog = new ReplicatedLogImpl(snapshot);
@@ -324,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("SaveSnapshotSuccess received for snapshot");
+            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
 
             long sequenceNumber = success.metadata().sequenceNr();
 
@@ -333,36 +338,33 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
-            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+            LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId());
 
             context.getReplicatedLog().snapshotRollback();
 
-            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                 context.getReplicatedLog().getSnapshotIndex(),
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.info("CaptureSnapshot received by actor");
-            CaptureSnapshot cs = (CaptureSnapshot)message;
-            captureSnapshot = cs;
-            createSnapshot();
+            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
 
-        } else if (message instanceof CaptureSnapshotReply){
-            LOG.info("CaptureSnapshotReply received by actor");
-            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+            if(captureSnapshot == null) {
+                captureSnapshot = (CaptureSnapshot)message;
+                createSnapshot();
+            }
 
-            ByteString stateInBytes = csr.getSnapshot();
-            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
-            handleCaptureSnapshotReply(stateInBytes);
+        } else if (message instanceof CaptureSnapshotReply){
+            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
+                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
                 }
             }
 
@@ -400,20 +402,54 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param identifier
      * @param data
      */
-    protected void persistData(ActorRef clientActor, String identifier,
-        Payload data) {
+    protected void persistData(final ActorRef clientActor, final String identifier,
+        final Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Persist data {}", replicatedLogEntry);
+            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
         }
 
+        final RaftActorContext raftContext = getRaftActorContext();
+
         replicatedLog
-            .appendAndPersist(clientActor, identifier, replicatedLogEntry);
-    }
+                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+                    @Override
+                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+                        if(!hasFollowers()){
+                            // Increment the Commit Index and the Last Applied values
+                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
+
+                            // Apply the state immediately
+                            applyState(clientActor, identifier, data);
+
+                            // Send a ApplyLogEntries message so that we write the fact that we applied
+                            // the state to durable storage
+                            self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+
+                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
+                            if(!context.isSnapshotCaptureInitiated()){
+                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
+                                        raftContext.getTermInformation().getCurrentTerm());
+                                raftContext.getReplicatedLog().snapshotCommit();
+                            } else {
+                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+                                        persistenceId(), getId());
+                            }
+                        } else if (clientActor != null) {
+                            // Send message for replication
+                            currentBehavior.handleMessage(getSelf(),
+                                    new Replicate(clientActor, identifier,
+                                            replicatedLogEntry)
+                            );
+                        }
+
+                    }
+                });    }
 
     protected String getId() {
         return context.getId();
@@ -542,7 +578,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -571,9 +607,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(ByteString snapshot);
+    protected abstract void applySnapshot(byte[] snapshotBytes);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -613,25 +649,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
-                    leaderId, peerAddress);
+            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+                    persistenceId(), leaderId, peerAddress);
         }
 
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
+
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+        Snapshot sn = Snapshot.create(snapshotBytes,
             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
         persistence().saveSnapshot(sn);
 
-        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
@@ -640,21 +678,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
-        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
-            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
             // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+                    ByteString.copyFrom(snapshotBytes)));
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
+    }
+
+    protected boolean hasFollowers(){
+        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
     }
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
+        private static final int DATA_SIZE_DIVIDER = 5;
+        private long dataSizeSinceLastSnapshot = 0;
+
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
@@ -689,7 +735,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(null, null, replicatedLogEntry);
+            appendAndPersist(replicatedLogEntry, null);
         }
 
         @Override
@@ -697,12 +743,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             return dataSize;
         }
 
-        public void appendAndPersist(final ActorRef clientActor,
-            final String identifier,
-            final ReplicatedLogEntry replicatedLogEntry) {
+        public void appendAndPersist(
+            final ReplicatedLogEntry replicatedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
             }
 
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
@@ -717,46 +763,70 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
-                        dataSize += replicatedLogEntry.size();
+                        int logEntrySize = replicatedLogEntry.size();
+
+                        dataSize += logEntrySize;
+                        long dataSizeForCheck = dataSize;
+
+                        dataSizeSinceLastSnapshot += logEntrySize;
+                        long journalSize = lastIndex()+1;
+
+                        if(!hasFollowers()) {
+                            // When we do not have followers we do not maintain an in-memory log
+                            // due to this the journalSize will never become anything close to the
+                            // snapshot batch count. In fact will mostly be 1.
+                            // Similarly since the journal's dataSize depends on the entries in the
+                            // journal the journal's dataSize will never reach a value close to the
+                            // memory threshold.
+                            // By maintaining the dataSize outside the journal we are tracking essentially
+                            // what we have written to the disk however since we no longer are in
+                            // need of doing a snapshot just for the sake of freeing up memory we adjust
+                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+                            // as if we were maintaining a real snapshot
+                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+                        }
 
                         long dataThreshold = Runtime.getRuntime().totalMemory() *
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
-                                ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSize > dataThreshold)) {
+                        if (!context.isSnapshotCaptureInitiated() &&
+                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+                                        dataSizeForCheck > dataThreshold)) {
 
-                            LOG.info("Initiating Snapshot Capture..");
+                            dataSizeSinceLastSnapshot = 0;
+
+                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
                             ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (lastAppliedEntry != null) {
+                            if (!hasFollowers()) {
+                                lastAppliedIndex = replicatedLogEntry.getIndex();
+                                lastAppliedTerm = replicatedLogEntry.getTerm();
+                            } else if (lastAppliedEntry != null) {
                                 lastAppliedIndex = lastAppliedEntry.getIndex();
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
 
                             if(LOG.isDebugEnabled()) {
-                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                                LOG.debug("Snapshot Capture lastApplied:{} ",
-                                    context.getLastApplied());
-                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+                                        persistenceId(), context.getLastApplied());
+                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+                                        lastAppliedIndex);
+                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+                                        lastAppliedTerm);
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
                             getSelf().tell(new CaptureSnapshot(
                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
-                        // Send message for replication
-                        if (clientActor != null) {
-                            currentBehavior.handleMessage(getSelf(),
-                                new Replicate(clientActor, identifier,
-                                    replicatedLogEntry)
-                            );
+                        if(callback != null){
+                            callback.apply(replicatedLogEntry);
                         }
                     }
                 }
@@ -798,7 +868,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void update(long currentTerm, String votedFor) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
             }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;