*/
package org.opendaylight.controller.cluster.raft;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload0);
+ expSnapshotState.add(payload1);
+ expSnapshotState.add(payload2);
+
testLog.info("testInitialReplications complete");
}
testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
leader.getReplicatedToAllIndex());
- leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
// Send the first payload - this should cause the first snapshot.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload3);
testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+ expSnapshotState.add(payload4);
+ expSnapshotState.add(payload5);
+ expSnapshotState.add(payload6);
+ expSnapshotState.add(payload7);
+
testLog.info("testSubsequentReplicationsAndSnapshots complete");
}
leader.getReplicatedToAllIndex());
leaderActor.underlyingActor().setMockTotalMemory(1000);
- byte[] snapshot = new byte[] {6};
- leaderActor.underlyingActor().setSnapshot(snapshot);
// We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+ expSnapshotState.add(payload8);
+
// Send another payload with a large enough relative size in combination with the last payload
// that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
payload9 = sendPayloadData(leaderActor, "nine", 201);
// Verify the leader's persisted snapshot.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+ expSnapshotState.add(payload10);
}
/**
InstallSnapshot installSnapshot;
InstallSnapshotReply installSnapshotReply;
- byte[] snapshot = new byte[] {10};
- leaderActor.underlyingActor().setSnapshot(snapshot);
+ expSnapshotState.add(payload9);
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
- assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
- assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+ assertEquals("InstallSnapshot getLastIncludedIndex", 9, installSnapshot.getLastIncludedIndex());
+ //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+ verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 9, currentTerm, 9);
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
- // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
- applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
- verifyApplyState(applyState, null, null, currentTerm, 9, payload9);
-
// Wait for the snapshot to complete.
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
/**
* Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
* snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+ * @throws Exception
*/
- private void testFinalReplicationsAndSnapshot() {
+ private void testFinalReplicationsAndSnapshot() throws Exception {
List<ApplyState> applyStates;
ApplyState applyState;
testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
- byte[] snapshot = new byte[] {14};
- leaderActor.underlyingActor().setSnapshot(snapshot);
-
// Send another payload - a snapshot should occur.
payload11 = sendPayloadData(leaderActor, "eleven");
// Verify the leader's last persisted snapshot (previous ones may not be purged yet).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);