import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
} else if(message instanceof WrappedShardResponse){
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
+ } else if(message instanceof ChangeShardMembersVotingStatus){
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if(message instanceof FlipShardMembersVotingStatus){
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
} else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if(message instanceof SaveSnapshotFailure) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
+ } else if(message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
} else {
unknownMessage(message);
}
}
}
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
- }
- }
-
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if(isShardReplicaOperationInProgress(shardName, sender)) {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
}
@Override
shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
+ }
+ }, getTargetActor());
}
});
}
0, 0));
}
+ private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+ String shardName = changeMembersVotingStatus.getShardName();
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+ e.getValue());
+ }
+
+ ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+ findLocalShard(shardName, getSender(),
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
+ }
+
+ private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+ ActorRef sender = getSender();
+ final String shardName = flipMembersVotingStatus.getShardName();
+ findLocalShard(shardName, sender, localShardFound -> {
+ Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+ Timeout.apply(30, TimeUnit.SECONDS));
+
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to access local shard %s", shardName), failure)), self());
+ return;
+ }
+
+ OnDemandRaftState raftState = (OnDemandRaftState) response;
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ serverVotingStatusMap.put(e.getKey(), !e.getValue());
+ }
+
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+ toString(), !raftState.isVoting());
+
+ changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+ shardName, localShardFound.getPath(), sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
+
+ }
+
+ private void findLocalShard(final String shardName, final ActorRef sender,
+ final Consumer<LocalShardFound> onLocalShardFound) {
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } else {
+ if(response instanceof LocalShardFound) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ onLocalShardFound.accept((LocalShardFound) response);
+ }
+ }, sender);
+ } else if(response instanceof LocalShardNotFound) {
+ String msg = String.format("Local shard %s does not exist", shardName);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ } else {
+ String msg = String.format("Failed to find local shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), self());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+ final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+ changeServersVotingStatus, shardActorRef.path());
+
+ Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+ Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ if (failure != null) {
+ String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path());
+ LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+ ServerChangeReply replyMsg = (ServerChangeReply) response;
+ if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ sender.tell(new Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member must be voting",
+ shardId.getShardName()))), getSelf());
+ } else {
+ LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), getSelf());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private static final class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private static interface RunnableMessage extends Runnable {
+ }
+
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
* a remote or local find primary message is processed
}
}
- /**
- * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
- * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
- * as a successful response to find primary.
- */
- private static class PrimaryShardFoundForContext {
- private final String shardName;
- private final Object contextMessage;
- private final RemotePrimaryShardFound remotePrimaryShardFound;
- private final LocalPrimaryShardFound localPrimaryShardFound;
-
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
- @Nonnull Object primaryFoundMessage) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.contextMessage = Preconditions.checkNotNull(contextMessage);
- Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
- (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
- (LocalPrimaryShardFound) primaryFoundMessage : null;
- }
-
- @Nonnull
- String getPrimaryPath(){
- if(remotePrimaryShardFound != null) {
- return remotePrimaryShardFound.getPrimaryPath();
- }
- return localPrimaryShardFound.getPrimaryPath();
- }
-
- @Nonnull
- Object getContextMessage() {
- return contextMessage;
- }
-
- @Nullable
- RemotePrimaryShardFound getRemotePrimaryShardFound() {
- return remotePrimaryShardFound;
- }
-
- @Nonnull
- String getShardName() {
- return shardName;
- }
- }
-
/**
* The WrappedShardResponse class wraps a response from a Shard.
*/