import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.controller.cluster.messaging.MessageSlice;
+import org.opendaylight.controller.cluster.messaging.MessageSliceReply;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.concepts.Identifier;
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (leader != null) {
leader.close();
}
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
return sendReplicate(actorContext, 1, index);
}
private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) {
- MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo"));
+ }
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) {
SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
actorContext.getReplicatedLog().append(newEntry);
return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
leader.markFollowerActive(FOLLOWER_ID);
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(null);
+ leader.setSnapshotHolder(null);
// new entry
SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
actorContext.getReplicatedLog().removeFrom(0);
AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
- actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out));
+ actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
leader = new Leader(actorContext);
actorContext.setCurrentBehavior(leader);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(null);
+ leader.setSnapshotHolder(null);
for (int i = 0; i < 4; i++) {
actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1,
actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
+ leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()),
Collections.<ReplicatedLogEntry>emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
return createActorContext(LEADER_ID, actorRef);
}
- private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
assertTrue("Expected Leader", newBehavior instanceof Leader);
}
- private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader);
// kill 1 follower and verify if that got killed
- final JavaTestKit probe = new JavaTestKit(getSystem());
+ final TestKit probe = new TestKit(getSystem());
probe.watch(followerActor1);
followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
leaderActorContext.setLastApplied(-1);
String nonVotingFollowerId = "nonvoting-follower";
- TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
- Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+ ActorRef nonVotingFollowerActor = actorFactory.createActor(
+ MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId));
leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(),
VotingState.NON_VOTING);
MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
+ @Test
+ public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+ logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+ final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+ Arrays.asList(new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+ final MockRaftActorContext.MockPayload largePayload =
+ new MockRaftActorContext.MockPayload("large", serializedSize);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(300, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send normal payload first to prime commit index.
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ sendReplicate(leaderActorContext, term, 0);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+ assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+ sendReplicate(leaderActorContext, term, 1, largePayload);
+
+ MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+ assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+ final Identifier slicingId = messageSlice.getIdentifier();
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+ messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send another normal payload.
+
+ sendReplicate(leaderActorContext, term, 2);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+ }
+
+ @Test
+ public void testLargePayloadSlicingExpiration() {
+ logStart("testLargePayloadSlicingExpiration");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+ leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+ // Sleep for at least 3 * election timeout so the slicing state expires.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+ MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send an AppendEntriesReply - this should restart the slicing.
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ }
+
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
- MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
- super();
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
this.snapshotChunkSize = snapshotChunkSize;
}