* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
-import akka.actor.Props;
import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
+import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
+import akka.testkit.javadsl.TestKit;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActor;
import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
- private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
+ private final ActorRef followerActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
- private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
+ private final ActorRef leaderActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
private Follower follower;
protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
context.setPayloadVersion(payloadVersion);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
return context;
}
Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
.getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
-1, -1, (short) 1));
Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
.getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
-1, -1, (short) 1));
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
- Assert.assertEquals(1, context.getReplicatedLog().size());
+ assertEquals(1, context.getReplicatedLog().size());
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
@Test
public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
- logStart("testHandleFirstAppendEntries");
+ logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
MockRaftActorContext context = createActorContext();
context.getReplicatedLog().clear(0,2);
context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
@Test
public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
- logStart("testHandleFirstAppendEntries");
+ logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
MockRaftActorContext context = createActorContext();
context.getReplicatedLog().clear(0,2);
context.getReplicatedLog().setSnapshotIndex(100);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
context.getReplicatedLog().clear(0,2);
context.getReplicatedLog().setSnapshotIndex(100);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 105, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
assertFalse(syncStatus.isInitialSyncDone());
// Clear all the messages
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
context.setLastApplied(101);
context.setCommitIndex(101);
setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
assertTrue(syncStatus.isInitialSyncDone());
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
// Sending the same message again should not generate another message
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
assertFalse(syncStatus.isInitialSyncDone());
// Clear all the messages
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
context.setLastApplied(100);
setLastLogEntry(context, 1, 100,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
assertFalse(syncStatus.isInitialSyncDone());
// Clear all the messages
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
context.setLastApplied(101);
context.setCommitIndex(101);
setLastLogEntry(context, 1, 101,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
assertTrue(syncStatus.isInitialSyncDone());
// Clear all the messages
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
context.setLastApplied(100);
setLastLogEntry(context, 1, 100,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
new MockRaftActorContext.MockPayload(""));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
}
/**
- * This test verifies that when an AppendEntries is received a specific prevLogTerm
+ * This test verifies that when an AppendEntries is received with a prevLogTerm
* which does not match the term that is in RaftActors log entry at prevLogIndex
* then the RaftActor does not change it's state and it returns a failure.
*/
MockRaftActorContext context = createActorContext();
- // First set the receivers term to lower number
- context.getTermInformation().update(95, "test");
-
- // AppendEntries is now sent with a bigger term
- // this will set the receivers term to be the same as the sender's term
- AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, Collections.emptyList(), 101, -1,
- (short)0);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
follower = createBehavior(context);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
AppendEntriesReply.class);
assertEquals("isSuccess", false, reply.isSuccess());
}
+ @Test
+ public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
+ logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+ context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
+ context.getReplicatedLog().setSnapshotIndex(4);
+ context.getReplicatedLog().setSnapshotTerm(3);
+
+ AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
+
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ assertSame(follower, newBehavior);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals("isSuccess", true, reply.isSuccess());
+ }
+
/**
* This test verifies that when a new AppendEntries message is received with
* new entries and the logs of the sender and receiver match that the new
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 3, "three"));
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
assertEquals("Next index", 5, log.last().getIndex() + 1);
assertEquals("Entry 3", entries.get(0), log.get(3));
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(2, 2, "two-1"));
- entries.add(newReplicatedLogEntry(2, 3, "three"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
// The entry at index 2 will be found out-of-sync with the leader
// and will be removed
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(2, 2, "two-1"));
- entries.add(newReplicatedLogEntry(2, 3, "three"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
}
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
}
context.setReplicatedLog(log);
// Send the last entry again.
- List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
follower = createBehavior(context);
// Send the last entry again and also a new one.
- entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
+ entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
- leaderActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(leaderActor);
follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
assertEquals("Next index", 3, log.last().getIndex() + 1);
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
}
-
/**
* This test verifies that when InstallSnapshot is received by
* the follower its applied correctly.
snapshot.getLastAppliedIndex());
assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
- Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
+ assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
applySnapshot.getCallback().onSuccess();
// Send an append entry
AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
- Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
+ List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
// Send appendEntries with a new term and leader.
AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
- Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+ List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
assertFalse(syncStatus.isInitialSyncDone());
// Clear all the messages
- followerActor.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(followerActor);
context.setLastApplied(101);
context.setCommitIndex(101);
setLastLogEntry(context, 1, 101,
new MockRaftActorContext.MockPayload(""));
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
@Test
public void testFollowerSchedulesElectionIfNonVoting() {
MockRaftActorContext context = createActorContext();
- context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+ context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
MockRaftActor.fromState(snapshot.getState()));
}
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
newReplicatedLogEntry(1, 2, "three"));
assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
// Reinstate the actor from persistence
- actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+ actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
- assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
entries.get(2).getData()), followerRaftActor.get().getState());
}
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
newReplicatedLogEntry(1, 2, "three"));
assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData()),
MockRaftActor.fromState(snapshot.getState()));
}
+ @Test
+ public void testNeedsLeaderAddress() {
+ logStart("testNeedsLeaderAddress");
+
+ MockRaftActorContext context = createActorContext();
+ context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
+ context.addToPeers("leader", null, VotingState.VOTING);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
+
+ follower = createBehavior(context);
+
+ follower.handleMessage(leaderActor,
+ new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertTrue(reply.isNeedsLeaderAddress());
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
+
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
+ (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
+
+ reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertFalse(reply.isNeedsLeaderAddress());
+
+ verify(mockResolver).setResolved("leader", leaderActor.path().toString());
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
final AtomicReference<MockRaftActor> followerRaftActor) {
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(final ActorRef actorRef,
- final java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
try {
actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
installSnapshotStream), actorRef);
int size = chunkSize;
if (chunkSize > snapshotLength) {
size = snapshotLength;
- } else {
- if (start + chunkSize > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if (start + chunkSize > snapshotLength) {
+ size = snapshotLength - start;
}
byte[] nextChunk = new byte[size];
assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+ assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
}
}
private ByteString createSnapshot() {
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- return toByteString(followerSnapshot);
+ return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
}
@Override
}
@Override
- protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor) {
+ protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
assertEquals("isSuccess", true, reply.isSuccess());
}