import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
- delegatingPersistenceProvider, LOG);
+ delegatingPersistenceProvider, this::handleApplyState, LOG);
context.setPayloadVersion(payloadVersion);
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
return;
}
-
if (message instanceof ApplyState) {
ApplyState applyState = (ApplyState) message;
- long startTime = System.nanoTime();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Applying state for log index {} data {}",
- persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
- }
-
- if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
- applyState(applyState.getClientActor(), applyState.getIdentifier(),
- applyState.getReplicatedLogEntry().getData());
- }
-
- long elapsedTime = System.nanoTime() - startTime;
- if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
- LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
- TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
- }
-
if (!hasFollowers()) {
// for single node, the capture should happen after the apply state
// as we delete messages from the persistent journal which have made it to the snapshot
context.getSnapshotManager().trimLog(context.getLastApplied());
}
- // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState.
possiblyHandleBehaviorMessage(message);
-
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
}
+ /**
+  * Applies a committed log entry to the actor's state. This is invoked synchronously via the
+  * applyState Consumer registered with the RaftActorContext (see applyLogToStateMachine), so
+  * lastApplied is updated in the same call chain as the actual state application.
+  */
+ private void handleApplyState(ApplyState applyState) {
+ long startTime = System.nanoTime();
+
+ Payload payload = applyState.getReplicatedLogEntry().getData();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Applying state for log index {} data {}",
+ persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+ }
+
+ // NoopPayload and ServerConfigurationPayload are raft-internal payloads and are not
+ // applied to the derived actor's state.
+ if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+ applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+ }
+
+ // Log slow applications for diagnostics (threshold defined by
+ // APPLY_STATE_DELAY_THRESHOLD_IN_NANOS).
+ long elapsedTime = System.nanoTime() - startTime;
+ if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+ LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState)
+ }
+
+ // Send the ApplyState message back to self to handle further processing asynchronously.
+ self().tell(applyState, self());
+ }
+
protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
raftContext.setLastApplied(persistedLogEntry.getIndex());
// Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
+ handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
* @return current behavior.
*/
RaftActorBehavior getCurrentBehavior();
+
+ /**
+ * Returns the consumer of ApplyState operations. This is invoked by a behavior when a log entry needs to be
+ * applied to the state.
+ *
+ * @return the Consumer
+ */
+ Consumer<ApplyState> getApplyStateConsumer();
}
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.LongSupplier;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
private Optional<Cluster> cluster;
+ private final Consumer<ApplyState> applyStateConsumer;
+
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
- ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
- ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
+ @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
+ @Nonnull Map<String, String> peerAddresses,
+ @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
+ @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
this.actor = actor;
this.context = context;
this.id = id;
- this.termInformation = termInformation;
+ this.termInformation = Preconditions.checkNotNull(termInformation);
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
- this.configParams = configParams;
- this.persistenceProvider = persistenceProvider;
- this.log = logger;
+ this.configParams = Preconditions.checkNotNull(configParams);
+ this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
+ this.log = Preconditions.checkNotNull(logger);
+ this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
- for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
+ for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
}
}
this.currentBehavior = Preconditions.checkNotNull(behavior);
}
+ // Exposes the ApplyState Consumer supplied at construction; behaviors invoke it to apply a
+ // log entry to the state synchronously instead of messaging the actor.
+ @Override
+ public Consumer<ApplyState> getApplyStateConsumer() {
+ return applyStateConsumer;
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
void close() {
if (currentBehavior != null) {
int dataSize();
/**
- * Determines if a snapshot need to be captured based on the count/memory consumed.
+ * Determines if a snapshot needs to be captured based on the count/memory consumed and initiates the capture.
*
* @param replicatedLogEntry the last log entry.
*/
void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry);
+
+ /**
+ * Determines if a snapshot should be captured based on the count/memory consumed.
+ *
+ * @param logIndex the log index to use to determine if the log count has exceeded the threshold
+ * @return true if a snapshot should be captured, false otherwise
+ */
+ boolean shouldCaptureSnapshot(long logIndex);
}
}
@Override
- public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+ public boolean shouldCaptureSnapshot(long logIndex) {
final ConfigParams config = context.getConfigParams();
- final long journalSize = replicatedLogEntry.getIndex() + 1;
+ final long journalSize = logIndex + 1;
final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
- if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) {
+ return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+ }
+
+ @Override
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+ if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
context.getCurrentBehavior().getReplicatedToAllIndex());
if (started && !context.hasFollowers()) {
int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
if (replicatedCount == 0) {
- // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
- // we should be able to update the commit index if we get a consensus amongst the followers
- // w/o the local persistence ack however this can cause timing issues with snapshot capture
- // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
- // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
- // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
- // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
- // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+ // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+ // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+ // amongst the followers w/o the local persistence ack.
break;
}
* @param index the log index
*/
protected void applyLogToStateMachine(final long index) {
- long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- final ApplyState msg;
+ final ApplyState applyState;
final ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
- msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+ applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
} else {
- msg = new ApplyState(null, null, replicatedLogEntry);
+ applyState = new ApplyState(null, null, replicatedLogEntry);
}
- actor().tell(msg, actor());
- newLastApplied = i;
+ log.debug("{}: Setting last applied to {}", logName(), i);
+
+ context.setLastApplied(i);
+ context.getApplyStateConsumer().accept(applyState);
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
}
}
- log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-
- context.setLastApplied(newLastApplied);
-
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
// will be used during recovery
//in case if the above code throws an error and this message is not sent, it would be fine
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
private final SyncStatusTracker initialSyncStatusTracker;
- private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
- logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
private String leaderId;
log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
lastIndex, addEntriesFrom);
+ // When persistence successfully completes for each new log entry appended, we need to determine if we
+ // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+ // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+ // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+ // This is done because subsequent log entries after the one that tripped the threshold may have been
+ // applied to the state already, as the persistence callback occurs async, and we want those entries
+ // purged from the persisted log as well.
+ final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+ final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+ final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get(
+ appendEntries.getEntries().size() - 1);
+ if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+ context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+ }
+ };
+
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
+ shouldCaptureSnapshot.compareAndSet(false,
+ context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
if (entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
}
@Override
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
+
+ // This ReplicatedLog implementation never initiates snapshot capture (its
+ // captureSnapshotIfReady above is likewise a no-op), so always report false.
+ @Override
+ public boolean shouldCaptureSnapshot(long logIndex) {
+ return false;
+ }
}
}
(snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
}
+ // Delegating override - presumably widens the superclass method's access so tests can reach
+ // the context directly (TODO confirm against the superclass declaration).
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+
public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
return snapshotMessageSupport;
}
public MockRaftActorContext() {
super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), LOG);
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
+ applyState -> actor.tell(applyState, actor), LOG);
this.system = system;
public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
}
+ // Mock log never triggers snapshot capture - captureSnapshotIfReady above is also a no-op.
+ @Override
+ public boolean shouldCaptureSnapshot(long logIndex) {
+ return false;
+ }
+
@Override
public boolean removeFromAndPersist(long index) {
return removeFrom(index) >= 0;
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
"test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
- peerMap, configParams, new NonPersistentDataProvider(), log);
+ peerMap, configParams, new NonPersistentDataProvider(), applyState -> { }, log);
assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
assertEquals("getPeerAddress", null, context.getPeerAddress("peer2"));
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
"test", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")), configParams,
- new NonPersistentDataProvider(), log);
+ new NonPersistentDataProvider(), applyState -> { }, log);
context.setPeerAddress("peer1", "peerAddress1_1");
assertEquals("getPeerAddress", "peerAddress1_1", context.getPeerAddress("peer1"));
RaftActorContextImpl context = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
"self", new ElectionTermImpl(new NonPersistentDataProvider(), "test", log), -1, -1,
Maps.newHashMap(ImmutableMap.<String, String>of("peer1", "peerAddress1")),
- new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), log);
+ new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, log);
context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo("self", false),
new ServerInfo("peer2", true), new ServerInfo("peer3", false))));
MockitoAnnotations.initMocks(this);
context = new RaftActorContextImpl(null, null, localId, new ElectionTermImpl(mockPersistentProvider, "test",
- LOG), -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+ LOG), -1, -1, Collections.<String,String>emptyMap(), configParams,
+ mockPersistence, applyState -> { }, LOG);
support = new RaftActorRecoverySupport(context, mockCohort);
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
newFollowerActorContext.getPeerIds());
- expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
- expectFirstMatching(followerActor, ApplyState.class);
-
assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
termInfo.update(1, LEADER_ID);
return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
+ id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
+ noPersistence, applyState -> actor.tell(applyState, actor), LOG);
}
abstract static class AbstractMockRaftActor extends MockRaftActor {
MockitoAnnotations.initMocks(this);
context = new RaftActorContextImpl(mockRaftActorRef, null, "test",
- new ElectionTermImpl(mockPersistence, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG) {
+ new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+ configParams, mockPersistence, applyState -> { }, LOG) {
@Override
public SnapshotManager getSnapshotManager() {
return mockSnapshotManager;
ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F"));
final Identifier id = new MockIdentifier("apply-state");
- mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
+ mockRaftActor.getRaftActorContext().getApplyStateConsumer().accept(new ApplyState(mockActorRef, id, entry));
verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
}
MockitoAnnotations.initMocks(this);
context = new RaftActorContextImpl(null, null, "test",
- new ElectionTermImpl(mockPersistence, "test", LOG),
- -1, -1, Collections.<String,String>emptyMap(), configParams, mockPersistence, LOG);
+ new ElectionTermImpl(mockPersistence, "test", LOG), -1, -1, Collections.<String,String>emptyMap(),
+ configParams, mockPersistence, applyState -> { }, LOG);
}
private void verifyPersist(Object message) throws Exception {
Mockito.doReturn(1L).when(mockElectionTerm).getCurrentTerm();
RaftActorContext raftActorContext = new RaftActorContextImpl(candidateActor, candidateActor.actorContext(),
"candidate", mockElectionTerm, -1, -1, setupPeers(4), new DefaultConfigParamsImpl(),
- new NonPersistentDataProvider(), LOG);
+ new NonPersistentDataProvider(), applyState -> { }, LOG);
raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
raftActorContext.getPeerInfo("peer1").setVotingState(VotingState.NON_VOTING);
raftActorContext.getPeerInfo("peer4").setVotingState(VotingState.NON_VOTING);
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActor;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorTest;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.duration.FiniteDuration;
verify(follower, never()).scheduleElection(any(FiniteDuration.class));
}
+ /**
+  * Verifies snapshot capture when the entry that trips the snapshot batch count (2) is the
+  * last entry in the AppendEntries batch: both entries end up in the snapshot state with no
+  * unapplied entries, and the persisted journal is trimmed down to the trailing
+  * ApplyJournalEntries marker.
+  */
+ @Test
+ public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+ String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
+ logStart(id);
+
+ InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setSnapshotBatchCount(2);
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+ RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+ Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+ .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ followerRaftActor.set(followerActorRef.underlyingActor());
+ followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(id);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
+
+ // leaderCommit is 1 so both entries are immediately committed and applied.
+ AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
+
+ followerActorRef.tell(appendEntries, leaderActor);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+
+ final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+ InMemoryJournal.waitForDeleteMessagesComplete(id);
+ // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
+ // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+ // This is OK - on recovery it will be a no-op since index 1 has already been applied.
+ List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+ assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+ assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+ assertEquals("ApplyJournalEntries index", 1, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+ assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+ assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+ assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
+ assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+ assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
+ assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+ MockRaftActor.toObject(snapshot.getState()));
+ }
+
+ /**
+  * Verifies snapshot capture when a middle entry of the AppendEntries batch trips the snapshot
+  * batch count (2): the snapshot is taken only after the last entry is persisted, so all three
+  * applied entries land in the snapshot state and the in-memory log is fully trimmed. Also
+  * restarts the actor and verifies the state is correctly restored from the snapshot.
+  */
+ @Test
+ public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+ String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
+ logStart(id);
+
+ InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setSnapshotBatchCount(2);
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+ RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+ Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+ .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ followerRaftActor.set(followerActorRef.underlyingActor());
+ followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(id);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+ newReplicatedLogEntry(1, 2, "three"));
+
+ // leaderCommit is 2 so all three entries are committed and applied.
+ AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 2, -1, (short)0);
+
+ followerActorRef.tell(appendEntries, leaderActor);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+
+ final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+ InMemoryJournal.waitForDeleteMessagesComplete(id);
+ // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
+ // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+ // This is OK - on recovery it will be a no-op since index 2 has already been applied.
+ List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+ assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+ assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+ assertEquals("ApplyJournalEntries index", 2, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+ assertEquals("Snapshot unapplied size", 0, snapshot.getUnAppliedEntries().size());
+ assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+ assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
+ assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+ assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+ assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+
+ assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+ assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
+
+ // Reinstate the actor from persistence
+
+ actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+
+ followerActorRef = actorFactory.createTestActor(builder.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ followerRaftActor.set(followerActorRef.underlyingActor());
+ followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+ assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
+ assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
+ assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
+ assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
+ assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ entries.get(2).getData()), followerRaftActor.get().getState());
+ }
+
+ /**
+  * Verifies snapshot capture when only the first of three appended entries is committed
+  * (leaderCommit is 0): the snapshot state contains just the applied entry while the two
+  * uncommitted entries are preserved as the snapshot's unapplied entries.
+  */
+ @Test
+ public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+ String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
+ logStart(id);
+
+ InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, null));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setSnapshotBatchCount(1);
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
+ RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
+ Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
+ .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
+ followerRaftActor.set(followerActorRef.underlyingActor());
+ followerRaftActor.get().waitForInitializeBehaviorComplete();
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(id);
+ InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
+ newReplicatedLogEntry(1, 2, "three"));
+
+ AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 0, -1, (short)0);
+
+ followerActorRef.tell(appendEntries, leaderActor);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+
+ final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
+
+ InMemoryJournal.waitForDeleteMessagesComplete(id);
+ // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
+ // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
+ // This is OK - on recovery it will be a no-op since index 0 has already been applied.
+ List<Object> journalEntries = InMemoryJournal.get(id, Object.class);
+ assertEquals("Persisted journal entries size: " + journalEntries, 1, journalEntries.size());
+ assertEquals("Persisted journal entry type", ApplyJournalEntries.class, journalEntries.get(0).getClass());
+ assertEquals("ApplyJournalEntries index", 0, ((ApplyJournalEntries)journalEntries.get(0)).getToIndex());
+
+ assertEquals("Snapshot unapplied size", 2, snapshot.getUnAppliedEntries().size());
+ assertEquals("Snapshot unapplied entry index", 1, snapshot.getUnAppliedEntries().get(0).getIndex());
+ assertEquals("Snapshot unapplied entry index", 2, snapshot.getUnAppliedEntries().get(1).getIndex());
+ assertEquals("Snapshot getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
+ assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
+ assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
+ assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
+ assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+ MockRaftActor.toObject(snapshot.getState()));
+ }
+
+ /**
+  * Creates a RaftActorSnapshotCohort whose createSnapshot replies with the serialized current
+  * state of the MockRaftActor held in the given reference. applySnapshot is a no-op since
+  * these tests only exercise snapshot capture.
+  */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+ // Return the anonymous instance directly - no need for an intermediate local variable.
+ return new RaftActorSnapshotCohort() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
+ followerRaftActor.get().getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ // 'throw' makes the non-returning intent explicit; propagate rethrows
+ // unchecked exceptions as-is and wraps checked ones in RuntimeException.
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void applySnapshot(byte[] snapshotBytes) {
+ // No-op: snapshot application is not under test here.
+ }
+ };
+ }
+
public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
int snapshotLength = bs.size();
int start = offset;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
- writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
- final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
- Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
-
- shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
final DataTreeModification writeMod = store.takeSnapshot().newModification();
final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
writeMod.ready();
final TransactionIdentifier tx = nextTransactionId();
- final ApplyState applyState = new ApplyState(null, tx,
- new SimpleReplicatedLogEntry(1, 2, payloadForModification(store, writeMod, tx)));
-
- shard.tell(applyState, shard);
+ shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
final Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {