import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
return true;
} else if(message instanceof ApplyState) {
return onApplyState((ApplyState) message, raftActor);
+ } else if(message instanceof SnapshotComplete) {
+ currentOperationState.onSnapshotComplete(raftActor);
+ return false;
} else {
return false;
}
void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
void onApplyState(RaftActor raftActor, ApplyState applyState);
+
+ void onSnapshotComplete(RaftActor raftActor);
}
/**
LOG.debug("onApplyState was called in state {}", this);
}
+ @Override
+ public void onSnapshotComplete(RaftActor raftActor) {
+ }
+
protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
Collection<PeerInfo> peers = raftContext.getPeers();
List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
AddServerContext getAddServerContext() {
return addServerContext;
}
+
+ Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
+ return raftContext.getActorSystem().scheduler().scheduleOnce(
+ new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
+ TimeUnit.MILLISECONDS), raftContext.getActor(),
+ new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
+ raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+ }
+
+ void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+ String serverId = followerTimeout.getNewServerId();
+
+ LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
+
+ // cleanup
+ raftContext.removePeer(serverId);
+
+ boolean isLeader = raftActor.isLeader();
+ if(isLeader) {
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ leader.removeFollower(serverId);
+ }
+
+ operationComplete(raftActor, getAddServerContext(),
+ isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+ }
}
/**
leader.addFollower(addServer.getNewServerId());
if(votingState == VotingState.VOTING_NOT_INITIALIZED){
- LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
- addServer.getNewServerId());
-
- leader.initiateCaptureSnapshot(addServer.getNewServerId());
-
// schedule the install snapshot timeout timer
- Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce(
- new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
- TimeUnit.MILLISECONDS), raftContext.getActor(),
- new FollowerCatchUpTimeout(addServer.getNewServerId()),
- raftContext.getActorSystem().dispatcher(), raftContext.getActor());
-
- currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
+ Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
+ if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
+ LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
+ addServer.getNewServerId());
+
+ currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
+ } else {
+ LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
+
+ currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
+ installSnapshotTimer);
+ }
} else {
LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
raftContext.getId());
@Override
public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
- String serverId = followerTimeout.getNewServerId();
-
- LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId);
-
- AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-
- // cleanup
- raftContext.removePeer(serverId);
- leader.removeFollower(serverId);
+ handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
- LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId);
-
- operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT);
+ LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
+ followerTimeout.getNewServerId());
}
@Override
// Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
// add server operation that timed out.
- if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
+ if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
persistNewServerConfiguration(raftActor, getAddServerContext());
installSnapshotTimer.cancel();
+ } else {
+ LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
+ raftContext.getId(), followerId,
+ !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
+ }
+ }
+ }
+
+ /**
+ * The AddServer operation state for when there is a snapshot already in progress. When the current
+ * snapshot completes, it initiates an install snapshot.
+ */
+ private class WaitingForPriorSnapshotComplete extends AddServerState {
+ private final Cancellable snapshotTimer;
+
+ WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
+ super(addServerContext);
+ this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
+ }
+
+ @Override
+ public void onSnapshotComplete(RaftActor raftActor) {
+ LOG.debug("{}: onSnapshotComplete", raftContext.getId());
+
+ if(!raftActor.isLeader()) {
+ LOG.debug("{}: No longer the leader", raftContext.getId());
+ return;
}
+
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
+ LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
+ getAddServerContext().getOperation().getNewServerId());
+
+ currentOperationState = new InstallingSnapshot(getAddServerContext(),
+ newInstallSnapshotTimer(raftActor));
+
+ snapshotTimer.cancel();
+ }
+ }
+
+ @Override
+ public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+ handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+
+ LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
+ raftContext.getId(), followerTimeout.getNewServerId());
}
}
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
+ // Send a second AddServer - should get queued
JavaTestKit testKit2 = new JavaTestKit(getSystem());
leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
+ // Continue the first AddServer
newFollowerRaftActorInstance.setDropMessageOfType(null);
newFollowerRaftActor.tell(installSnapshot, leaderActor);
+ // Verify both complete successfully
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
}
@Test
- public void testAddServerWithInstallSnapshotTimeout() throws Exception {
- newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+ public void testAddServerWithPriorSnapshotInProgress() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop commit message for now to delay snapshot completion
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+ String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+ leaderRaftActor.setDropMessageOfType(null);
+ leaderActor.tell(commitMsg, leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+ expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+
+ // Verify ServerConfigurationPayload entry in leader's log
+
+ assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
+ assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
+ verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+ votingServer(NEW_SERVER_ID));
+ }
+
+ @Test
+ public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
RaftActorContext initialActorContext = new MockRaftActorContext();
initialActorContext.setCommitIndex(-1);
initialActorContext.setLastApplied(-1);
MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop commit message so the snapshot doesn't complete.
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ }
+
+ @Test
+ public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop the commit message so the snapshot doesn't complete yet.
+ leaderRaftActor.setDropMessageOfType(String.class);
+
+ leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+ // Change the leader behavior to follower
+ leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+ // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
+ // snapshot completes. This will prevent the invalid snapshot from completing and fail the
+ // isCapturing assertion below.
+ leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
+
+ // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
+ leaderActor.tell(commitMsg, leaderActor);
+
+ leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
+ }
+
+ @Test
+ public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
+
+ TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID + "Collector"));
+ leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+ // Drop the UnInitializedFollowerSnapshotReply to delay it.
+ leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
+ UnInitializedFollowerSnapshotReply.class);
+
+ // Prevent election timeout when the leader switches to follower
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+ // Change the leader behavior to follower
+ leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+ // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
+ leaderRaftActor.setDropMessageOfType(null);
+ leaderActor.tell(snapshotReply, leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+ assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+ }
+
+ @Test
+ public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+ RaftActorContext initialActorContext = new MockRaftActorContext();
+ initialActorContext.setCommitIndex(-1);
+ initialActorContext.setLastApplied(-1);
+ initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+ initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+ RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+
+ // Drop the InstallSnapshot message so it times out
+ newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+ leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+
assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
assertEquals("Leader followers size", 0,
((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
return followerActorContext;
}
- public static class MockLeaderRaftActor extends MockRaftActor {
+ static abstract class AbstractMockRaftActor extends MockRaftActor {
+ private volatile TestActorRef<MessageCollectorActor> collectorActor;
+ private volatile Class<?> dropMessageOfType;
+
+ AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+ DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+ super(id, peerAddresses, config, dataPersistenceProvider);
+ this.collectorActor = collectorActor;
+ }
+
+ void setDropMessageOfType(Class<?> dropMessageOfType) {
+ this.dropMessageOfType = dropMessageOfType;
+ }
+
+ void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ }
+
+ @Override
+ public void handleCommand(Object message) {
+ if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+ super.handleCommand(message);
+ }
+
+ if(collectorActor != null) {
+ collectorActor.tell(message, getSender());
+ }
+ }
+ }
+
+ public static class MockLeaderRaftActor extends AbstractMockRaftActor {
public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
RaftActorContext fromContext) {
- super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
+ super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
+ setPersistence(false);
RaftActorContext context = getRaftActorContext();
for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
}
}
- public static class MockNewFollowerRaftActor extends MockRaftActor {
- private final TestActorRef<MessageCollectorActor> collectorActor;
- private volatile Class<?> dropMessageOfType;
-
+ public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
- super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
- this.collectorActor = collectorActor;
- }
-
- void setDropMessageOfType(Class<?> dropMessageOfType) {
- this.dropMessageOfType = dropMessageOfType;
- }
-
- @Override
- public void handleCommand(Object message) {
- if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
- super.handleCommand(message);
- }
-
- collectorActor.tell(message, getSender());
+ super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
}
static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {