BUG 2676 : Add more context to the Initiate snapshot capture log message
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 3dc6ae469a932474effca186c956a7a6f4ec8631..65254f2d6277c34c9773570e5938d8a005ab3015 100644 (file)
@@ -31,16 +31,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,8 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void initRecoveryTimer() {
         if(recoveryTimer == null) {
-            recoveryTimer = new Stopwatch();
-            recoveryTimer.start();
+            recoveryTimer = Stopwatch.createStarted();
         }
     }
 
@@ -190,8 +186,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
-        Stopwatch timer = new Stopwatch();
-        timer.start();
+        Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
         applyRecoverySnapshot(snapshot.getState());
@@ -360,13 +355,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
-            if (!(message instanceof AppendEntriesMessages.AppendEntries)
-                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
-                }
-            }
-
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
@@ -575,7 +563,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
@@ -670,12 +658,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-        //be greedy and remove entries from in-mem journal which are in the snapshot
-        // and update snapshotIndex and snapshotTerm without waiting for the success,
+        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+        if (context.getReplicatedLog().dataSize() > dataThreshold) {
+            // if memory is less, clear the log based on lastApplied.
+            // this could/should only happen if one of the followers is down
+            // as normally we keep removing from the log when its replicated to all.
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
 
-        context.getReplicatedLog().snapshotPreCommit(
-            captureSnapshot.getLastAppliedIndex(),
-            captureSnapshot.getLastAppliedTerm());
+        } else {
+            // clear the log based on replicatedToAllIndex
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                    captureSnapshot.getReplicatedToAllTerm());
+        }
+        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
 
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
@@ -719,13 +716,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
 
-                @Override public void apply(DeleteEntries param)
-                    throws Exception {
+                @Override
+                public void apply(DeleteEntries param)
+                        throws Exception {
                     //FIXME : Doing nothing for now
                     dataSize = 0;
-                    for(ReplicatedLogEntry entry : journal){
+                    for (ReplicatedLogEntry entry : journal) {
                         dataSize += entry.size();
                     }
                 }
@@ -737,11 +735,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(replicatedLogEntry, null);
         }
 
-        @Override
-        public int dataSize() {
-            return dataSize;
-        }
-
         public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -768,7 +761,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex()+1;
+                        long journalSize = lastIndex() + 1;
 
                         if(!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
@@ -795,7 +788,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
+                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -819,12 +814,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
-                            getSelf().tell(new CaptureSnapshot(
-                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                 null);
                             context.setSnapshotCaptureInitiated(true);
                         }
-                        if(callback != null){
+                        if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
                     }