Bug 7362: Notify applyState synchronously 04/49404/7
authorTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 06:05:53 +0000 (01:05 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 6 Jan 2017 12:45:28 +0000 (12:45 +0000)
As per comments in bug 7362, it's problematic for snapshots that we
set lastApplied index but queue the ApplyState message. This can leave
the derived actor's captured state out-of-sync with the lastApplied index
if a snapshot occurs in between. We need to set lastApplied atomically
with apply to state so I added a callback that RaftActor sets in the
RaftActorContext that is called by the behavior's applyLogToStateMachine
method. The callback immediately calls applyState. The ApplyState message
is still queued for other compenents that trigger on it (and also unit
tests).

I considered setting lastApplied when the ApplyState message is processed,
and not in applyLogToStateMachine, but this would be problematic if
another AppendEntriesReply was queued before the ApplyState message
as it would try to apply it again.

Change-Id: Ie062af8440bc251eec9c9ef58f450dee23abd04c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index fff6ce9ed17db138af3ec73be83ddcd4d324f102..2d2fce22f9c3c824cf977be4637cbf60e24e0a46 100644 (file)
@@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -132,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
-            delegatingPersistenceProvider, LOG);
+            delegatingPersistenceProvider, this::handleApplyState, LOG);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
@@ -224,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
             return;
         }
-
         if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long startTime = System.nanoTime();
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
-                applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            long elapsedTime = System.nanoTime() - startTime;
-            if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
-                LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
-
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -257,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
-            // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
             possiblyHandleBehaviorMessage(message);
-
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
@@ -500,6 +479,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    private void handleApplyState(ApplyState applyState) {
+        long startTime = System.nanoTime();
+
+        Payload payload = applyState.getReplicatedLogEntry().getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+        }
+
+        if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+        }
+
+        long elapsedTime = System.nanoTime() - startTime;
+        if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+        }
+
+        // Send the ApplyState message back to self to handle further processing asynchronously.
+        self().tell(applyState, self());
+    }
+
     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
@@ -545,7 +547,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
index 2896c44a2cef79ef12e002a5332b414e3368e4db..74a214f90ae14715c8053b2c9dcec204c6b2b80b 100644 (file)
@@ -16,10 +16,12 @@ import akka.cluster.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
@@ -316,4 +318,12 @@ public interface RaftActorContext {
      * @return current behavior.
      */
     RaftActorBehavior getCurrentBehavior();
+
+    /**
+     * Returns the consumer of ApplyState operations. This is invoked by a behavior when a log entry needs to be
+     * applied to the state.
+     *
+     * @return the Consumer
+     */
+    Consumer<ApplyState> getApplyStateConsumer();
 }
index dfbffb726ab4f7a079d4efc3c770f8ab3dbd5bb9..43a58b9709099130014c8669fa20263af2f28f32 100644 (file)
@@ -24,8 +24,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
@@ -82,20 +85,25 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private Optional<Cluster> cluster;
 
+    private final Consumer<ApplyState> applyStateConsumer;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
-            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
-            ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
+            @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
+            @Nonnull Map<String, String> peerAddresses,
+            @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
+            @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
-        this.termInformation = termInformation;
+        this.termInformation = Preconditions.checkNotNull(termInformation);
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.configParams = configParams;
-        this.persistenceProvider = persistenceProvider;
-        this.log = logger;
+        this.configParams = Preconditions.checkNotNull(configParams);
+        this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
+        this.log = Preconditions.checkNotNull(logger);
+        this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
 
-        for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
+        for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
         }
     }
@@ -386,6 +394,11 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.currentBehavior = Preconditions.checkNotNull(behavior);
     }
 
