Add raftActor as a field to
RaftActorServerConfigurationSupport to avoid passing
raftActor around through all the method calls.
Change-Id: I19eb16877af98e9e05ec698c321081e211a7e572
Signed-off-by: Gary Wu <gary.wu1@huawei.com>
super.preStart();
snapshotSupport = newRaftActorSnapshotMessageSupport();
super.preStart();
snapshotSupport = newRaftActorSnapshotMessageSupport();
- serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext());
+ serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
@Override
public void handleCommand(final Object message) {
@Override
public void handleCommand(final Object message) {
- if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+ if(serverConfigurationSupport.handleMessage(message, getSender())) {
return;
} else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
return;
} else if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
private final OperationState IDLE = new Idle();
private final OperationState IDLE = new Idle();
+ private final RaftActor raftActor;
+
private final RaftActorContext raftContext;
private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
private OperationState currentOperationState = IDLE;
private final RaftActorContext raftContext;
private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
private OperationState currentOperationState = IDLE;
- RaftActorServerConfigurationSupport(RaftActorContext context) {
- this.raftContext = context;
+ RaftActorServerConfigurationSupport(RaftActor raftActor) {
+ this.raftActor = raftActor;
+ this.raftContext = raftActor.getRaftActorContext();
- boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
+ boolean handleMessage(Object message, ActorRef sender) {
if(message instanceof AddServer) {
if(message instanceof AddServer) {
- onAddServer((AddServer) message, raftActor, sender);
+ onAddServer((AddServer) message, sender);
return true;
} else if(message instanceof RemoveServer) {
return true;
} else if(message instanceof RemoveServer) {
- onRemoveServer((RemoveServer) message, raftActor, sender);
+ onRemoveServer((RemoveServer) message, sender);
return true;
} else if (message instanceof ServerOperationTimeout) {
return true;
} else if (message instanceof ServerOperationTimeout) {
- currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message);
+ currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
return true;
} else if (message instanceof UnInitializedFollowerSnapshotReply) {
return true;
} else if (message instanceof UnInitializedFollowerSnapshotReply) {
- currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
- (UnInitializedFollowerSnapshotReply) message);
+ currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
return true;
} else if(message instanceof ApplyState) {
return true;
} else if(message instanceof ApplyState) {
- return onApplyState((ApplyState) message, raftActor);
+ return onApplyState((ApplyState) message);
} else if(message instanceof SnapshotComplete) {
} else if(message instanceof SnapshotComplete) {
- currentOperationState.onSnapshotComplete(raftActor);
+ currentOperationState.onSnapshotComplete();
return false;
} else {
return false;
}
}
return false;
} else {
return false;
}
}
- private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
+ private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
if(removeServer.getServerId().equals(raftActor.getLeaderId())){
// Removing current leader is not supported yet
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
if(removeServer.getServerId().equals(raftActor.getLeaderId())){
// Removing current leader is not supported yet
} else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
} else {
} else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
} else {
- onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
+ onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
- private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+ private boolean onApplyState(ApplyState applyState) {
Payload data = applyState.getReplicatedLogEntry().getData();
if(data instanceof ServerConfigurationPayload) {
Payload data = applyState.getReplicatedLogEntry().getData();
if(data instanceof ServerConfigurationPayload) {
- currentOperationState.onApplyState(raftActor, applyState);
+ currentOperationState.onApplyState(applyState);
* <li>Respond to caller with TIMEOUT.</li>
* </ul>
*/
* <li>Respond to caller with TIMEOUT.</li>
* </ul>
*/
- private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
+ private void onAddServer(AddServer addServer, ActorRef sender) {
LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
- onNewOperation(raftActor, new AddServerContext(addServer, sender));
+ onNewOperation(new AddServerContext(addServer, sender));
- private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ private void onNewOperation(ServerOperationContext<?> operationContext) {
if (raftActor.isLeader()) {
if (raftActor.isLeader()) {
- currentOperationState.onNewOperation(raftActor, operationContext);
+ currentOperationState.onNewOperation(operationContext);
} else {
ActorSelection leader = raftActor.getLeader();
if (leader != null) {
} else {
ActorSelection leader = raftActor.getLeader();
if (leader != null) {
* Interface for a server operation FSM state.
*/
private interface OperationState {
* Interface for a server operation FSM state.
*/
private interface OperationState {
- void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
+ void onNewOperation(ServerOperationContext<?> operationContext);
- void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout);
+ void onServerOperationTimeout(ServerOperationTimeout timeout);
- void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
+ void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
- void onApplyState(RaftActor raftActor, ApplyState applyState);
+ void onApplyState(ApplyState applyState);
- void onSnapshotComplete(RaftActor raftActor);
+ void onSnapshotComplete();
}
/**
* Interface for the initial state for a server operation.
*/
private interface InitialOperationState {
}
/**
* Interface for the initial state for a server operation.
*/
private interface InitialOperationState {
- void initiate(RaftActor raftActor);
*/
private abstract class AbstractOperationState implements OperationState {
@Override
*/
private abstract class AbstractOperationState implements OperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
// We're currently processing another operation so queue it to be processed later.
LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
// We're currently processing another operation so queue it to be processed later.
LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
LOG.debug("onServerOperationTimeout should not be called in state {}", this);
}
@Override
LOG.debug("onServerOperationTimeout should not be called in state {}", this);
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
}
@Override
LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
}
@Override
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
LOG.debug("onApplyState was called in state {}", this);
}
@Override
LOG.debug("onApplyState was called in state {}", this);
}
@Override
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
- protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
+ protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
raftContext.setDynamicServerConfigurationInUse();
ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
raftContext.setDynamicServerConfigurationInUse();
ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
- currentOperationState = new Persisting(operationContext, newTimer(
- new ServerOperationTimeout(operationContext.getServerId())));
+ currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
- sendReply(raftActor, operationContext, ServerChangeStatus.OK);
+ sendReply(operationContext, ServerChangeStatus.OK);
- protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
- @Nullable ServerChangeStatus replyStatus) {
+ protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
if(replyStatus != null) {
if(replyStatus != null) {
- sendReply(raftActor, operationContext, replyStatus);
+ sendReply(operationContext, replyStatus);
}
operationContext.operationComplete(raftActor, replyStatus);
}
operationContext.operationComplete(raftActor, replyStatus);
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
if(nextOperation != null) {
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
if(nextOperation != null) {
- RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
+ RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
- protected void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
- ServerChangeStatus status) {
+ protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
Cancellable newTimer(Object message) {
return raftContext.getActorSystem().scheduler().scheduleOnce(
Cancellable newTimer(Object message) {
return raftContext.getActorSystem().scheduler().scheduleOnce(
- raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(),
- message, raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+ raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
+ raftContext.getActorSystem().dispatcher(), raftContext.getActor());
*/
private class Idle extends AbstractOperationState {
@Override
*/
private class Idle extends AbstractOperationState {
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
- operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
+ operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Noop - we override b/c ApplyState is called normally for followers in the idle state.
}
}
// Noop - we override b/c ApplyState is called normally for followers in the idle state.
}
}
- public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+ public void onApplyState(ApplyState applyState) {
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
applyState.getReplicatedLogEntry().getData());
timer.cancel();
applyState.getReplicatedLogEntry().getData());
timer.cancel();
- operationComplete(raftActor, operationContext, null);
+ operationComplete(operationContext, null);
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
timeout.getServerId());
LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
timeout.getServerId());
// Fail any pending operations
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
while(nextOperation != null) {
// Fail any pending operations
ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
while(nextOperation != null) {
- sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
nextOperation = pendingOperationsQueue.poll();
}
}
@Override
nextOperation = pendingOperationsQueue.poll();
}
}
@Override
- public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+ public void onNewOperation(ServerOperationContext<?> operationContext) {
- sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
+ sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
- super.onNewOperation(raftActor, operationContext);
+ super.onNewOperation(operationContext);
return addServerContext;
}
return addServerContext;
}
- Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
+ Cancellable newInstallSnapshotTimer() {
return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
}
return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
}
- void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
+ void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
String serverId = timeout.getServerId();
LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
String serverId = timeout.getServerId();
LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
leader.removeFollower(serverId);
}
leader.removeFollower(serverId);
}
- operationComplete(raftActor, getAddServerContext(),
- isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+ operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
- public void initiate(RaftActor raftActor) {
+ public void initiate() {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
AddServer addServer = getAddServerContext().getOperation();
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
AddServer addServer = getAddServerContext().getOperation();
LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
- operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
+ operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
if(votingState == VotingState.VOTING_NOT_INITIALIZED){
// schedule the install snapshot timeout timer
if(votingState == VotingState.VOTING_NOT_INITIALIZED){
// schedule the install snapshot timeout timer
- Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
+ Cancellable installSnapshotTimer = newInstallSnapshotTimer();
if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
addServer.getNewServerId());
if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
addServer.getNewServerId());
LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
raftContext.getId());
LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
raftContext.getId());
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
- handleInstallSnapshotTimeout(raftActor, timeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
timeout.getServerId());
}
@Override
LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
timeout.getServerId());
}
@Override
- public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+ public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
String followerId = reply.getFollowerId();
LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
String followerId = reply.getFollowerId();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
leader.updateMinReplicaCount();
- persistNewServerConfiguration(raftActor, getAddServerContext());
+ persistNewServerConfiguration(getAddServerContext());
installSnapshotTimer.cancel();
} else {
installSnapshotTimer.cancel();
} else {
- public void onSnapshotComplete(RaftActor raftActor) {
+ public void onSnapshotComplete() {
LOG.debug("{}: onSnapshotComplete", raftContext.getId());
if(!raftActor.isLeader()) {
LOG.debug("{}: onSnapshotComplete", raftContext.getId());
if(!raftActor.isLeader()) {
getAddServerContext().getOperation().getNewServerId());
currentOperationState = new InstallingSnapshot(getAddServerContext(),
getAddServerContext().getOperation().getNewServerId());
currentOperationState = new InstallingSnapshot(getAddServerContext(),
- newInstallSnapshotTimer(raftActor));
+ newInstallSnapshotTimer());
snapshotTimer.cancel();
}
}
@Override
snapshotTimer.cancel();
}
}
@Override
- public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
- handleInstallSnapshotTimeout(raftActor, timeout);
+ public void onServerOperationTimeout(ServerOperationTimeout timeout) {
+ handleInstallSnapshotTimeout(timeout);
LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
raftContext.getId(), timeout.getServerId());
LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
raftContext.getId(), timeout.getServerId());
- public void initiate(RaftActor raftActor) {
+ public void initiate() {
raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
- persistNewServerConfiguration(raftActor, getRemoveServerContext());
+ persistNewServerConfiguration(getRemoveServerContext());
@Test
public void testOnApplyState() {
@Test
public void testOnApplyState() {
- RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(new MockRaftActorContext());
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
+ MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+ configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
ReplicatedLogEntry serverConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
new ServerConfigurationPayload(Collections.<ServerInfo>emptyList()));
- boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), null, ActorRef.noSender());
+ boolean handled = support.handleMessage(new ApplyState(null, null, serverConfigEntry), ActorRef.noSender());
assertEquals("Message handled", true, handled);
ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
new MockRaftActorContext.MockPayload("1"));
assertEquals("Message handled", true, handled);
ReplicatedLogEntry nonServerConfigEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
new MockRaftActorContext.MockPayload("1"));
- handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), null, ActorRef.noSender());
+ handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
assertEquals("Message handled", false, handled);
}
assertEquals("Message handled", false, handled);
}