From: Tom Pantelis Date: Thu, 15 Dec 2016 06:05:53 +0000 (-0500) Subject: Bug 7362: Notify applyState synchronously X-Git-Tag: release/carbon~332 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=913ae866cd0cc82991e1f66ac80f6a42b0daaa48 Bug 7362: Notify applyState synchronously As per comments in bug 7362, it's problematic for snapshots that we set lastApplied index but queue the ApplyState message. This can leave the derived actor's captured state out-of-sync with the lastApplied index if a snapshot occurs in between. We need to set lastApplied atomically with apply to state so I added a callback that RaftActor sets in the RaftActorContext that is called by the behavior's applyLogToStateMachine method. The callback immediately calls applyState. The ApplyState message is still queued for other compenents that trigger on it (and also unit tests). I considered setting lastApplied when the ApplyState message is processed, and not in applyLogToStateMachine, but this would be problematic if another AppendEntriesReply was queued before the ApplyState message as it would try to apply it again. Change-Id: Ie062af8440bc251eec9c9ef58f450dee23abd04c Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index fff6ce9ed1..2d2fce22f9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -132,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -224,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long startTime = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); - } - - if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); - } - - long elapsedTime = System.nanoTime() - startTime; - if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { - LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } - if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -257,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } - // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); @@ -500,6 +479,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + private void handleApplyState(ApplyState applyState) { + long startTime = System.nanoTime(); + + Payload payload = applyState.getReplicatedLogEntry().getData(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + } + + if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), payload); + } + + long elapsedTime = System.nanoTime() - startTime; + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + + // Send the ApplyState message back to self to handle further processing asynchronously. + self().tell(applyState, self()); + } + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -545,7 +547,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { raftContext.setLastApplied(persistedLogEntry.getIndex()); // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self()); + handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 2896c44a2c..74a214f90a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -16,10 +16,12 @@ import akka.cluster.Cluster; import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.Optional; +import java.util.function.Consumer; import java.util.function.LongSupplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; @@ -316,4 +318,12 @@ public interface RaftActorContext { * @return current behavior. */ RaftActorBehavior getCurrentBehavior(); + + /** + * Returns the consumer of ApplyState operations. This is invoked by a behavior when a log entry needs to be + * applied to the state. + * + * @return the Consumer + */ + Consumer getApplyStateConsumer(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index dfbffb726a..43a58b9709 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -24,8 +24,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.LongSupplier; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; @@ -82,20 +85,25 @@ public class RaftActorContextImpl implements RaftActorContext { private Optional cluster; + private final Consumer applyStateConsumer; + public RaftActorContextImpl(ActorRef actor, ActorContext context, String id, - ElectionTerm termInformation, long commitIndex, long lastApplied, Map peerAddresses, - ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) { + @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied, + @Nonnull Map peerAddresses, + @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider, + @Nonnull Consumer applyStateConsumer, @Nonnull Logger logger) { this.actor = actor; this.context = context; this.id = id; - this.termInformation = termInformation; + this.termInformation = Preconditions.checkNotNull(termInformation); this.commitIndex = commitIndex; this.lastApplied = lastApplied; - this.configParams = configParams; - this.persistenceProvider = persistenceProvider; - this.log = logger; + this.configParams = Preconditions.checkNotNull(configParams); + this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider); + this.log = Preconditions.checkNotNull(logger); + this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer); - for (Map.Entry e: peerAddresses.entrySet()) { + for (Map.Entry e: Preconditions.checkNotNull(peerAddresses).entrySet()) { peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING)); } } @@ -386,6 +394,11 @@ public class RaftActorContextImpl implements RaftActorContext { this.currentBehavior = Preconditions.checkNotNull(behavior); } + @Override + public Consumer getApplyStateConsumer() { + return applyStateConsumer; + } + @SuppressWarnings("checkstyle:IllegalCatch") void close() { if (currentBehavior != null) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 6e34ff0393..71576f6d21 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -205,9 +205,17 @@ public interface ReplicatedLog { int dataSize(); /** - * Determines if a snapshot need to be captured based on the count/memory consumed. + * Determines if a snapshot needs to be captured based on the count/memory consumed and initiates the capture. * * @param replicatedLogEntry the last log entry. */ void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry); + + /** + * Determines if a snapshot should be captured based on the count/memory consumed. + * + * @param logIndex the log index to use to determine if the log count has exceeded the threshold + * @return true if a snapshot should be captured, false otherwise + */ + boolean shouldCaptureSnapshot(long logIndex); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 04606fbbab..fe873340aa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -53,12 +53,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { + public boolean shouldCaptureSnapshot(long logIndex) { final ConfigParams config = context.getConfigParams(); - final long journalSize = replicatedLogEntry.getIndex() + 1; + final long journalSize = logIndex + 1; final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100; - if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) { + return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold; + } + + @Override + public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) { + if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) { boolean started = context.getSnapshotManager().capture(replicatedLogEntry, context.getCurrentBehavior().getReplicatedToAllIndex()); if (started && !context.hasFollowers()) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 4fae48e1aa..1b3abffbb0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -323,14 +323,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; if (replicatedCount == 0) { - // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally - // we should be able to update the commit index if we get a consensus amongst the followers - // w/o the local persistence ack however this can cause timing issues with snapshot capture - // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's - // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding - // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't - // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState - // message in lieu of synchronously updating lastAppliedIndex and applying to state. + // We don't commit and apply a log entry until we've gotten the ack from our local persistence, + // even though there *shouldn't* be any issue with updating the commit index if we get a consensus + // amongst the followers w/o the local persistence ack. break; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 5c5c520761..80ed698d6c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -367,7 +367,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param index the log index */ protected void applyLogToStateMachine(final long index) { - long newLastApplied = context.getLastApplied(); // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; i < index + 1; i++) { @@ -376,16 +375,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { // Send a local message to the local RaftActor (it's derived class to be // specific to apply the log to it's index) - final ApplyState msg; + final ApplyState applyState; final ClientRequestTracker tracker = removeClientRequestTracker(i); if (tracker != null) { - msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); + applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry); } else { - msg = new ApplyState(null, null, replicatedLogEntry); + applyState = new ApplyState(null, null, replicatedLogEntry); } - actor().tell(msg, actor()); - newLastApplied = i; + log.debug("{}: Setting last applied to {}", logName(), i); + + context.setLastApplied(i); + context.getApplyStateConsumer().accept(applyState); } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either @@ -395,10 +396,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } - log.debug("{}: Setting last applied to {}", logName(), newLastApplied); - - context.setLastApplied(newLastApplied); - // send a message to persist a ApplyLogEntries marker message into akka's persistent journal // will be used during recovery //in case if the above code throws an error and this message is not sent, it would be fine diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 7e8a772591..6f107e9ae6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -55,9 +56,6 @@ public class Follower extends AbstractRaftActorBehavior { private final SyncStatusTracker initialSyncStatusTracker; - private final Procedure appendAndPersistCallback = - logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry); - private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted(); private SnapshotTracker snapshotTracker = null; private String leaderId; @@ -236,6 +234,22 @@ public class Follower extends AbstractRaftActorBehavior { log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), lastIndex, addEntriesFrom); + // When persistence successfully completes for each new log entry appended, we need to determine if we + // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not + // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However + // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted. + // This is done because subsequent log entries after the one that tripped the threshold may have been + // applied to the state already, as the persistence callback occurs async, and we want those entries + // purged from the persisted log as well. + final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false); + final Procedure appendAndPersistCallback = logEntry -> { + final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get( + appendEntries.getEntries().size() - 1); + if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) { + context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex()); + } + }; + // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { ReplicatedLogEntry entry = appendEntries.getEntries().get(i); @@ -244,6 +258,9 @@ public class Follower extends AbstractRaftActorBehavior { context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false); + shouldCaptureSnapshot.compareAndSet(false, + context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex())); + if (entry.getData() instanceof ServerConfigurationPayload) { context.updatePeerIds((ServerConfigurationPayload)entry.getData()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index a37a131470..8ff6831024 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -333,5 +333,10 @@ public class AbstractReplicatedLogImplTest { @Override public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { } + + @Override + public boolean shouldCaptureSnapshot(long logIndex) { + return false; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index b4b558b6f3..dbced5f4cb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -85,6 +85,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport()); } + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() { return snapshotMessageSupport; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 65f0a624e6..cdcaadf815 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -62,13 +62,14 @@ public class MockRaftActorContext extends RaftActorContextImpl { public MockRaftActorContext() { super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG); setReplicatedLog(new MockReplicatedLogBuilder().build()); } public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) { super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG); + new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), + applyState -> actor.tell(applyState, actor), LOG); this.system = system; @@ -142,6 +143,11 @@ public class MockRaftActorContext extends RaftActorContextImpl { public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) { } + @Override + public boolean shouldCaptureSnapshot(long logIndex) { + return false; + } + @Override public boolean removeFromAndPersist(long index) { return removeFrom(index) >= 0; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java index 874a17c432..56523cb60b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java @@ -59,7 +59,7 @@ public class RaftActorContextImplTest extends AbstractActorTest { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1, - peerMap, configParams, new NonPersistentDataProvider(), log); + peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log); assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1")); assertEquals("getPeerAddress", null, context.getPeerAddress("peer2")); @@ -84,7 +84,7 @@ public class RaftActorContextImplTest extends AbstractActorTest { RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1, Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), configParams, - new NonPersistentDataProvider(), log); + new NonPersistentDataProvider(), applyState -> { }, log); context.setPeerAddress("peer1", "peerAddress1_1"); assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1")); @@ -98,7 +98,7 @@ public class RaftActorContextImplTest extends AbstractActorTest { RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), "self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1, Maps.newHashMap(ImmutableMap.of("peer1", "peerAddress1")), - new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), log); + new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log); context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false), new ServerInfo("peer2", true), new ServerInfo("peer3", false)))); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java index 9c421017be..e0abd1726f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java @@ -77,7 +77,8 @@ public class RaftActorRecoverySupportTest { MockitoAnnotations.initMocks(this); context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test", - LOG), -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG); + LOG), -1, -1, Collections.emptyMap(), configParams, + mockPersistence, applyState -> { }, LOG); support = new RaftActorRecoverySupport(context, mockCohort); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index af408e6b40..a19281ffd8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -204,9 +204,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID), newFollowerActorContext.getPeerIds()); - expectFirstMatching(newFollowerCollectorActor, ApplyState.class); - expectFirstMatching(followerActor, ApplyState.class); - assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex()); assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied()); assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex()); @@ -1479,7 +1476,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG); termInfo.update(1, LEADER_ID); return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), - id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG); + id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, + noPersistence, applyState -> actor.tell(applyState, actor), LOG); } abstract static class AbstractMockRaftActor extends MockRaftActor { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index b9b63dcc99..45b89b7e01 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -64,8 +64,8 @@ public class RaftActorSnapshotMessageSupportTest { MockitoAnnotations.initMocks(this); context = new RaftActorContextImpl(mockRaftActorRef, null, "test", - new ElectionTermImpl(mockPersistence, "test", LOG), - -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG) { + new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.emptyMap(), + configParams, mockPersistence, applyState -> { }, LOG) { @Override public SnapshotManager getSnapshotManager() { return mockSnapshotManager; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 9e35461261..edc51092d6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -416,7 +416,7 @@ public class RaftActorTest extends AbstractActorTest { ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F")); final Identifier id = new MockIdentifier("apply-state"); - mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); + mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry)); verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index c2321caff4..ff4d097010 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -57,8 +57,8 @@ public class ReplicatedLogImplTest { MockitoAnnotations.initMocks(this); context = new RaftActorContextImpl(null, null, "test", - new ElectionTermImpl(mockPersistence, "test", LOG), - -1, -1, Collections.emptyMap(), configParams, mockPersistence, LOG); + new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.emptyMap(), + configParams, mockPersistence, applyState -> { }, LOG); } private void verifyPersist(Object message) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 88a261328a..4124a94d5f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -177,7 +177,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm(); RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(), "candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(), - new NonPersistentDataProvider(), LOG); + new NonPersistentDataProvider(), applyState -> { }, LOG); raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING); raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 9d64b140fb..8cb914c226 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -21,8 +21,14 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.util.ArrayList; @@ -31,15 +37,21 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.MockRaftActor; +import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.RaftActorTest; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -50,9 +62,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; @@ -1087,6 +1104,208 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } + @Test + public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception { + String id = "testCaptureSnapshotOnLastEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 1 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()), + MockRaftActor.toObject(snapshot.getState())); + } + + @Test + public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception { + String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(2); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 2 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState())); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex()); + + // Reinstate the actor from persistence + + actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem())); + + followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size()); + assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex()); + assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied()); + assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex()); + assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(), + entries.get(2).getData()), followerRaftActor.get().getState()); + } + + @Test + public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception { + String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied"; + logStart(id); + + InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setSnapshotBatchCount(1); + config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + + final AtomicReference followerRaftActor = new AtomicReference<>(); + RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor); + Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id) + .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort); + TestActorRef followerActorRef = actorFactory.createTestActor(builder.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), id); + followerRaftActor.set(followerActorRef.underlyingActor()); + followerRaftActor.get().waitForInitializeBehaviorComplete(); + + InMemorySnapshotStore.addSnapshotSavedLatch(id); + InMemoryJournal.addDeleteMessagesCompleteLatch(id); + + List entries = Arrays.asList( + newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"), + newReplicatedLogEntry(1, 2, "three")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0); + + followerActorRef.tell(appendEntries, leaderActor); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + + final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class); + + InMemoryJournal.waitForDeleteMessagesComplete(id); + // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for + // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it. + // This is OK - on recovery it will be a no-op since index 0 has already been applied. + List journalEntries = InMemoryJournal.get(id, Object.class); + assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size()); + assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass()); + assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex()); + + assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size()); + assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex()); + assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex()); + assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm()); + assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex()); + assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm()); + assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex()); + assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()), + MockRaftActor.toObject(snapshot.getState())); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference followerRaftActor) { + RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() { + @Override + public void createSnapshot(ActorRef actorRef) { + try { + actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject( + followerRaftActor.get().getState()).toByteArray()), actorRef); + } catch (Exception e) { + Throwables.propagate(e); + } + } + + @Override + public void applySnapshot(byte[] snapshotBytes) { + } + }; + return snapshotCohort; + } + public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) { int snapshotLength = bs.size(); int start = offset; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index ddd2cb0327..ab07b667b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -86,7 +86,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -472,13 +471,6 @@ public class ShardTest extends AbstractShardTest { final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); - writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - final NormalizedNode root = readStore(store, YangInstanceIdentifier.EMPTY); - final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(), - Collections.emptyList(), 1, 2, 3, 4); - - shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender()); final DataTreeModification writeMod = store.takeSnapshot().newModification(); final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -486,10 +478,7 @@ public class ShardTest extends AbstractShardTest { writeMod.ready(); final TransactionIdentifier tx = nextTransactionId(); - final ApplyState applyState = new ApplyState(null, tx, - new SimpleReplicatedLogEntry(1, 2, payloadForModification(store, writeMod, tx))); - - shard.tell(applyState, shard); + shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx)); final Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) {