+    @Override
+    public Consumer<ApplyState> getApplyStateConsumer() {
+        return applyStateConsumer;
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     void close() {
         if (currentBehavior != null) {
index 6e34ff0393e8840ea70e0b7d398cc155c257d294..71576f6d21b71fd02704e50d74e431f16a641f50 100644 (file)
@@ -205,9 +205,17 @@ public interface ReplicatedLog {
     int dataSize();
 
     /**
-     * Determines if a snapshot need to be captured based on the count/memory consumed.
+     * Determines if a snapshot needs to be captured based on the count/memory consumed and initiates the capture.
      *
      * @param replicatedLogEntry the last log entry.
      */
     void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
+
+    /**
+     * Determines if a snapshot should be captured based on the count/memory consumed.
+     *
+     * @param logIndex the log index to use to determine if the log count has exceeded the threshold
+     * @return true if a snapshot should be captured, false otherwise
+     */
+    boolean shouldCaptureSnapshot(long logIndex);
 }
index 04606fbbabc61e6f47c9780cd9207c6ebfa5ae54..fe873340aa3b414d4f89f60354dabcede970b52c 100644 (file)
@@ -53,12 +53,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+    public boolean shouldCaptureSnapshot(long logIndex) {
         final ConfigParams config = context.getConfigParams();
-        final long journalSize = replicatedLogEntry.getIndex() + 1;
+        final long journalSize = logIndex + 1;
         final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
 
-        if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) {
+        return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+    }
+
+    @Override
+    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+        if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
                     context.getCurrentBehavior().getReplicatedToAllIndex());
             if (started && !context.hasFollowers()) {
index 4fae48e1aa42143d0867338d1ca3a3536ea9b26f..1b3abffbb0a09031eafcc55fd8a9cc3ac070a2d9 100644 (file)
@@ -323,14 +323,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
 
             if (replicatedCount == 0) {
-                // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
-                // we should be able to update the commit index if we get a consensus amongst the followers
-                // w/o the local persistence ack however this can cause timing issues with snapshot capture
-                // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
-                // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
-                // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
-                // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
-                // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
                 break;
             }
 
index 5c5c520761ecc4fba107cf5274a95292dc207273..80ed698d6cdcfae29120c7d93e92f8283f9ae81d 100644 (file)
@@ -367,7 +367,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param index the log index
      */
     protected void applyLogToStateMachine(final long index) {
-        long newLastApplied = context.getLastApplied();
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
 
@@ -376,16 +375,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // Send a local message to the local RaftActor (it's derived class to be
                 // specific to apply the log to it's index)
 
-                final ApplyState msg;
+                final ApplyState applyState;
                 final ClientRequestTracker tracker = removeClientRequestTracker(i);
                 if (tracker != null) {
-                    msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+                    applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
                 } else {
-                    msg = new ApplyState(null, null, replicatedLogEntry);
+                    applyState = new ApplyState(null, null, replicatedLogEntry);
                 }
 
-                actor().tell(msg, actor());
-                newLastApplied = i;
+                log.debug("{}: Setting last applied to {}", logName(), i);
+
+                context.setLastApplied(i);
+                context.getApplyStateConsumer().accept(applyState);
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
@@ -395,10 +396,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             }
         }
 
-        log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-
-        context.setLastApplied(newLastApplied);
-
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
         // will be used during recovery
         //in case if the above code throws an error and this message is not sent, it would be fine
index 7e8a7725910d599f7cd56ef40daaa06de2f8bece..6f107e9ae61ee060071c842810cb82c3e7f92f85 100644 (file)
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -55,9 +56,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
-        logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -236,6 +234,22 @@ public class Follower extends AbstractRaftActorBehavior {
             log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
                     lastIndex, addEntriesFrom);
 
+            // When persistence successfully completes for each new log entry appended, we need to determine if we
+            // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+            // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+            // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+            // This is done because subsequent log entries after the one that tripped the threshold may have been
+            // applied to the state already, as the persistence callback occurs async, and we want those entries
+            // purged from the persisted log as well.
+            final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+            final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+                final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get(
+                        appendEntries.getEntries().size() - 1);
+                if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+                    context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+                }
+            };
+
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
                 ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
