Merge "Refactor raft recovery code to a RaftActorRecoverySupport class"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 13:04:08 +0000 (13:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 13:04:11 +0000 (13:04 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java

index ed19f21dedb8d5900a0df35aa31fc20ce6b8aee4..eca29496662f724d8415c9a3196da2ade6fa7af1 100644 (file)
@@ -19,6 +19,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -26,6 +27,7 @@ import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -34,7 +36,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
 
     private final Map<String, String> state = new HashMap();
 
@@ -192,22 +194,28 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(int maxBatchSize) {
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    public void startLogRecoveryBatch(int maxBatchSize) {
     }
 
     @Override
-    protected void applyCurrentLogRecoveryBatch() {
+    public void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
-    protected void onRecoveryComplete() {
+    public void applyCurrentLogRecoveryBatch() {
     }
 
     @Override
-    protected void applyRecoverySnapshot(byte[] snapshot) {
+    public void onRecoveryComplete() {
+    }
+
+    @Override
+    public void applyRecoverySnapshot(byte[] snapshot) {
     }
 }
index 1c30fe23175b5af62ffe808d9428c3361688a4b0..1f1521d797d45790dc35c747ca9f8af2d4c6dd8d 100644 (file)
@@ -11,15 +11,12 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
@@ -27,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
@@ -36,7 +34,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -119,9 +116,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
-    private Stopwatch recoveryTimer;
-
-    private int currentRecoveryBatchCount;
+    private RaftActorRecoverySupport raftRecovery;
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
@@ -140,12 +135,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
-    private void initRecoveryTimer() {
-        if(recoveryTimer == null) {
-            recoveryTimer = Stopwatch.createStarted();
-        }
-    }
-
     @Override
     public void preStart() throws Exception {
         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@@ -169,134 +158,28 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleRecover(Object message) {
-        if(persistence().isRecoveryApplicable()) {
-            if (message instanceof SnapshotOffer) {
-                onRecoveredSnapshot((SnapshotOffer) message);
-            } else if (message instanceof ReplicatedLogEntry) {
-                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-            } else if (message instanceof ApplyLogEntries) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
-            } else if (message instanceof ApplyJournalEntries) {
-                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-            } else if (message instanceof DeleteEntries) {
-                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
-            } else if (message instanceof RecoveryCompleted) {
-                onRecoveryCompletedMessage();
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
+        if(raftRecovery == null) {
+            raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+                    getRaftActorRecoveryCohort());
+        }
+
+        boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+        if(recoveryComplete) {
+            if(!persistence().isRecoveryApplicable()) {
                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
                 deleteMessages(lastSequenceNr());
 
                 // Delete all the akka snapshots as they will not be needed
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
-                onRecoveryComplete();
-
-                initializeBehavior();
             }
-        }
-    }
-
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: SnapshotOffer called..", persistenceId());
-        }
 
-        initRecoveryTimer();
+            onRecoveryComplete();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+            initializeBehavior();
 
-        // Create a replicated log with the snapshot information
-        // The replicated log can be used later on to retrieve this snapshot
-        // when we need to install it on a peer
-
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                currentBehavior));
-        context.setLastApplied(snapshot.getLastAppliedIndex());
-        context.setCommitIndex(snapshot.getLastAppliedIndex());
-
-        Stopwatch timer = Stopwatch.createStarted();
-
-        // Apply the snapshot to the actors state
-        applyRecoverySnapshot(snapshot.getState());
-
-        timer.stop();
-        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog().size(), persistenceId(), timer.toString(),
-                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
-    }
-
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+            raftRecovery = null;
         }
-
-        replicatedLog().append(logEntry);
-    }
-
-    private void onRecoveredApplyLogEntries(long toIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, toIndex);
-        }
-
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
-        }
-
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
-    }
-
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
-
-        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(currentRecoveryBatchCount == 0) {
-            startLogRecoveryBatch(batchSize);
-        }
-
-        appendRecoveredLogEntry(logEntry.getData());
-
-        if(++currentRecoveryBatchCount >= batchSize) {
-            endCurrentLogRecoveryBatch();
-        }
-    }
-
-    private void endCurrentLogRecoveryBatch() {
-        applyCurrentLogRecoveryBatch();
-        currentRecoveryBatchCount = 0;
-    }
-
-    private void onRecoveryCompletedMessage() {
-        if(currentRecoveryBatchCount > 0) {
-            endCurrentLogRecoveryBatch();
-        }
-
-        onRecoveryComplete();
-
-        String recoveryTime = "";
-        if(recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
-            recoveryTimer = null;
-        }
-
-        LOG.info(
-            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
-                "Persistence Id =  " + persistenceId() +
-                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
-                "journal-size={}",
-            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-            replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
-        initializeBehavior();
     }
 
     protected void initializeBehavior(){
@@ -670,31 +553,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Object data);
 
     /**
-     * This method is called during recovery at the start of a batch of state entries. Derived
-     * classes should perform any initialization needed to start a batch.
-     */
-    protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
-    /**
-     * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startLogRecoveryBatch}.
-     *
-     * @param data the state data
-     */
-    protected abstract void appendRecoveredLogEntry(Payload data);
-
-    /**
-     * This method is called during recovery to reconstruct the state of the actor.
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
-    /**
-     * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+     * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
      */
-    protected abstract void applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
 
     /**
      * This method is called when recovery is complete.
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
new file mode 100644 (file)
index 0000000..a9f00aa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Interface for a class that participates in raft actor persistence recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorRecoveryCohort {
+
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startLogRecoveryBatch}.
+     *
+     * @param data the state data
+     */
+    void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshotBytes A snapshot of the state of the actor
+     */
+    void applyRecoverySnapshot(byte[] snapshotBytes);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+     */
+    void applyCurrentLogRecoveryBatch();
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
new file mode 100644 (file)
index 0000000..5f33c73
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Support class that handles persistence recovery for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorRecoverySupport {
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorRecoveryCohort cohort;
+
+    private int currentRecoveryBatchCount;
+
+    private Stopwatch recoveryTimer;
+    private final Logger log;
+
+    RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.log = context.getLogger();
+    }
+
+    boolean handleRecoveryMessage(Object message) {
+        boolean recoveryComplete = false;
+        if(persistence.isRecoveryApplicable()) {
+            if (message instanceof SnapshotOffer) {
+                onRecoveredSnapshot((SnapshotOffer) message);
+            } else if (message instanceof ReplicatedLogEntry) {
+                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+            } else if (message instanceof ApplyLogEntries) {
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+            } else if (message instanceof DeleteEntries) {
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof UpdateElectionTerm) {
+                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                        ((UpdateElectionTerm) message).getVotedFor());
+            } else if (message instanceof RecoveryCompleted) {
+                onRecoveryCompletedMessage();
+                recoveryComplete = true;
+            }
+        } else if (message instanceof RecoveryCompleted) {
+            recoveryComplete = true;
+        }
+
+        return recoveryComplete;
+    }
+
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
+
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = Stopwatch.createStarted();
+        }
+    }
+
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: SnapshotOffer called..", context.getId());
+        }
+
+        initRecoveryTimer();
+
+        Snapshot snapshot = (Snapshot) offer.snapshot();
+
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+        Stopwatch timer = Stopwatch.createStarted();
+
+        // Apply the snapshot to the actors state
+        cohort.applyRecoverySnapshot(snapshot.getState());
+
+        timer.stop();
+        log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+                replicatedLog().size(), context.getId(), timer.toString(),
+                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
+    }
+
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex());
+        }
+
+        replicatedLog().append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(long toIndex) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    context.getId(), context.getLastApplied() + 1, toIndex);
+        }
+
+        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
+            batchRecoveredLogEntry(replicatedLog().get(i));
+        }
+
+        context.setLastApplied(toIndex);
+        context.setCommitIndex(toIndex);
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            cohort.startLogRecoveryBatch(batchSize);
+        }
+
+        cohort.appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
+        }
+    }
+
+    private void endCurrentLogRecoveryBatch() {
+        cohort.applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                 "Persistence Id =  " + context.getId() +
+                 " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
+                 "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+    }
+}
index b910313b096015ad166c8be1178ac8f72ad12167..45fd26c930736dd0d17ff706154b6252246bdd0e 100644 (file)
@@ -119,7 +119,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
+        public void applyRecoverySnapshot(byte[] bytes) {
         }
 
         void setSnapshot(byte[] snapshot) {
index 17a81ac3c39aa9c0a27743e1739892629a157ba8..f71cb984b3e414bf879669cc64535e2d3d64c78c 100644 (file)
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,9 +98,10 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor {
+    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
 
-        private final RaftActor delegate;
+        private final RaftActor actorDelegate;
+        private final RaftActorRecoveryCohort cohortDelegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
@@ -136,7 +138,8 @@ public class RaftActorTest extends AbstractActorTest {
                              DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
-            this.delegate = mock(RaftActor.class);
+            this.actorDelegate = mock(RaftActor.class);
+            this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
             if(dataPersistenceProvider == null){
                 setPersistence(true);
             } else {
@@ -197,26 +200,32 @@ public class RaftActorTest extends AbstractActorTest {
 
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            delegate.applyState(clientActor, identifier, data);
+            actorDelegate.applyState(clientActor, identifier, data);
             LOG.info("{}: applyState called", persistenceId());
         }
 
         @Override
-        protected void startLogRecoveryBatch(int maxBatchSize) {
+        @Nonnull
+        protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+            return this;
         }
 
         @Override
-        protected void appendRecoveredLogEntry(Payload data) {
+        public void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        public void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
         @Override
-        protected void applyCurrentLogRecoveryBatch() {
+        public void applyCurrentLogRecoveryBatch() {
         }
 
         @Override
         protected void onRecoveryComplete() {
-            delegate.onRecoveryComplete();
+            actorDelegate.onRecoveryComplete();
             recoveryComplete.countDown();
         }
 
@@ -227,8 +236,8 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
-            delegate.applyRecoverySnapshot(bytes);
+        public void applyRecoverySnapshot(byte[] bytes) {
+            cohortDelegate.applyRecoverySnapshot(bytes);
             try {
                 Object data = toObject(bytes);
                 if (data instanceof List) {
@@ -241,16 +250,16 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override protected void createSnapshot() {
             LOG.info("{}: createSnapshot called", persistenceId());
-            delegate.createSnapshot();
+            actorDelegate.createSnapshot();
         }
 
         @Override protected void applySnapshot(byte [] snapshot) {
             LOG.info("{}: applySnapshot called", persistenceId());
-            delegate.applySnapshot(snapshot);
+            actorDelegate.applySnapshot(snapshot);
         }
 
         @Override protected void onStateChanged() {
-            delegate.onStateChanged();
+            actorDelegate.onStateChanged();
         }
 
         @Override
@@ -516,7 +525,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+                verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -583,7 +592,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+                verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -810,7 +819,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
-                verify(mockRaftActor.delegate).createSnapshot();
+                verify(mockRaftActor.actorDelegate).createSnapshot();
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -860,7 +869,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
 
-                verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
 
             }
         };
@@ -907,7 +916,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+                verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -1131,7 +1140,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.delegate).createSnapshot();
+                verify(leaderActor.actorDelegate).createSnapshot();
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1230,7 +1239,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.delegate).createSnapshot();
+                verify(followerActor.actorDelegate).createSnapshot();
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
index 9cd52b219a442320bd0dc8bf12cdfc6b6c4a577b..17f1abb92cc4ec7d4d0a7dffb818cec20b5cff0b 100644 (file)
@@ -63,11 +63,11 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -121,12 +121,6 @@ public class Shard extends RaftActor {
     private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
             Serialization.serializedActorPath(getSelf()));
 
-
-    /**
-     * Coordinates persistence recovery on startup.
-     */
-    private ShardRecoveryCoordinator recoveryCoordinator;
-
     private final DOMTransactionFactory transactionFactory;
 
     private final String txnDispatcherPath;
@@ -177,8 +171,6 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
-
-        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -649,30 +641,13 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected
-    void startLogRecoveryBatch(final int maxBatchSize) {
-        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
-    }
-
-    @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
-        recoveryCoordinator.appendRecoveredLogPayload(data);
-    }
-
-    @Override
-    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
-    }
-
-    @Override
-    protected void applyCurrentLogRecoveryBatch() {
-        recoveryCoordinator.applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     @Override
     protected void onRecoveryComplete() {
-        recoveryCoordinator = null;
-
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
index 7e547d7257dc495dc34a796d1c7b7a0944cb0e76..01a124b6977c801e3f273c57341efe91d97c52b2 100644 (file)
@@ -13,6 +13,7 @@ import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -32,7 +33,7 @@ import org.slf4j.Logger;
  *
  * @author Thomas Panetelis
  */
-class ShardRecoveryCoordinator {
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     private final InMemoryDOMDataStore store;
     private List<ModificationPayload> currentLogRecoveryBatch;
@@ -45,13 +46,15 @@ class ShardRecoveryCoordinator {
         this.log = log;
     }
 
-    void startLogRecoveryBatch(int maxBatchSize) {
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    void appendRecoveredLogPayload(Payload payload) {
+    @Override
+    public void appendRecoveredLogEntry(Payload payload) {
         try {
             if(payload instanceof ModificationPayload) {
                 currentLogRecoveryBatch.add((ModificationPayload) payload);
@@ -83,7 +86,8 @@ class ShardRecoveryCoordinator {
     /**
      * Applies the current batched log entries to the data store.
      */
-    void applyCurrentLogRecoveryBatch() {
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
         log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
 
         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
@@ -105,7 +109,8 @@ class ShardRecoveryCoordinator {
      *
      * @param snapshotBytes the serialized snapshot
      */
-    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+    @Override
+    public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applyng recovered sbapshot", shardName);
 
         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();