X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=8ec50754e8ec35a83d27255936178d66462e2db4;hb=b8da9f6fa8bf4284805349f4638ebdadf169ff5f;hp=da0493fe225137469a524e004204de257ff2aed3;hpb=fa96da71c5ab10973a9f93c2e8b35494b96fd7ed;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index da0493fe22..8ec50754e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -82,6 +82,7 @@ import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -145,8 +146,7 @@ public class RaftActorTest extends AbstractActorTest { kit.watch(followerActor); List snapshotUnappliedEntries = new ArrayList<>(); - ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, - new MockRaftActorContext.MockPayload("E")); + ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E")); snapshotUnappliedEntries.add(entry1); int lastAppliedDuringSnapshotCapture = 3; @@ -166,12 +166,9 @@ public class RaftActorTest extends AbstractActorTest { // add more entries after snapshot is taken List entries = new ArrayList<>(); - ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F", 2)); - ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("G", 3)); - ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("H", 4)); + ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F", 2)); + ReplicatedLogEntry entry3 = new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("G", 3)); + ReplicatedLogEntry entry4 = new SimpleReplicatedLogEntry(7, 1, new MockRaftActorContext.MockPayload("H", 4)); entries.add(entry2); entries.add(entry3); entries.add(entry4); @@ -299,8 +296,7 @@ public class RaftActorTest extends AbstractActorTest { SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot); mockRaftActor.handleRecover(snapshotOffer); - MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, - 1, new MockRaftActorContext.MockPayload("1", 5)); + ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("1", 5)); mockRaftActor.handleRecover(logEntry); ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2); @@ -417,8 +413,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.waitForInitializeBehaviorComplete(); - ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("F")); + ReplicatedLogEntry entry = new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("F")); final Identifier id = new MockIdentifier("apply-state"); mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry)); @@ -595,7 +590,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(8, leaderActor.getReplicatedLog().size()); leaderActor.getRaftActorContext().getSnapshotManager().capture( - new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); + new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("x")), 4); verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); @@ -637,7 +632,7 @@ public class RaftActorTest extends AbstractActorTest { // add another non-replicated entry leaderActor.getReplicatedLog().append( - new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); + new SimpleReplicatedLogEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8"))); //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0)); @@ -687,30 +682,23 @@ public class RaftActorTest extends AbstractActorTest { //snapshot on 4 followerActor.getRaftActorContext().getSnapshotManager().capture( - new MockRaftActorContext.MockReplicatedLogEntry(1, 5, - new MockRaftActorContext.MockPayload("D")), 4); + new SimpleReplicatedLogEntry(5, 1, new MockRaftActorContext.MockPayload("D")), 4); verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(6, followerActor.getReplicatedLog().size()); //fake snapshot on index 6 - List entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6, - new MockRaftActorContext.MockPayload("foo-6")) - ); + List entries = Arrays.asList( + (ReplicatedLogEntry) new SimpleReplicatedLogEntry(6, 1, new MockRaftActorContext.MockPayload("foo-6"))); followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0)); assertEquals(7, followerActor.getReplicatedLog().size()); //fake snapshot on index 7 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); - entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, - new MockRaftActorContext.MockPayload("foo-7")) - ); + entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(7, 1, + new MockRaftActorContext.MockPayload("foo-7"))); followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0)); assertEquals(8, followerActor.getReplicatedLog().size()); @@ -733,11 +721,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log assertEquals(7, followerActor.getReplicatedLog().lastIndex()); - entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8, - new MockRaftActorContext.MockPayload("foo-7")) - ); + entries = Arrays.asList((ReplicatedLogEntry) new SimpleReplicatedLogEntry(8, 1, + new MockRaftActorContext.MockPayload("foo-7"))); // send an additional entry 8 with leaderCommit = 7 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0)); @@ -850,7 +835,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.waitForInitializeBehaviorComplete(); for (int i = 0; i < 4; i++) { - leaderActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, i, + leaderActor.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, new MockRaftActorContext.MockPayload("A"))); } @@ -900,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest { // Persist another entry (this will cause a CaptureSnapshot to be triggered leaderActor.persistData(mockActorRef, new MockIdentifier("x"), - new MockRaftActorContext.MockPayload("duh")); + new MockRaftActorContext.MockPayload("duh"), false); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); @@ -1033,12 +1018,12 @@ public class RaftActorTest extends AbstractActorTest { long term = 3; long seqN = 1; InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1")); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(0, term, new MockRaftActorContext.MockPayload("A"))); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(1, term, new MockRaftActorContext.MockPayload("B"))); InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1)); - InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2, + InMemoryJournal.addEntry(persistenceId, seqN++, new SimpleReplicatedLogEntry(2, term, new MockRaftActorContext.MockPayload("C"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, @@ -1116,8 +1101,7 @@ public class RaftActorTest extends AbstractActorTest { config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); List snapshotUnappliedEntries = new ArrayList<>(); - snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, - new MockRaftActorContext.MockPayload("E"))); + snapshotUnappliedEntries.add(new SimpleReplicatedLogEntry(4, 1, new MockRaftActorContext.MockPayload("E"))); int snapshotLastApplied = 3; int snapshotLastIndex = 4; @@ -1198,7 +1182,7 @@ public class RaftActorTest extends AbstractActorTest { Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.asList(), 5, 2, 5, 2, 2, "member-1"); - InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("B"))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId) @@ -1231,7 +1215,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS)); String persistenceId = factory.generateActorId("test-actor-"); - InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + InMemoryJournal.addEntry(persistenceId, 1, new SimpleReplicatedLogEntry(0, 1, new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false))))); TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId) @@ -1310,7 +1294,8 @@ public class RaftActorTest extends AbstractActorTest { Leader leader = new Leader(leaderActor.getRaftActorContext()); leaderActor.setCurrentBehavior(leader); - leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1")); + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"), + false); ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0); assertNotNull("ReplicatedLogEntry not found", logEntry); @@ -1328,4 +1313,42 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex()); assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied()); } + + @Test + public void testReplicateWithBatchHint() throws Exception { + final String leaderId = factory.generateActorId("leader-"); + final String followerId = factory.generateActorId("follower-"); + + final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef leaderActorRef = factory.createTestActor( + MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config), + leaderId); + MockRaftActor leaderActor = leaderActorRef.underlyingActor(); + leaderActor.waitForInitializeBehaviorComplete(); + + leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0)); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("AppendEntries size", 3, appendEntries.getEntries().size()); + } }