@@ -244,6 +258,9 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
 
+                shouldCaptureSnapshot.compareAndSet(false,
+                        context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
                 if (entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
                 }
index a37a1314708100f962e8554c5c4ba075bbfbd896..8ff68310243ac8129fb719ed9150cad1c5c69123 100644 (file)
@@ -333,5 +333,10 @@ public class AbstractReplicatedLogImplTest {
         @Override
         public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
         }
+
+        @Override
+        public boolean shouldCaptureSnapshot(long logIndex) {
+            return false;
+        }
     }
 }
index b4b558b6f357a908c8ac537dd80e86c3abdd5eca..dbced5f4cb6b1c9372e662144e0d60f0f8d8517f 100644 (file)
@@ -85,6 +85,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
     }
 
+    @Override
+    public RaftActorContext getRaftActorContext() {
+        return super.getRaftActorContext();
+    }
+
     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
         return snapshotMessageSupport;
     }
index 65f0a624e6b83c6a3fb3cfff52a00d9d64c55850..cdcaadf8151999f8df9a5a9961285b36944daaa6 100644 (file)
@@ -62,13 +62,14 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     public MockRaftActorContext() {
         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
         setReplicatedLog(new MockReplicatedLogBuilder().build());
     }
 
     public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+            new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
+            applyState -> actor.tell(applyState, actor), LOG);
 
         this.system = system;
 
@@ -142,6 +143,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
         }
 
+        @Override
+        public boolean shouldCaptureSnapshot(long logIndex) {
+            return false;
+        }
+
         @Override
         public boolean removeFromAndPersist(long index) {
             return removeFrom(index) >= 0;
index 874a17c432c6dd0ddc6fb641f29d91906d1a7525..56523cb60b233a5487da8c6c83173ec63b2a1cad 100644 (file)
@@ -59,7 +59,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
-                peerMap, configParams, new NonPersistentDataProvider(), log);
+                peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log);
 
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
@@ -84,7 +84,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
-                new NonPersistentDataProvider(), log);
+                new NonPersistentDataProvider(), applyState -> { }, log);
 
         context.setPeerAddress("peer1", "peerAddress1_1");
         assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
@@ -98,7 +98,7 @@ public class RaftActorContextImplTest extends AbstractActorTest {
         RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
                 Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
-                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), log);
+                new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log);
 
         context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
                 new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
index 9c421017be989a3e19a8dd413c04dc4325255946..e0abd1726ffa54cbb7ba13ce0628d9b0181a06df 100644 (file)
@@ -77,7 +77,8 @@ public class RaftActorRecoverySupportTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
-                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+                LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
+                mockPersistence, applyState -> { }, LOG);
 
         support = new RaftActorRecoverySupport(context, mockCohort);
 
index af408e6b409489d1a597e424c0cf20d76814ae69..a19281ffd8cbedd86062c008edd65cc6c2bbb8f4 100644 (file)
@@ -204,9 +204,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
                 newFollowerActorContext.getPeerIds());
 
-        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
-        expectFirstMatching(followerActor, ApplyState.class);
-
         assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
         assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
         assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
