BUG 2676 : Add more context to the Initiate snapshot capture log message
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 8e97c5877cba4cb90d195a6bb33cd42c449179dc..65254f2d6277c34c9773570e5938d8a005ab3015 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
@@ -33,16 +31,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
@@ -85,8 +82,7 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries
  * </ul>
  */
 public abstract class RaftActor extends AbstractUntypedPersistentActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
      * The current state determines the current behavior of a RaftActor
@@ -107,14 +103,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
 
-
-
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -131,8 +123,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void initRecoveryTimer() {
         if(recoveryTimer == null) {
-            recoveryTimer = new Stopwatch();
-            recoveryTimer.start();
+            recoveryTimer = Stopwatch.createStarted();
         }
     }
 
@@ -179,7 +170,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.debug("{}: SnapshotOffer called..", persistenceId());
         }
 
         initRecoveryTimer();
@@ -195,21 +186,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
-        Stopwatch timer = new Stopwatch();
-        timer.start();
+        Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+        applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
         replicatedLog.append(logEntry);
@@ -217,8 +207,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getLastApplied() + 1, ale.getToIndex());
+            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
         }
 
         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
@@ -268,8 +258,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
 
         initializeBehavior();
     }
@@ -289,8 +279,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyState applyState = (ApplyState) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Applying state for log index {} data {}",
-                    applyState.getReplicatedLogEntry().getIndex(),
+                LOG.debug("{}: Applying state for log index {} data {}",
+                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
             }
 
@@ -300,7 +290,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
             }
             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
@@ -312,12 +302,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                     snapshot.getLastAppliedTerm()
                 );
             }
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
             replicatedLog = new ReplicatedLogImpl(snapshot);
@@ -332,7 +323,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("SaveSnapshotSuccess received for snapshot");
+            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
 
             long sequenceNumber = success.metadata().sequenceNr();
 
@@ -341,39 +332,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
-            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId(), saveSnapshotFailure.cause());
 
             context.getReplicatedLog().snapshotRollback();
 
-            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                 context.getReplicatedLog().getSnapshotIndex(),
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.info("CaptureSnapshot received by actor");
-            CaptureSnapshot cs = (CaptureSnapshot)message;
-            captureSnapshot = cs;
-            createSnapshot();
+            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
 
-        } else if (message instanceof CaptureSnapshotReply){
-            LOG.info("CaptureSnapshotReply received by actor");
-            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+            if(captureSnapshot == null) {
+                captureSnapshot = (CaptureSnapshot)message;
+                createSnapshot();
+            }
 
-            ByteString stateInBytes = csr.getSnapshot();
-            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
-            handleCaptureSnapshotReply(stateInBytes);
+        } else if (message instanceof CaptureSnapshotReply){
+            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
-            if (!(message instanceof AppendEntriesMessages.AppendEntries)
-                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
-                }
-            }
-
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
@@ -416,7 +397,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getTermInformation().getCurrentTerm(), data);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Persist data {}", replicatedLogEntry);
+            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
         }
 
         final RaftActorContext raftContext = getRaftActorContext();
@@ -438,12 +419,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!hasSnapshotCaptureInitiated){
+                            if(!context.isSnapshotCaptureInitiated()){
                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
                             } else {
-                                LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+                                        persistenceId(), getId());
                             }
                         } else if (clientActor != null) {
                             // Send message for replication
@@ -581,9 +563,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -612,9 +594,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(ByteString snapshot);
+    protected abstract void applySnapshot(byte[] snapshotBytes);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -654,44 +636,56 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
-                    leaderId, peerAddress);
+            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+                    persistenceId(), leaderId, peerAddress);
         }
 
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
+
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+        Snapshot sn = Snapshot.create(snapshotBytes,
             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
         persistence().saveSnapshot(sn);
 
-        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-        //be greedy and remove entries from in-mem journal which are in the snapshot
-        // and update snapshotIndex and snapshotTerm without waiting for the success,
+        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+        if (context.getReplicatedLog().dataSize() > dataThreshold) {
+            // if memory is less, clear the log based on lastApplied.
+            // this could/should only happen if one of the followers is down
+            // as normally we keep removing from the log when its replicated to all.
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
 
-        context.getReplicatedLog().snapshotPreCommit(
-            captureSnapshot.getLastAppliedIndex(),
-            captureSnapshot.getLastAppliedTerm());
+        } else {
+            // clear the log based on replicatedToAllIndex
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                    captureSnapshot.getReplicatedToAllTerm());
+        }
+        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
 
-        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
-            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
             // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+                    ByteString.copyFrom(snapshotBytes)));
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
     }
 
     protected boolean hasFollowers(){
@@ -722,13 +716,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
 
-                @Override public void apply(DeleteEntries param)
-                    throws Exception {
+                @Override
+                public void apply(DeleteEntries param)
+                        throws Exception {
                     //FIXME : Doing nothing for now
                     dataSize = 0;
-                    for(ReplicatedLogEntry entry : journal){
+                    for (ReplicatedLogEntry entry : journal) {
                         dataSize += entry.size();
                     }
                 }
@@ -740,17 +735,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(replicatedLogEntry, null);
         }
 
-        @Override
-        public int dataSize() {
-            return dataSize;
-        }
-
         public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
             }
 
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
@@ -771,7 +761,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex()+1;
+                        long journalSize = lastIndex() + 1;
 
                         if(!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
@@ -792,13 +782,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
+                        if (!context.isSnapshotCaptureInitiated() &&
                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                         dataSizeForCheck > dataThreshold)) {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("Initiating Snapshot Capture..");
+                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -812,20 +804,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             if(LOG.isDebugEnabled()) {
-                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                                LOG.debug("Snapshot Capture lastApplied:{} ",
-                                    context.getLastApplied());
-                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+                                        persistenceId(), context.getLastApplied());
+                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+                                        lastAppliedIndex);
+                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+                                        lastAppliedTerm);
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
-                            getSelf().tell(new CaptureSnapshot(
-                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
-                        if(callback != null){
+                        if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
                     }
@@ -868,7 +865,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void update(long currentTerm, String votedFor) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
             }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;