import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
+import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
// Leader should install snapshot - capture and verify ApplySnapshot contents
ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
- @SuppressWarnings("unchecked")
- List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+ List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
newFollowerActorContext.getPeerIds());
- expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
- expectFirstMatching(followerActor, ApplyState.class);
-
assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
// Leader should install snapshot - capture and verify ApplySnapshot contents
ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
- @SuppressWarnings("unchecked")
- List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+ List<Object> snapshotState = MockRaftActor.fromState(applySnapshot.getSnapshot().getState());
assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(
noLeaderActor.underlyingActor());
- ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ ReplicatedLogEntry serverConfigEntry = new SimpleReplicatedLogEntry(1, 1,
new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
assertEquals("Message handled", true, handled);
- ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+ ReplicatedLogEntry nonServerConfigEntry = new SimpleReplicatedLogEntry(1, 1,
new MockRaftActorContext.MockPayload("1"));
handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
assertEquals("Message handled", false, handled);
}
@Test
- public void testRemoveServer() {
+ public void testRemoveServer() throws Exception {
LOG.info("testRemoveServer starting");
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
- final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
- final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
+ final String follower1ActorId = actorFactory.generateActorId(FOLLOWER_ID);
+ final String follower1ActorPath = actorFactory.createTestActorPath(follower1ActorId);
+ final String follower2ActorId = actorFactory.generateActorId(FOLLOWER_ID2);
+ final String follower2ActorPath = actorFactory.createTestActorPath(follower2ActorId);
RaftActorContext initialActorContext = new MockRaftActorContext();
- TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
- MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
+ final String downNodeId = "downNode";
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(MockLeaderRaftActor.props(
+ ImmutableMap.of(FOLLOWER_ID, follower1ActorPath, FOLLOWER_ID2, follower2ActorPath, downNodeId, ""),
initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId(LEADER_ID));
final TestActorRef<MessageCollectorActor> leaderCollector =
newLeaderCollectorActor(leaderActor.underlyingActor());
- TestActorRef<MessageCollectorActor> collector = actorFactory.createTestActor(MessageCollectorActor.props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()),
+ TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
actorFactory.generateActorId("collector"));
- actorFactory.createTestActor(
- CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
- configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
- followerActorId);
+ final TestActorRef<CollectingMockRaftActor> follower1Actor = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
+ FOLLOWER_ID2, follower2ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
+ follower1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
+
+ TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId("collector"));
+ final TestActorRef<CollectingMockRaftActor> follower2Actor = actorFactory.createTestActor(
+ CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
+ FOLLOWER_ID, follower1ActorPath, downNodeId, ""), configParams, NO_PERSISTENCE,
+ follower2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
+
+ leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
+ follower1Actor.underlyingActor().waitForInitializeBehaviorComplete();
+ follower2Actor.underlyingActor().waitForInitializeBehaviorComplete();
leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"),
RemoveServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
- final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
+ ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
- votingServer(LEADER_ID));
+ votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
+
+ applyState = MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
+ assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
+ verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
+ votingServer(LEADER_ID), votingServer(FOLLOWER_ID2), votingServer(downNodeId));
RaftActorBehavior currentBehavior = leaderActor.underlyingActor().getCurrentBehavior();
assertTrue("Expected Leader", currentBehavior instanceof Leader);
- assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
+ assertEquals("Follower ids size", 2, ((Leader)currentBehavior).getFollowerIds().size());
- MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
+ MessageCollectorActor.expectFirstMatching(follower1Collector, ServerRemoved.class);
LOG.info("testRemoveServer ending");
}
node1RaftActorRef.tell(changeServers, testKit.getRef());
ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+ assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
// Send an AppendEntries so node1 has a leaderId
- MessageCollectorActor.clearMessages(node1Collector);
-
long term = node1RaftActor.getRaftActorContext().getTermInformation().getCurrentTerm();
node1RaftActorRef.tell(new AppendEntries(term, "downNode1", -1L, -1L,
Collections.<ReplicatedLogEntry>emptyList(), 0, -1, (short)1), ActorRef.noSender());
- // Wait for the ElectionTimeout to clear the leaderId. he leaderId must be null so on the
+ // Wait for the ElectionTimeout to clear the leaderId. The leaderId must be null so on the next
// ChangeServersVotingStatus message, it will try to elect a leader.
- MessageCollectorActor.expectFirstMatching(node1Collector, ElectionTimeout.class);
+ AbstractRaftActorIntegrationTest.verifyRaftState(node1RaftActorRef,
+ rs -> assertEquals("getLeader", null, rs.getLeader()));
// Update node2's peer address and send the message again
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
- NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+ NonPersistentDataProvider noPersistence = new NonPersistentDataProvider(Runnable::run);
ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
termInfo.update(1, LEADER_ID);
return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
- id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
+ id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams,
+ noPersistence, applyState -> actor.tell(applyState, actor), LOG);
}
abstract static class AbstractMockRaftActor extends MockRaftActor {
super(id, peerAddresses, config, persistent, collectorActor);
snapshotCohortDelegate = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef) {
- actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
}
@Override
- public void applySnapshot(byte[] snapshotBytes) {
+ public void applySnapshot(
+ org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
+ }
+
+ @Override
+ public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
+ ByteSource snapshotBytes) {
+ throw new UnsupportedOperationException();
}
};
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void createSnapshot(ActorRef actorRef) {
- try {
- actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
- } catch (Exception e) {
- LOG.error("createSnapshot failed", e);
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
+ if (installSnapshotStream.isPresent()) {
+ SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
}
+
+ actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
}
static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {