X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=d9a5487e556171813bf47d49af2a18a85b0b165c;hb=HEAD;hp=f847dbce9e860159bbd131a3dc88f39f97486ed1;hpb=86e8e4a06b682aa772c834a2cef56d0596540e1b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index f847dbce9e..0f16f92c49 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -25,18 +25,16 @@ import akka.actor.Terminated; import akka.protobuf.ByteString; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.SerializationUtils; @@ -170,9 +168,8 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, final Payload payload) { - SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); - actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload)); + return leader.handleMessage(leaderActor, new Replicate(index, true, null, null)); } @Test @@ -403,7 +400,7 @@ public class LeaderTest extends AbstractLeaderTest { final int messageNr) { final AppendEntries commitReq = allMessages.get(2 * messageNr + 1); assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit()); - assertEquals(ImmutableList.of(), commitReq.getEntries()); + assertEquals(List.of(), commitReq.getEntries()); } private static void assertRequestEntry(final long lastIndex, final List allMessages, @@ -548,16 +545,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setLastApplied(0); - long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; - long term = actorContext.getTermInformation().getCurrentTerm(); - ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( - newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); + final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; + final long term = actorContext.getTermInformation().getCurrentTerm(); + final var data = new MockRaftActorContext.MockPayload("foo"); - actorContext.getReplicatedLog().append(newEntry); + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data)); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, id, newEntry, true)); + final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -566,8 +561,7 @@ public class LeaderTest extends AbstractLeaderTest { // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous // one since lastApplied state is 0. - List applyStateList = MessageCollectorActor.getAllMatching( - leaderActor, ApplyState.class); + final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); for (int i = 0; i <= newLogIndex - 1; i++) { @@ -577,7 +571,7 @@ public class LeaderTest extends AbstractLeaderTest { } ApplyState last = applyStateList.get((int) newLogIndex - 1); - assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); + assertEquals("getData", data, last.getReplicatedLogEntry().getData()); assertEquals("getIdentifier", id, last.getIdentifier()); } @@ -587,11 +581,6 @@ public class LeaderTest extends AbstractLeaderTest { final MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -614,12 +603,12 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - ByteString bs = toByteString(leadersSnapshot); + ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C")); leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( - actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName()); fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); @@ -677,18 +666,15 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - SimpleReplicatedLogEntry entry = - new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertTrue(raftBehavior instanceof Leader); @@ -725,15 +711,13 @@ public class LeaderTest extends AbstractLeaderTest { leader.setSnapshotHolder(null); // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -745,7 +729,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -788,10 +772,8 @@ public class LeaderTest extends AbstractLeaderTest { } // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); @@ -815,7 +797,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); // Sending Replicate message should not initiate another capture since the first is in progress. - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. @@ -825,7 +807,7 @@ public class LeaderTest extends AbstractLeaderTest { // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. final byte[] bytes = new byte[]{1, 2, 3}; - installSnapshotStream.get().get().write(bytes); + installSnapshotStream.get().orElseThrow().write(bytes); actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), Runtime.getRuntime().totalMemory()); MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -873,7 +855,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setNextIndex(0); byte[] bytes = toByteString(leadersSnapshot).toByteArray(); - Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(), lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, @@ -925,7 +907,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setNextIndex(-1); byte[] bytes = toByteString(leadersSnapshot).toByteArray(); - Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(), lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, @@ -980,10 +962,10 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( - actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName()); fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while (!fts.isLastChunk(fts.getChunkIndex())) { @@ -1021,7 +1003,7 @@ public class LeaderTest extends AbstractLeaderTest { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }; @@ -1049,8 +1031,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1099,7 +1080,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }); @@ -1123,8 +1104,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1164,7 +1144,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }); @@ -1188,8 +1168,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1198,8 +1177,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, - installSnapshot.getLastChunkHashCode().getAsInt()); + assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE), + installSnapshot.getLastChunkHashCode()); final int hashCode = Arrays.hashCode(installSnapshot.getData()); @@ -1212,7 +1191,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt()); + assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode()); } @Test @@ -1282,8 +1261,7 @@ public class LeaderTest extends AbstractLeaderTest { private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, - followerActor.path().toString()).build()); + actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString())); return actorContext; } @@ -1292,7 +1270,7 @@ public class LeaderTest extends AbstractLeaderTest { DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); followerConfig.setElectionTimeoutFactor(10000); followerActorContext.setConfigParams(followerConfig); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString())); return followerActorContext; } @@ -1358,7 +1336,7 @@ public class LeaderTest extends AbstractLeaderTest { final MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); @@ -1739,7 +1717,7 @@ public class LeaderTest extends AbstractLeaderTest { FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); - assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); + assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion()); AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); @@ -1794,7 +1772,7 @@ public class LeaderTest extends AbstractLeaderTest { ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(1000, TimeUnit.SECONDS)); // Note: the size here depends on estimate - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246); leaderActorContext.setReplicatedLog( new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); @@ -2269,7 +2247,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, - Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + List.of(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; final MockRaftActorContext.MockPayload largePayload = new MockRaftActorContext.MockPayload("large", serializedSize); @@ -2277,7 +2255,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContextWithFollower(); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(300, TimeUnit.MILLISECONDS)); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leaderActorContext.setCommitIndex(-1); leaderActorContext.setLastApplied(-1); @@ -2361,7 +2339,7 @@ public class LeaderTest extends AbstractLeaderTest { ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(100, TimeUnit.MILLISECONDS)); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leaderActorContext.setCommitIndex(-1); leaderActorContext.setLastApplied(-1); @@ -2376,7 +2354,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", - leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1)); MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); // Sleep for at least 3 * election timeout so the slicing state expires. @@ -2423,7 +2401,7 @@ public class LeaderTest extends AbstractLeaderTest { // Initial heartbeat shouldn't have the leader address AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertFalse(appendEntries.getLeaderAddress().isPresent()); + assertNull(appendEntries.leaderAddress()); MessageCollectorActor.clearMessages(followerActor); // Send AppendEntriesReply indicating the follower needs the leader address @@ -2438,8 +2416,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertTrue(appendEntries.getLeaderAddress().isPresent()); - assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get()); + assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress()); MessageCollectorActor.clearMessages(followerActor); // Send AppendEntriesReply indicating the follower does not need the leader address @@ -2453,7 +2430,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertFalse(appendEntries.getLeaderAddress().isPresent()); + assertNull(appendEntries.leaderAddress()); } @Override @@ -2466,11 +2443,11 @@ public class LeaderTest extends AbstractLeaderTest { private static class MockConfigParamsImpl extends DefaultConfigParamsImpl { private final long electionTimeOutIntervalMillis; - private final int snapshotChunkSize; + private final int maximumMessageSliceSize; - MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) { + MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) { this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; - this.snapshotChunkSize = snapshotChunkSize; + this.maximumMessageSliceSize = maximumMessageSliceSize; } @Override @@ -2479,8 +2456,8 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - public int getSnapshotChunkSize() { - return snapshotChunkSize; + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; } } }