X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=9e354612616b7e39235e7335a7fe618ea7973243;hb=000960f6451af770f5463e41e1fb6defb6f3ab27;hp=d57c354e5a0fbf7f4dcdcca4bb3c246763d6db4a;hpb=79e6240ad565717e2fba62a339f11fcbd239f440;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index d57c354e5a..9e35461261 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -146,8 +146,7 @@ public class RaftActorTest extends AbstractActorTest { kit.watch(followerActor); List snapshotUnappliedEntries = new ArrayList<>(); - ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, - new MockRaftActorContext.MockPayload("E")); + ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")); snapshotUnappliedEntries.add(entry1); int lastAppliedDuringSnapshotCapture = 3; @@ -167,12 +166,9 @@ public class RaftActorTest extends AbstractActorTest { // add more entries after snapshot is taken List entries = new ArrayList<>(); - ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F", 2)); - ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("G", 3)); - ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("H", 4)); + ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F", 2)); + ReplicatedLogEntry entry3 = new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("G", 3)); + ReplicatedLogEntry entry4 = new SimpleReplicatedLogEntry(7, 1, new MockRaftActorContext.MockPayload("H", 4)); entries.add(entry2); entries.add(entry3); entries.add(entry4); @@ -300,8 +296,7 @@ public class RaftActorTest extends AbstractActorTest { SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); mockRaftActor.handleRecover(snapshotOffer); - MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, - 1, new MockRaftActorContext.MockPayload("1", 5)); + ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5)); mockRaftActor.handleRecover(logEntry); ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2); @@ -418,8 +413,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); - ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F")); + ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F")); final Identifier id = new MockIdentifier("apply-state"); mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); @@ -596,7 +590,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); leaderActor.getRaftActorContext().getSnapshotManager().capture( - new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); + new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4); verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); @@ -688,30 +682,23 @@ public class RaftActorTest extends AbstractActorTest { //snapshot on 4 followerActor.getRaftActorContext().getSnapshotManager().capture( - new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("D")), 4); + new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4); verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(6, followerActor.getReplicatedLog().size()); //fake snapshot on index 6 - List entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("foo-6")) - ); + List entries = Arrays.asList( + (ReplicatedLogEntry) new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6"))); followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0)); assertEquals(7, followerActor.getReplicatedLog().size()); //fake snapshot on index 7 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); - entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("foo-7")) - ); + entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(7, 1, + new MockRaftActorContext.MockPayload("foo-7"))); followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0)); assertEquals(8, followerActor.getReplicatedLog().size()); @@ -734,11 +721,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); - entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, - new MockRaftActorContext.MockPayload("foo-7")) - ); + entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(8, 1, + new MockRaftActorContext.MockPayload("foo-7"))); // send an additional entry 8 with leaderCommit = 7 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0)); @@ -851,7 +835,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.waitForInitializeBehaviorComplete(); for (int i = 0; i < 4; i++) { - leaderActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + leaderActor.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, new MockRaftActorContext.MockPayload("A"))); } @@ -901,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest { // Persist another entry (this will cause a CaptureSnapshot to be triggered leaderActor.persistData(mockActorRef, new MockIdentifier("x"), - new MockRaftActorContext.MockPayload("duh")); + new MockRaftActorContext.MockPayload("duh"), false); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); @@ -1034,12 +1018,12 @@ public class RaftActorTest extends AbstractActorTest { long term = 3; long seqN = 1; InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1")); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(0, term, new MockRaftActorContext.MockPayload("A"))); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(1, term, new MockRaftActorContext.MockPayload("B"))); InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1)); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(2, term, new MockRaftActorContext.MockPayload("C"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, @@ -1198,7 +1182,7 @@ public class RaftActorTest extends AbstractActorTest { Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.asList(), 5, 2, 5, 2, 2, "member-1"); - InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("B"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId) @@ -1231,7 +1215,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS)); String persistenceId = factory.generateActorId("test-actor-"); - InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1, new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false))))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId) @@ -1275,7 +1259,7 @@ public class RaftActorTest extends AbstractActorTest { MessageCollectorActor.clearMessages(notifierActor); - raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender()); + raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender()); leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId()); @@ -1310,7 +1294,8 @@ public class RaftActorTest extends AbstractActorTest { Leader leader = new Leader(leaderActor.getRaftActorContext()); leaderActor.setCurrentBehavior(leader); - leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1")); + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"), + false); ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0); assertNotNull("ReplicatedLogEntry not found", logEntry); @@ -1328,4 +1313,42 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex()); assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied()); } + + @Test + public void testReplicateWithBatchHint() throws Exception { + final String leaderId = factory.generateActorId("leader-"); + final String followerId = factory.generateActorId("follower-"); + + final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef leaderActorRef = factory.createTestActor( + MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config), + leaderId); + MockRaftActor leaderActor = leaderActorRef.underlyingActor(); + leaderActor.waitForInitializeBehaviorComplete(); + + leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0)); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("AppendEntries size", 3, appendEntries.getEntries().size()); + } }