Merge "BUG-2184 Fix subtree filtering for identity-ref leaves"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 91bbeeca504b607147e39927dae1481ee9c4df6e..66a46ef3bde0ca8a8d1fa8fe1056ccb463a594d6 100644 (file)
@@ -20,16 +20,16 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -38,7 +38,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.Map;
 
@@ -96,7 +95,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * This context should NOT be passed directly to any other actor it is
      * only to be consumed by the RaftActorBehaviors
      */
-    private RaftActorContext context;
+    private final RaftActorContext context;
 
     /**
      * The in-memory journal
@@ -107,6 +106,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     private volatile boolean hasSnapshotCaptureInitiated = false;
 
+    private Stopwatch recoveryTimer;
+
+    private int currentRecoveryBatchCount;
+
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -121,57 +124,136 @@ public abstract class RaftActor extends UntypedPersistentActor {
             LOG);
     }
 
-    @Override public void onReceiveRecover(Object message) {
-        if (message instanceof SnapshotOffer) {
-            LOG.info("SnapshotOffer called..");
-            SnapshotOffer offer = (SnapshotOffer) message;
-            Snapshot snapshot = (Snapshot) offer.snapshot();
-
-            // Create a replicated log with the snapshot information
-            // The replicated log can be used later on to retrieve this snapshot
-            // when we need to install it on a peer
-            replicatedLog = new ReplicatedLogImpl(snapshot);
-
-            context.setReplicatedLog(replicatedLog);
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
-            LOG.info("Applied snapshot to replicatedLog. " +
-                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size()
-            );
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = new Stopwatch();
+            recoveryTimer.start();
+        }
+    }
 
-            // Apply the snapshot to the actors state
-            applySnapshot(ByteString.copyFrom(snapshot.getState()));
+    @Override
+    public void preStart() throws Exception {
+        LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
+                context.getConfigParams().getJournalRecoveryLogBatchSize());
+        super.preStart();
+    }
 
+    @Override
+    public void onReceiveRecover(Object message) {
+        if (message instanceof SnapshotOffer) {
+            onRecoveredSnapshot((SnapshotOffer)message);
         } else if (message instanceof ReplicatedLogEntry) {
-            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
-            // Apply State immediately
-            replicatedLog.append(logEntry);
-            applyState(null, "recovery", logEntry.getData());
-            context.setLastApplied(logEntry.getIndex());
-            context.setCommitIndex(logEntry.getIndex());
+            onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
+        } else if (message instanceof ApplyLogEntries) {
+            onRecoveredApplyLogEntries((ApplyLogEntries)message);
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
         } else if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                    ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(
-                    "RecoveryCompleted - Switching actor to Follower - " +
-                        "Persistence Id =  " + persistenceId() +
-                        " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
-                        "journal-size={}",
-                    replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-                    replicatedLog.snapshotTerm, replicatedLog.size()
-                );
-            }
-            currentBehavior = switchBehavior(RaftState.Follower);
-            onStateChanged();
+            onRecoveryCompletedMessage();
         }
     }
 
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("SnapshotOffer called..");
+        }
+
+        initRecoveryTimer();
+
+        Snapshot snapshot = (Snapshot) offer.snapshot();
+
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+        replicatedLog = new ReplicatedLogImpl(snapshot);
+
+        context.setReplicatedLog(replicatedLog);
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+        Stopwatch timer = new Stopwatch();
+        timer.start();
+
+        // Apply the snapshot to the actors state
+        applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+        timer.stop();
+        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+                replicatedLog.size(), persistenceId(), timer.toString(),
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+    }
+
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+        }
+
+        replicatedLog.append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    context.getLastApplied() + 1, ale.getToIndex());
+        }
+
+        for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+            batchRecoveredLogEntry(replicatedLog.get(i));
+        }
+
+        context.setLastApplied(ale.getToIndex());
+        context.setCommitIndex(ale.getToIndex());
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            startLogRecoveryBatch(batchSize);
+        }
+
+        appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
+        }
+    }
+
+    private void endCurrentLogRecoveryBatch() {
+        applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        onRecoveryComplete();
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        LOG.info(
+            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                "Persistence Id =  " + persistenceId() +
+                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+                "journal-size={}",
+            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+            replicatedLog.snapshotTerm, replicatedLog.size());
+
+        currentBehavior = new Follower(context);
+        onStateChanged();
+    }
+
     @Override public void onReceiveCommand(Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
@@ -185,6 +267,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
 
+        } else if (message instanceof ApplyLogEntries){
+            ApplyLogEntries ale = (ApplyLogEntries) message;
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+            }
+            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+                @Override
+                public void apply(ApplyLogEntries param) throws Exception {
+                }
+            });
+
         } else if(message instanceof ApplySnapshot ) {
             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
 
@@ -262,14 +355,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
                 }
             }
 
-            RaftState state =
-                currentBehavior.handleMessage(getSender(), message);
             RaftActorBehavior oldBehavior = currentBehavior;
-            currentBehavior = switchBehavior(state);
+            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
             if(oldBehavior != currentBehavior){
                 onStateChanged();
             }
@@ -371,6 +463,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return context.getLastApplied();
     }
 
+    protected RaftActorContext getRaftActorContext() {
+        return context;
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -411,6 +507,38 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void applyState(ActorRef clientActor, String identifier,
         Object data);
 
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    protected abstract void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startRecoveryStateBatch}.
+     *
+     * @param data the state data
+     */
+    protected abstract void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshot A snapshot of the state of the actor
+     */
+    protected abstract void applyRecoverySnapshot(ByteString snapshot);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveryLogEntry}.
+     */
+    protected abstract void applyCurrentLogRecoveryBatch();
+
+    /**
+     * This method is called when recovery is complete.
+     */
+    protected abstract void onRecoveryComplete();
+
     /**
      * This method will be called by the RaftActor when a snapshot needs to be
      * created. The derived actor should respond with its current state.
@@ -423,10 +551,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected abstract void createSnapshot();
 
     /**
-     * This method will be called by the RaftActor during recovery to
-     * reconstruct the state of the actor.
-     * <p/>
-     * This method may also be called at any other point during normal
+     * This method can be called at any other point during normal
      * operations when the derived actor is out of sync with it's peers
      * and the only way to bring it in sync is by applying a snapshot
      *
@@ -443,38 +568,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private RaftActorBehavior switchBehavior(RaftState state) {
-        if (currentBehavior != null) {
-            if (currentBehavior.state() == state) {
-                return currentBehavior;
-            }
-            LOG.info("Switching from state " + currentBehavior.state() + " to "
-                + state);
-
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.error(e,
-                    "Failed to close behavior : " + currentBehavior.state());
-            }
-
-        } else {
-            LOG.info("Switching behavior to " + state);
-        }
-        RaftActorBehavior behavior = null;
-        if (state == RaftState.Candidate) {
-            behavior = new Candidate(context);
-        } else if (state == RaftState.Follower) {
-            behavior = new Follower(context);
-        } else {
-            behavior = new Leader(context);
-        }
-
-
-
-        return behavior;
-    }
-
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
@@ -496,8 +589,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-                + peerAddress);
+            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+                    leaderId, peerAddress);
         }
 
         return peerAddress;
@@ -571,8 +664,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
-            context.getLogger().debug(
-                "Append log entry and persist {} ", replicatedLogEntry);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+            }
+
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
@@ -583,6 +679,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // of a single command.
             persist(replicatedLogEntry,
                 new Procedure<ReplicatedLogEntry>() {
+                    @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
@@ -647,10 +744,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         private long currentTerm = 0;
         private String votedFor = null;
 
+        @Override
         public long getCurrentTerm() {
             return currentTerm;
         }
 
+        @Override
         public String getVotedFor() {
             return votedFor;
         }