Bug-2590: Clustering : Minimize usage of in-memory journal
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index a7c3db4fc246bcce77d23e8d324a4301d930568e..766b80e73dd12c890df3ed493e397a7cd144aab4 100644 (file)
@@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
 
-
-
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -172,16 +168,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                 onRecoveryComplete();
 
-                RaftActorBehavior oldBehavior = currentBehavior;
-                currentBehavior = new Follower(context);
-                handleBehaviorChange(oldBehavior, currentBehavior);
+                initializeBehavior();
             }
         }
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.debug("{}: SnapshotOffer called..", persistenceId());
         }
 
         initRecoveryTimer();
@@ -201,17 +195,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.start();
 
         // Apply the snapshot to the actors state
-        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+        applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
         replicatedLog.append(logEntry);
@@ -219,8 +213,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getLastApplied() + 1, ale.getToIndex());
+            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
         }
 
         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
@@ -270,11 +264,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+
+        initializeBehavior();
+    }
+
+    protected void initializeBehavior(){
+        changeCurrentBehavior(new Follower(context));
+    }
 
+    protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
         RaftActorBehavior oldBehavior = currentBehavior;
-        currentBehavior = new Follower(context);
+        currentBehavior = newBehavior;
         handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
@@ -283,8 +285,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyState applyState = (ApplyState) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Applying state for log index {} data {}",
-                    applyState.getReplicatedLogEntry().getIndex(),
+                LOG.debug("{}: Applying state for log index {} data {}",
+                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                     applyState.getReplicatedLogEntry().getData());
             }
 
@@ -294,7 +296,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyLogEntries){
             ApplyLogEntries ale = (ApplyLogEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
             }
             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                 @Override
@@ -306,12 +308,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
+                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                     snapshot.getLastAppliedTerm()
                 );
             }
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+            applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
             replicatedLog = new ReplicatedLogImpl(snapshot);
@@ -326,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("SaveSnapshotSuccess received for snapshot");
+            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
 
             long sequenceNumber = success.metadata().sequenceNr();
 
@@ -335,36 +338,33 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
-            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
+            LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId());
 
             context.getReplicatedLog().snapshotRollback();
 
-            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
+            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
+                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                 context.getReplicatedLog().getSnapshotIndex(),
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.info("CaptureSnapshot received by actor");
-            CaptureSnapshot cs = (CaptureSnapshot)message;
-            captureSnapshot = cs;
-            createSnapshot();
+            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
 
-        } else if (message instanceof CaptureSnapshotReply){
-            LOG.info("CaptureSnapshotReply received by actor");
-            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
+            if(captureSnapshot == null) {
+                captureSnapshot = (CaptureSnapshot)message;
+                createSnapshot();
+            }
 
-            ByteString stateInBytes = csr.getSnapshot();
-            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
-            handleCaptureSnapshotReply(stateInBytes);
+        } else if (message instanceof CaptureSnapshotReply){
+            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
+                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
                 }
             }
 
@@ -379,15 +379,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (oldBehavior != currentBehavior){
             onStateChanged();
         }
-        if (oldBehavior != null) {
-            // it can happen that the state has not changed but the leader has changed.
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
 
-            if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
-                // we do not want to notify when the behavior/role is set for the first time (i.e follower)
-                getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
-                    currentBehavior.state().name()), getSelf());
-            }
+        String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
+        String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+
+        // it can happen that the state has not changed but the leader has changed.
+        onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+
+        if (getRoleChangeNotifier().isPresent() &&
+                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+            getRoleChangeNotifier().get().tell(
+                    new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
+                    getSelf());
         }
     }
 
@@ -407,7 +410,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             context.getTermInformation().getCurrentTerm(), data);
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Persist data {}", replicatedLogEntry);
+            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
         }
 
         final RaftActorContext raftContext = getRaftActorContext();
@@ -429,12 +432,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!hasSnapshotCaptureInitiated){
+                            if(!context.isSnapshotCaptureInitiated()){
                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
                             } else {
-                                LOG.debug("Skipping fake snapshotting for {} because real snapshotting is in progress", getId());
+                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
+                                        persistenceId(), getId());
                             }
                         } else if (clientActor != null) {
                             // Send message for replication
@@ -574,7 +578,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @param snapshot A snapshot of the state of the actor
      */
-    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
     /**
      * This method is called during recovery at the end of a batch to apply the current batched
@@ -603,9 +607,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
-    protected abstract void applySnapshot(ByteString snapshot);
+    protected abstract void applySnapshot(byte[] snapshotBytes);
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -645,25 +649,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
-                    leaderId, peerAddress);
+            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+                    persistenceId(), leaderId, peerAddress);
         }
 
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
+        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
+
         // create a snapshot object from the state provided and save it
         // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-        Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+        Snapshot sn = Snapshot.create(snapshotBytes,
             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
         persistence().saveSnapshot(sn);
 
-        LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
@@ -672,17 +678,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
-        LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
-            "and term:{}", captureSnapshot.getLastAppliedIndex(),
+        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
             // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
+                    ByteString.copyFrom(snapshotBytes)));
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
     }
 
     protected boolean hasFollowers(){
@@ -741,7 +748,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             final Procedure<ReplicatedLogEntry> callback)  {
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
             }
 
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
@@ -783,13 +790,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
+                        if (!context.isSnapshotCaptureInitiated() &&
                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                         dataSizeForCheck > dataThreshold)) {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("Initiating Snapshot Capture..");
+                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -803,18 +810,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             if(LOG.isDebugEnabled()) {
-                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
-                                LOG.debug("Snapshot Capture lastApplied:{} ",
-                                    context.getLastApplied());
-                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
-                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
+                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
+                                        persistenceId(), context.getLastApplied());
+                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
+                                        lastAppliedIndex);
+                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
+                                        lastAppliedTerm);
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
                             getSelf().tell(new CaptureSnapshot(
                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
                         if(callback != null){
                             callback.apply(replicatedLogEntry);
@@ -859,7 +868,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         @Override public void update(long currentTerm, String votedFor) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
             }
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;