@@ -1479,7 +1476,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
+                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
+                noPersistence, applyState -> actor.tell(applyState, actor), LOG);
     }
 
     abstract static class AbstractMockRaftActor extends MockRaftActor {
index b9b63dcc9955d8a073d2bfebbd37b3766891e2b2..45b89b7e01c42ff93ca4bc972b9084450d84ccda 100644 (file)
@@ -64,8 +64,8 @@ public class RaftActorSnapshotMessageSupportTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
-                new ElectionTermImpl(mockPersistence, "test", LOG),
-                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG) {
+                new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+                configParams, mockPersistence, applyState -> { }, LOG) {
             @Override
             public SnapshotManager getSnapshotManager() {
                 return mockSnapshotManager;
index 9e354612616b7e39235e7335a7fe618ea7973243..edc51092d66b8ec7cc776cbe4b337cdbc95f7909 100644 (file)
@@ -416,7 +416,7 @@ public class RaftActorTest extends AbstractActorTest {
         ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
 
         final Identifier id = new MockIdentifier("apply-state");
-        mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
+        mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
 
         verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
     }
index c2321caff420b68cc49cbb81562964e826723302..ff4d097010081de5fcf8814fc4e12a9b44fbf396 100644 (file)
@@ -57,8 +57,8 @@ public class ReplicatedLogImplTest {
         MockitoAnnotations.initMocks(this);
 
         context = new RaftActorContextImpl(null, null, "test",
-                new ElectionTermImpl(mockPersistence, "test", LOG),
-                -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+                new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+                configParams, mockPersistence, applyState -> { }, LOG);
     }
 
     private void verifyPersist(Object message) throws Exception {
index 88a261328ade2700077121ecfb765edfb311f2fd..4124a94d5fb3f6be5c0ed86f0cc21f56cd39d2cc 100644 (file)
@@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
         RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
                 "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
-                new NonPersistentDataProvider(), LOG);
+                new NonPersistentDataProvider(), applyState -> { }, LOG);
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
         raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
index 9d64b140fb7d9db2c191eb12ffc847deb5273561..8cb914c2267b51375ef819ce26f9ff0818882347 100644 (file)
@@ -21,8 +21,14 @@ import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
@@ -31,15 +37,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActor;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -50,9 +62,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -1087,6 +1104,208 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
+    @Test
+    public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 1 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @Test
+    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+        String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(2);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 2 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
+
+        // Reinstate the actor from persistence
+
+        actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+
+        followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+        assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
+        assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
+        assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
+        assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+                entries.get(2).getData()), followerRaftActor.get().getState());
+    }
+
+    @Test
+    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+        String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
+        logStart(id);
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setSnapshotBatchCount(1);
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+        RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+        Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+                .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+        TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+                .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+        followerRaftActor.set(followerActorRef.underlyingActor());
+        followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(id);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+                newReplicatedLogEntry(1, 2, "three"));
+
+        AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
+
+        followerActorRef.tell(appendEntries, leaderActor);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+
+        final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+        InMemoryJournal.waitForDeleteMessagesComplete(id);
+        // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
+        // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+        // This is OK - on recovery it will be a no-op since index 0 has already been applied.
+        List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+        assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+        assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+        assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+        assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
+        assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
+        assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
+        assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+        assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+        assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+        assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+        assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+                MockRaftActor.toObject(snapshot.getState()));
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+        RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
+            @Override
+            public void createSnapshot(ActorRef actorRef) {
+                try {
+                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
+                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                } catch (Exception e) {
+                    Throwables.propagate(e);
+                }
+            }
+
+            @Override
+            public void applySnapshot(byte[] snapshotBytes) {
+            }
+        };
+        return snapshotCohort;
+    }
+
     public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;
index ddd2cb03271f9f8d7582b2e451caa67ada4d6f34..ab07b667b2d405f566b9902aa377e3022a6634fa 100644 (file)
@@ -86,7 +86,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -472,13 +471,6 @@ public class ShardTest extends AbstractShardTest {
 
         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
         store.setSchemaContext(SCHEMA_CONTEXT);
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
-        final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
-
-        shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
 
         final DataTreeModification writeMod = store.takeSnapshot().newModification();
         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -486,10 +478,7 @@ public class ShardTest extends AbstractShardTest {
         writeMod.ready();
 
         final TransactionIdentifier tx = nextTransactionId();
-        final ApplyState applyState = new ApplyState(null, tx,
-                new SimpleReplicatedLogEntry(1, 2, payloadForModification(store, writeMod, tx)));
-
-        shard.tell(applyState, shard);
+        shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
 
         final Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {