}
private void onRecoveredApplyLogEntries(long toIndex) {
+ long lastUnappliedIndex = context.getLastApplied() + 1;
+
if(log.isDebugEnabled()) {
- log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- context.getId(), context.getLastApplied() + 1, toIndex);
+ log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+ context.getId(), lastUnappliedIndex, toIndex);
}
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
+ long lastApplied = lastUnappliedIndex - 1;
+ for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+ ReplicatedLogEntry logEntry = replicatedLog().get(i);
+ if(logEntry != null) {
+ lastApplied++;
+ batchRecoveredLogEntry(logEntry);
+ } else {
+ // Shouldn't happen but cover it anyway.
+ log.error("Log entry not found for index {}", i);
+ break;
+ }
}
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
+ context.setLastApplied(lastApplied);
+ context.setCommitIndex(lastApplied);
}
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.protobuf.ByteString;
+import java.util.List;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
public class SnapshotManager implements SnapshotState {
-
private final SnapshotState IDLE = new Idle();
private final SnapshotState CAPTURING = new Capturing();
private final SnapshotState PERSISTING = new Persisting();
private SnapshotState currentState = IDLE;
private CaptureSnapshot captureSnapshot;
+ private long lastSequenceNumber = -1;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
// send a CaptureSnapshot to self to make the expensive operation async.
+
+ List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+ newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
SnapshotManager.this.currentState = CAPTURING;
- if(targetFollower != null){
- LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
- } else {
+ if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
}
+ lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+ LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
context.getActor().tell(captureSnapshot, context.getActor());
return true;
// when snapshot is saved async, SaveSnapshotSuccess is raised.
Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
- persistenceProvider.deleteMessages(sequenceNumber);
+ persistenceProvider.deleteMessages(lastSequenceNumber);
+ lastSequenceNumber = -1;
SnapshotManager.this.currentState = IDLE;
}
package org.opendaylight.controller.cluster.raft.base.messages;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
public class CaptureSnapshot {
private final long lastAppliedIndex;
private final long lastAppliedTerm;
private final boolean installSnapshotInitiated;
private final long replicatedToAllIndex;
private final long replicatedToAllTerm;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
- public CaptureSnapshot(long lastIndex, long lastTerm,
- long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
- this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+ public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+ long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+ this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+ unAppliedEntries, false);
}
- public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
- long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+ public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+ long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+ List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
this.lastIndex = lastIndex;
this.lastTerm = lastTerm;
this.lastAppliedIndex = lastAppliedIndex;
this.installSnapshotInitiated = installSnapshotInitiated;
this.replicatedToAllIndex = replicatedToAllIndex;
this.replicatedToAllTerm = replicatedToAllTerm;
+ this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
}
public long getLastAppliedIndex() {
return replicatedToAllTerm;
}
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
.append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
.append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
.append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
- .append(replicatedToAllTerm).append("]");
+ .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
return builder.toString();
}
+
+
}
*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private volatile byte[] snapshot;
private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
TestActorRef<MessageCollectorActor> collectorActor) {
@Override
public void createSnapshot(ActorRef actorRef) {
- if(snapshot != null) {
- getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
- @Override
- public void applyRecoverySnapshot(byte[] bytes) {
- }
-
- void setSnapshot(byte[] snapshot) {
- this.snapshot = snapshot;
- }
-
public ActorRef collectorActor() {
return collectorActor;
}
protected long initialTerm = 5;
protected long currentTerm;
+ protected List<Object> expSnapshotState = new ArrayList<>();
+
@After
public void tearDown() {
InMemoryJournal.clear();
});
}
+ @SuppressWarnings("unchecked")
protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
- int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+ int lastAppliedIndex, long lastTerm, long lastIndex)
+ throws Exception {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
- assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+ List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+ assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+ for(int i = 0; i < expSnapshotState.size(); i++) {
+ assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+ }
}
protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
}
-
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
- LOG.info("{}: applyState called", persistenceId());
+ LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+ state.add(data);
}
@Override
return this.getId();
}
- private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+ public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
@Test
public void testOnCaptureSnapshot() throws Exception {
- sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+ sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
verify(mockSnapshotManager).create(procedure.capture());
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
public class RaftActorTest extends AbstractActorTest {
+ static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
private TestActorFactory factory;
@Before
@Test
public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+ TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
new JavaTestKit(getSystem()) {{
String persistenceId = factory.generateActorId("follower-");
// log entry.
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
- ImmutableMap.<String, String>builder().put("member1", "address").build(),
- Optional.<ConfigParams>of(config)), persistenceId);
+ peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
watch(followerActor);
//reinstate the actor
TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
- Optional.<ConfigParams>of(config)));
+ MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
MockRaftActor mockRaftActor = ref.underlyingActor();
assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
}};
+
+ TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
}
@Test
doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
mockRaftActor.handleCommand(applySnapshot);
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+ CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
mockRaftActor.handleCommand(captureSnapshot);
}};
}
- private ByteString fromObject(Object snapshot) throws Exception {
+ public static ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
try {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+ private MockPayload payload0;
+ private MockPayload payload1;
+
+ @Before
+ public void setup() {
+ follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+
+ peerAddresses = ImmutableMap.<String, String>builder().
+ put(follower1Id, follower1Actor.path().toString()).build();
+
+ leaderConfigParams = newLeaderConfigParams();
+ leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+ follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+ leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+ }
+
+ @Test
+ public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshot.class);
+
+ // First, deliver the CaptureSnapshot to the leader.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+ leaderActor.tell(captureSnapshot, leaderActor);
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ // Now deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ // Now deliver the CaptureSnapshotReply to the leader.
+ CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshotReply.class);
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+ leaderActor.tell(captureSnapshotReply, leaderActor);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ leaderActor.underlyingActor().getState());
+ }
+
+ @Test
+ public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+ leaderCollectorActor, CaptureSnapshot.class);
+
+ // First, deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ // Now deliver the CaptureSnapshot to the leader.
+ leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+ leaderActor.tell(captureSnapshot, leaderActor);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+ // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+ // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+ // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+ // This is a side effect of trimming the persisted log to the sequence number captured at the time
+ // the snapshot was initiated.
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+ payload3, payload4), leaderActor.underlyingActor().getState());
+ }
+
+ @Test
+ public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+ send2InitialPayloads();
+
+ // Block these messages initially so we can control the sequence.
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+ // This should trigger a snapshot.
+ MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+ // Send another payload.
+ MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+ MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Now deliver the AppendEntries to the follower
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+ reinstateLeaderActor();
+
+ assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+ assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ leaderActor.underlyingActor().getState());
+ }
+
+ private void reinstateLeaderActor() {
+ killActor(leaderActor);
+
+ leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+ leaderActor.underlyingActor().waitForRecoveryComplete();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+ }
+
+ private void send2InitialPayloads() {
+ waitUntilLeader(leaderActor);
+ currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+ payload0 = sendPayloadData(leaderActor, "zero");
+ payload1 = sendPayloadData(leaderActor, "one");
+
+ // Verify the leader applies the states.
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+ assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+ MessageCollectorActor.clearMessages(follower1CollectorActor);
+ }
+}
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
private MockPayload recoveredPayload0;
private MockPayload recoveredPayload1;
private MockPayload recoveredPayload2;
+ private MockPayload payload3;
private MockPayload payload4;
private MockPayload payload5;
private MockPayload payload6;
private MockPayload payload7;
@Test
- public void runTest() {
+ public void runTest() throws Exception {
testLog.info("testReplicationAndSnapshots starting");
// Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
recoveredPayload2 = new MockPayload("two");
InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
- InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+ InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
* 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
* scenario, the follower consensus and application of state is delayed until after the snapshot
* completes.
+ * @throws Exception
*/
- private void testFirstSnapshot() {
+ private void testFirstSnapshot() throws Exception {
testLog.info("testFirstSnapshot starting");
- byte[] snapshot = new byte[] {1,2,3,4};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(recoveredPayload0);
+ expSnapshotState.add(recoveredPayload1);
+ expSnapshotState.add(recoveredPayload2);
// Delay the consensus by temporarily dropping the AppendEntries to both followers.
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send the payload.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ payload3 = sendPayloadData(leaderActor, "three");
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
// the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
/**
* Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
* consensus occurs and the leader applies the state.
+ * @throws Exception
*/
- private void testSecondSnapshot() {
+ private void testSecondSnapshot() throws Exception {
testLog.info("testSecondSnapshot starting");
- byte[] snapshot = new byte[] {5,6,7,8};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload3);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
// Delay the CaptureSnapshot message to the leader actor.
leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
- // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
- // log entry (6).
+ expSnapshotState.add(payload7);
+
+ // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+ // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+ // when the snapshot is created (ie when the CaptureSnapshot is processed).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
leaderActor.underlyingActor().waitForRecoveryComplete();
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload0);
+ expSnapshotState.add(payload1);
+ expSnapshotState.add(payload2);
+
testLog.info("testInitialReplications complete");
}
testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
leader.getReplicatedToAllIndex());
- leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send the first payload - this should cause the first snapshot.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload3);
testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
+ expSnapshotState.add(payload7);
+
testLog.info("testSubsequentReplicationsAndSnapshots complete");
}
leader.getReplicatedToAllIndex());
leaderActor.underlyingActor().setMockTotalMemory(1000);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
// We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+ expSnapshotState.add(payload8);
+
// Send another payload with a large enough relative size in combination with the last payload
// that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
payload9 = sendPayloadData(leaderActor, "nine", 201);
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+ expSnapshotState.add(payload10);
}
/**
InstallSnapshot installSnapshot;
InstallSnapshotReply installSnapshotReply;
- byte[] snapshot = new byte[] {10};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload9);
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
- assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+ //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+ verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
// Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
/**
* Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
* snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+ * @throws Exception
*/
- private void testFinalReplicationsAndSnapshot() {
+ private void testFinalReplicationsAndSnapshot() throws Exception {
List<ApplyState> applyStates;
ApplyState applyState;
testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
- byte[] snapshot = new byte[] {14};
- leaderActor.underlyingActor().setSnapshot(snapshot);
-
// Send another payload - a snapshot should occur.
payload11 = sendPayloadData(leaderActor, "eleven");
// Verify the leader's last persisted snapshot (previous ones may not be purged yet).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
doReturn("123").when(mockRaftActorContext).getId();
+ doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@Test
public void testCommit(){
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
verify(mockReplicatedLog).snapshotCommit();
- verify(mockDataPersistenceProvider).deleteMessages(100L);
+ verify(mockDataPersistenceProvider).deleteMessages(50L);
ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
@Test
public void testCallingCommitMultipleTimesCausesNoHarm(){
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
// when replicatedToAllIndex = -1
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
verify(mockReplicatedLog, times(1)).snapshotCommit();
- verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+ LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
Map<Long, Object> journal = journals.get(persistenceId);
if(journal != null) {
synchronized (journal) {