* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
+import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
+import akka.testkit.javadsl.TestKit;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.concepts.Identifier;
}
@Test
- public void testHandleMessageForUnknownMessage() throws Exception {
+ public void testHandleMessageForUnknownMessage() {
logStart("testHandleMessageForUnknownMessage");
leader = new Leader(createActorContext());
}
@Test
- public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
MockRaftActorContext actorContext = createActorContextWithFollower();
return sendReplicate(actorContext, 1, index);
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term,
+ final long index) {
return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
- SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
- actorContext.getReplicatedLog().append(newEntry);
- return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
+ final Payload payload) {
+ actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload));
+ return leader.handleMessage(leaderActor, new Replicate(index, true, null, null));
}
@Test
- public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+ public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() {
logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() {
logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() {
logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
MockRaftActorContext actorContext = createActorContextWithFollower();
sendReplicate(actorContext, lastIndex + i + 1);
leader.handleMessage(followerActor, new AppendEntriesReply(
FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
-
}
- for (int i = 3; i < 5; i++) {
- sendReplicate(actorContext, lastIndex + i + 1);
+ // We are expecting six messages here -- a request to replicate and a consensus-reached message
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request/consensus appends collected", 6, allMessages.size());
+ for (int i = 0; i < 3; i++) {
+ assertRequestEntry(lastIndex, allMessages, i);
+ assertCommitEntry(lastIndex, allMessages, i);
}
- List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
- // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
- // get sent to the follower - but not the 5th
- assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+ // Now perform another commit, eliciting a request to persist
+ sendReplicate(actorContext, lastIndex + 3 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // This elicits another message for request to replicate
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ assertRequestEntry(lastIndex, allMessages, 3);
- for (int i = 0; i < 4; i++) {
- long expected = allMessages.get(i).getEntries().get(0).getIndex();
- assertEquals(expected, i + 2);
- }
+ sendReplicate(actorContext, lastIndex + 4 + 1);
+ allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of request entries collected", 7, allMessages.size());
+ }
+
+ private static void assertCommitEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries commitReq = allMessages.get(2 * messageNr + 1);
+ assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit());
+ assertEquals(List.of(), commitReq.getEntries());
+ }
+
+ private static void assertRequestEntry(final long lastIndex, final List<AppendEntries> allMessages,
+ final int messageNr) {
+ final AppendEntries req = allMessages.get(2 * messageNr);
+ assertEquals(lastIndex + messageNr, req.getLeaderCommit());
+
+ final List<ReplicatedLogEntry> entries = req.getEntries();
+ assertEquals(1, entries.size());
+ assertEquals(messageNr + 2, entries.get(0).getIndex());
}
@Test
- public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() {
logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() {
logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
MockRaftActorContext actorContext = createActorContextWithFollower();
}
@Test
- public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() {
logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
MockRaftActorContext actorContext = createActorContextWithFollower();
@Test
- public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() {
logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
MockRaftActorContext actorContext = createActorContext();
actorContext.setLastApplied(0);
- long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
- long term = actorContext.getTermInformation().getCurrentTerm();
- ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
- newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
+ final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+ final long term = actorContext.getTermInformation().getCurrentTerm();
+ final var data = new MockRaftActorContext.MockPayload("foo");
- actorContext.getReplicatedLog().append(newEntry);
+ actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data));
final Identifier id = new MockIdentifier("state-id");
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(leaderActor, id, newEntry, true));
+ final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id));
// State should not change
assertTrue(raftBehavior instanceof Leader);
// We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
// one since lastApplied state is 0.
- List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
- leaderActor, ApplyState.class);
+ final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
for (int i = 0; i <= newLogIndex - 1; i++) {
}
ApplyState last = applyStateList.get((int) newLogIndex - 1);
- assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+ assertEquals("getData", data, last.getReplicatedLogEntry().getData());
assertEquals("getIdentifier", id, last.getIdentifier());
}
final MockRaftActorContext actorContext = createActorContextWithFollower();
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- ByteString bs = toByteString(leadersSnapshot);
+ ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C"));
leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
- Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
}
@Test
- public void testSendAppendEntriesSnapshotScenario() throws Exception {
+ public void testSendAppendEntriesSnapshotScenario() {
logStart("testSendAppendEntriesSnapshotScenario");
final MockRaftActorContext actorContext = createActorContextWithFollower();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// new entry
- SimpleReplicatedLogEntry entry =
- new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertTrue(raftBehavior instanceof Leader);
}
@Test
- public void testInitiateInstallSnapshot() throws Exception {
+ public void testInitiateInstallSnapshot() {
logStart("testInitiateInstallSnapshot");
MockRaftActorContext actorContext = createActorContextWithFollower();
leader.setSnapshotHolder(null);
// new entry
- SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
actorContext.getReplicatedLog().removeFrom(0);
- AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
leader = new Leader(actorContext);
}
// new entry
- SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
// Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
// installed with a SendInstallSnapshot
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
MessageCollectorActor.clearMessages(followerActor);
// Sending Replicate message should not initiate another capture since the first is in progress.
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
final byte[] bytes = new byte[]{1, 2, 3};
- installSnapshotStream.get().get().write(bytes);
+ installSnapshotStream.get().orElseThrow().write(bytes);
actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
Runtime.getRuntime().totalMemory());
MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
// Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
MessageCollectorActor.clearMessages(followerActor);
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}
@Test
- public void testInstallSnapshot() throws Exception {
+ public void testInstallSnapshot() {
logStart("testInstallSnapshot");
final MockRaftActorContext actorContext = createActorContextWithFollower();
leader.getFollower(FOLLOWER_ID).setNextIndex(0);
byte[] bytes = toByteString(leadersSnapshot).toByteArray();
- Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
}
@Test
- public void testForceInstallSnapshot() throws Exception {
+ public void testForceInstallSnapshot() {
logStart("testForceInstallSnapshot");
final MockRaftActorContext actorContext = createActorContextWithFollower();
leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
byte[] bytes = toByteString(leadersSnapshot).toByteArray();
- Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.<ReplicatedLogEntry>emptyList(),
+ Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(),
lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null);
RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
- Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
+ List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
}
@Test
- public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ public void testSendSnapshotfromInstallSnapshotReply() {
logStart("testSendSnapshotfromInstallSnapshotReply");
MockRaftActorContext actorContext = createActorContextWithFollower();
ByteString bs = toByteString(leadersSnapshot);
Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
- Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
- -1, null, null);
+ List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() {
logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
MockRaftActorContext actorContext = createActorContextWithFollower();
ByteString bs = toByteString(leadersSnapshot);
Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
- Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
- -1, null, null);
+ List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
}
@Test
- public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() {
logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
MockRaftActorContext actorContext = createActorContextWithFollower();
ByteString bs = toByteString(leadersSnapshot);
Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()),
- Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
- -1, null, null);
+ List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null);
leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray())));
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
- installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
+ installSnapshot.getLastChunkHashCode());
final int hashCode = Arrays.hashCode(installSnapshot.getData());
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
+ assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
}
@Test
private MockRaftActorContext createActorContextWithFollower() {
MockRaftActorContext actorContext = createActorContext();
- actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
- followerActor.path().toString()).build());
+ actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString()));
return actorContext;
}
DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
followerConfig.setElectionTimeoutFactor(10000);
followerActorContext.setConfigParams(followerConfig);
- followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
return followerActorContext;
}
@Test
- public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() {
logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
final MockRaftActorContext leaderActorContext = createActorContextWithFollower();
}
@Test
- public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() {
logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
final MockRaftActorContext leaderActorContext = createActorContext();
MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
- followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString()));
Follower follower = new Follower(followerActorContext);
followerActor.underlyingActor().setBehavior(follower);
}
@Test
- public void testHandleAppendEntriesReplySuccess() throws Exception {
+ public void testHandleAppendEntriesReplySuccess() {
logStart("testHandleAppendEntriesReplySuccess");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
- assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+ assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+ // Note: the size here depends on estimate
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
// kill 1 follower and verify if that got killed
- final JavaTestKit probe = new JavaTestKit(getSystem());
+ final TestKit probe = new TestKit(getSystem());
probe.watch(followerActor1);
followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
}
@Test
- public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ public void testIsolatedLeaderCheckTwoFollowers() {
logStart("testIsolatedLeaderCheckTwoFollowers");
RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
}
@Test
- public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+ public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() {
logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
}
@Test
- public void testLaggingFollowerStarvation() throws Exception {
+ public void testLaggingFollowerStarvation() {
logStart("testLaggingFollowerStarvation");
String leaderActorId = actorFactory.generateActorId("leader");
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
- Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+ List.of(new SimpleReplicatedLogEntry(0, 1,
new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
final MockRaftActorContext.MockPayload largePayload =
new MockRaftActorContext.MockPayload("large", serializedSize);
MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
}
+ @Test
+ public void testLeaderAddressInAppendEntries() {
+ logStart("testLeaderAddressInAppendEntries");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat shouldn't have the leader address
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertNull(appendEntries.leaderAddress());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower needs the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+ RaftVersions.CURRENT_VERSION));
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower does not need the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+ RaftVersions.CURRENT_VERSION));
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertNull(appendEntries.leaderAddress());
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
final ActorRef actorRef, final RaftRPC rpc) {
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
- private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+ private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;