X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=4156ab34265f29376ef71519a4dab2c283c1fce8;hb=refs%2Fchanges%2F63%2F39863%2F3;hp=04c64ddca751b7a82b5d71a26e650bc9a3ebc1f2;hpb=dc6370feeb5fc47be3e267bf85d6354013d0409b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 04c64ddca7..4156ab3426 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -40,13 +40,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; @@ -63,10 +63,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -81,12 +83,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -220,9 +226,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof PrimaryShardFoundForContext) { - PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; - onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); } else if(message instanceof WrappedShardResponse){ @@ -231,6 +234,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); + } else if(message instanceof ChangeShardMembersVotingStatus){ + onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); + } else if(message instanceof FlipShardMembersVotingStatus){ + onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if(message instanceof SaveSnapshotFailure) { @@ -240,6 +247,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); + } else if(message instanceof RunnableMessage) { + ((RunnableMessage)message).run(); } else { unknownMessage(message); } @@ -312,16 +321,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { - if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), - getSender()); - } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ - removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), - primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); - } - } - private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if(isShardReplicaOperationInProgress(shardName, sender)) { @@ -347,6 +346,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); @@ -1079,7 +1079,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + addShard(getShardName(), response, getSender()); + } + }, getTargetActor()); } @Override @@ -1227,12 +1232,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()); + } + }, getTargetActor()); } }); } @@ -1284,6 +1298,145 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } + private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) { + LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus); + + String shardName = changeMembersVotingStatus.getShardName(); + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) { + serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(), + e.getValue()); + } + + ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap); + + findLocalShard(shardName, getSender(), + localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName, + localShardFound.getPath(), getSender())); + } + + private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); + + ActorRef sender = getSender(); + final String shardName = flipMembersVotingStatus.getShardName(); + findLocalShard(shardName, sender, localShardFound -> { + Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Timeout.apply(30, TimeUnit.SECONDS)); + + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to access local shard %s", shardName), failure)), self()); + return; + } + + OnDemandRaftState raftState = (OnDemandRaftState) response; + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + serverVotingStatusMap.put(e.getKey(), !e.getValue()); + } + + serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName). + toString(), !raftState.isVoting()); + + changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap), + shardName, localShardFound.getPath(), sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); + + } + + private void findLocalShard(final String shardName, final ActorRef sender, + final Consumer onLocalShardFound) { + Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); + + Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to find local shard %s", shardName), failure)), self()); + } else { + if(response instanceof LocalShardFound) { + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + onLocalShardFound.accept((LocalShardFound) response); + } + }, sender); + } else if(response instanceof LocalShardNotFound) { + String msg = String.format("Local shard %s does not exist", shardName); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + } else { + String msg = String.format("Failed to find local shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), self()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + final String shardName, final ActorRef shardActorRef, final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(), + changeServersVotingStatus, shardActorRef.path()); + + Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); + Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + shardReplicaOperationsInProgress.remove(shardName); + if (failure != null) { + String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()); + LOG.debug ("{}: {}", persistenceId(), msg, failure); + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); + + ServerChangeReply replyMsg = (ServerChangeReply) response; + if(replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName); + sender.tell(new Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) { + sender.tell(new Status.Failure(new IllegalArgumentException(String.format( + "The requested voting state change for shard %s is invalid. At least one member must be voting", + shardId.getShardName()))), getSelf()); + } else { + LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}", + persistenceId(), shardName, replyMsg.getStatus()); + + Exception error = getServerChangeException(ChangeServersVotingStatus.class, + replyMsg.getStatus(), shardActorRef.path().toString(), shardId); + sender.tell(new Status.Failure(error), getSelf()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private static final class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; @@ -1365,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private static interface RunnableMessage extends Runnable { + } + /** * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the * a remote or local find primary message is processed @@ -1446,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - /** - * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be - * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received - * as a successful response to find primary. - */ - private static class PrimaryShardFoundForContext { - private final String shardName; - private final Object contextMessage; - private final RemotePrimaryShardFound remotePrimaryShardFound; - private final LocalPrimaryShardFound localPrimaryShardFound; - - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, - @Nonnull Object primaryFoundMessage) { - this.shardName = Preconditions.checkNotNull(shardName); - this.contextMessage = Preconditions.checkNotNull(contextMessage); - Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? - (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? - (LocalPrimaryShardFound) primaryFoundMessage : null; - } - - @Nonnull - String getPrimaryPath(){ - if(remotePrimaryShardFound != null) { - return remotePrimaryShardFound.getPrimaryPath(); - } - return localPrimaryShardFound.getPrimaryPath(); - } - - @Nonnull - Object getContextMessage() { - return contextMessage; - } - - @Nullable - RemotePrimaryShardFound getRemotePrimaryShardFound() { - return remotePrimaryShardFound; - } - - @Nonnull - String getShardName() { - return shardName; - } - } - /** * The WrappedShardResponse class wraps a response from a Shard. */