*/
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActor;
Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
.getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
-1, -1, (short) 1));
Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
.getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(),
-1, -1, (short) 1));
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
- Assert.assertEquals(1, context.getReplicatedLog().size());
+ assertEquals(1, context.getReplicatedLog().size());
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
context.getReplicatedLog().clear(0,2);
context.getReplicatedLog().setSnapshotIndex(100);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
context.getReplicatedLog().clear(0,2);
context.getReplicatedLog().setSnapshotIndex(100);
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 105, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 105, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
context.setCommitIndex(101);
setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
setLastLogEntry(context, 1, 100,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
MockRaftActorContext context = createActorContext();
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
setLastLogEntry(context, 1, 101,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
setLastLogEntry(context, 1, 100,
new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// leader-2 is becoming the leader now and it says the commitIndex is 45
appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
new MockRaftActorContext.MockPayload(""));
context.getReplicatedLog().setSnapshotIndex(99);
- List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
MockRaftActorContext context = createActorContext();
- AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, List.of(), 101, -1, (short)0);
follower = createBehavior(context);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
AppendEntriesReply.class);
context.getReplicatedLog().setSnapshotIndex(4);
context.getReplicatedLog().setSnapshotTerm(3);
- AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
+ AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, List.of(), 8, -1, (short)0);
follower = createBehavior(context);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 3, "three"));
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(1, 3, "three"), newReplicatedLogEntry(1, 4, "four"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
assertEquals("Next index", 5, log.last().getIndex() + 1);
assertEquals("Entry 3", entries.get(0), log.get(3));
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(2, 2, "two-1"));
- entries.add(newReplicatedLogEntry(2, 3, "three"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
// The entry at index 2 will be found out-of-sync with the leader
// and will be removed
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(2, 2, "two-1"));
- entries.add(newReplicatedLogEntry(2, 3, "three"));
+ List<ReplicatedLogEntry> entries = List.of(
+ newReplicatedLogEntry(2, 2, "two-1"), newReplicatedLogEntry(2, 3, "three"));
// Send appendEntries with the same term as was set on the receiver
// before the new behavior was created (1 in this case)
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
}
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
}
context.setReplicatedLog(log);
// Send the last entry again.
- List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 1, "one"));
follower = createBehavior(context);
// Send the last entry again and also a new one.
- entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
+ entries = List.of(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
MessageCollectorActor.clearMessages(leaderActor);
follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
context.setReplicatedLog(log);
// Prepare the entries to be sent with AppendEntries
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- entries.add(newReplicatedLogEntry(1, 4, "four"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(1, 4, "four"));
AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
- Assert.assertSame(follower, newBehavior);
+ assertSame(follower, newBehavior);
expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
}
snapshot.getLastAppliedIndex());
assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
- Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
+ assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
applySnapshot.getCallback().onSuccess();
// Send an append entry
AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
- Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
+ List.of(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
// Send appendEntries with a new term and leader.
AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
- Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+ List.of(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
follower.handleMessage(leaderActor, appendEntries);
setLastLogEntry(context, 1, 101,
new MockRaftActorContext.MockPayload(""));
- List<ReplicatedLogEntry> entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ List<ReplicatedLogEntry> entries = List.of(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
@Test
public void testFollowerSchedulesElectionIfNonVoting() {
MockRaftActorContext context = createActorContext();
- context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+ context.updatePeerIds(new ServerConfigurationPayload(List.of(new ServerInfo(context.getId(), false))));
((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
AppendEntries appendEntries = new AppendEntries(1, "leader", -1, -1, entries, 1, -1, (short)0);
assertEquals("Snapshot getLastAppliedIndex", 1, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData()),
MockRaftActor.fromState(snapshot.getState()));
}
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
newReplicatedLogEntry(1, 2, "three"));
assertEquals("Snapshot getLastAppliedIndex", 2, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData(), entries.get(1).getData(),
entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
assertEquals("Last index", 2, followerRaftActor.get().getReplicatedLog().lastIndex());
assertEquals("Last applied index", 2, followerRaftActor.get().getRaftActorContext().getLastApplied());
assertEquals("Commit index", 2, followerRaftActor.get().getRaftActorContext().getCommitIndex());
- assertEquals("State", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
+ assertEquals("State", List.of(entries.get(0).getData(), entries.get(1).getData(),
entries.get(2).getData()), followerRaftActor.get().getState());
}
final AtomicReference<MockRaftActor> followerRaftActor = new AtomicReference<>();
RaftActorSnapshotCohort snapshotCohort = newRaftActorSnapshotCohort(followerRaftActor);
Builder builder = MockRaftActor.builder().persistent(Optional.of(true)).id(id)
- .peerAddresses(ImmutableMap.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
+ .peerAddresses(Map.of("leader", "")).config(config).snapshotCohort(snapshotCohort);
TestActorRef<MockRaftActor> followerActorRef = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), id);
followerRaftActor.set(followerActorRef.underlyingActor());
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
- List<ReplicatedLogEntry> entries = Arrays.asList(
+ List<ReplicatedLogEntry> entries = List.of(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
newReplicatedLogEntry(1, 2, "three"));
assertEquals("Snapshot getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
- assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
+ assertEquals("Snapshot state", List.of(entries.get(0).getData()),
MockRaftActor.fromState(snapshot.getState()));
}
follower = createBehavior(context);
follower.handleMessage(leaderActor,
- new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
+ new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1, (short)0));
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
assertTrue(reply.isNeedsLeaderAddress());
PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
- follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, List.of(), -1, -1,
(short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
int size = chunkSize;
if (chunkSize > snapshotLength) {
size = snapshotLength;
- } else {
- if (start + chunkSize > snapshotLength) {
- size = snapshotLength - start;
- }
+ } else if (start + chunkSize > snapshotLength) {
+ size = snapshotLength - start;
}
byte[] nextChunk = new byte[size];
}
private ByteString createSnapshot() {
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- return toByteString(followerSnapshot);
+ return toByteString(Map.of("1", "A", "2", "B", "3", "C"));
}
@Override