X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=8386e669b83f46cb102771f7c12f1b9458f5e485;hp=fa8b7854dae2160245b282f097dab8b4329ecb93;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hpb=a5ede745694ca779d8629952e1a1069011877558 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index fa8b7854da..8386e669b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -16,7 +16,10 @@ import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Status; import akka.actor.SupervisorStrategy; +import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberWeaklyUp; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; @@ -38,14 +41,15 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationUtils; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -60,10 +64,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -78,12 +84,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -146,7 +156,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDdatastoreContextFactory(); + this.datastoreContextFactory = builder.getDatastoreContextFactory(); this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); @@ -186,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberWeaklyUp){ + memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); } else if (message instanceof ClusterEvent.MemberExited){ memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -217,9 +229,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof PrimaryShardFoundForContext) { - PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; - onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); } else if(message instanceof WrappedShardResponse){ @@ -228,6 +237,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); + } else if(message instanceof ChangeShardMembersVotingStatus){ + onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); + } else if(message instanceof FlipShardMembersVotingStatus){ + onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if(message instanceof SaveSnapshotFailure) { @@ -237,6 +250,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); + } else if(message instanceof RunnableMessage) { + ((RunnableMessage)message).run(); } else { unknownMessage(message); } @@ -292,7 +307,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { - shardReplicaOperationsInProgress.remove(shardId); + shardReplicaOperationsInProgress.remove(shardId.getShardName()); LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); @@ -309,16 +324,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { - if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), - getSender()); - } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ - removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), - primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); - } - } - private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if(isShardReplicaOperationInProgress(shardName, sender)) { @@ -344,6 +349,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); @@ -598,9 +604,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId - ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build(); - if (shardId.getShardName() == null) { + final ShardIdentifier shardId; + try { + shardId = ShardIdentifier.fromShardIdString(actorName); + } catch (IllegalArgumentException e) { + LOG.debug("{}: ignoring actor {}", actorName, e); return; } @@ -659,12 +668,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { - @Override - public Object get() { - return new LocalShardFound(shardInformation.getActor()); - } - }); + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor())); } private void sendResponse(ShardInformation shardInformation, boolean doWait, @@ -674,12 +678,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); final ActorRef self = self(); - Runnable replyRunnable = new Runnable() { - @Override - public void run() { - sender.tell(messageSupplier.get(), self); - } - }; + Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self); OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : new OnShardInitialized(replyRunnable); @@ -729,10 +728,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); } + @VisibleForTesting + static MemberName memberToName(final Member member) { + return MemberName.forName(member.roles().iterator().next()); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -743,9 +747,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberExited(ClusterEvent.MemberExited message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -756,17 +760,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUp(ClusterEvent.MemberUp message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - addPeerAddress(memberName, message.member().address()); + memberUp(memberName, message.member().address()); + } + private void memberUp(MemberName memberName, Address address) { + addPeerAddress(memberName, address); checkReady(); } - private void addPeerAddress(String memberName, Address address) { + private void memberWeaklyUp(MemberWeaklyUp message) { + MemberName memberName = memberToName(message.member()); + + LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberUp(memberName, message.member().address()); + } + + private void addPeerAddress(MemberName memberName, Address address) { peerAddressResolver.addPeerAddress(memberName, address); for(ShardInformation info : localShards.values()){ @@ -779,8 +795,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberReachable(ClusterEvent.ReachableMember message) { - String memberName = message.member().roles().iterator().next(); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + MemberName memberName = memberToName(message.member()); + LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -788,16 +804,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUnreachable(ClusterEvent.UnreachableMember message) { - String memberName = message.member().roles().iterator().next(); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + MemberName memberName = memberToName(message.member()); + LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); } - private void markMemberUnavailable(final String memberName) { - for(ShardInformation info : localShards.values()){ + private void markMemberUnavailable(final MemberName memberName) { + final String memberStr = memberName.getName(); + for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - if(leaderId != null && leaderId.contains(memberName)) { + // XXX: why are we using String#contains() here? + if (leaderId != null && leaderId.contains(memberStr)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -808,10 +826,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void markMemberAvailable(final String memberName) { - for(ShardInformation info : localShards.values()){ + private void markMemberAvailable(final MemberName memberName) { + final String memberStr = memberName.getName(); + for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - if(leaderId != null && leaderId.contains(memberName)) { + // XXX: why are we using String#contains() here? + if (leaderId != null && leaderId.contains(memberStr)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } @@ -908,20 +928,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null && info.isActiveMember()) { - sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { - @Override - public Object get() { - String primaryPath = info.getSerializedLeaderActor(); - Object found = canReturnLocalShardState && info.isLeader() ? - new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : - new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); - } + sendResponse(info, message.isWaitUntilReady(), true, () -> { + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } - return found; - } + return found; }); return; @@ -963,7 +980,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private ShardIdentifier getShardIdentifier(String memberName, String shardName){ + private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){ return peerAddressResolver.getShardIdentifier(memberName, shardName); } @@ -973,7 +990,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * */ private void createLocalShards() { - String memberName = this.cluster.getCurrentMemberName(); + MemberName memberName = this.cluster.getCurrentMemberName(); Collection memberShardNames = this.configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); @@ -1004,13 +1021,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName */ private Map getPeerAddresses(String shardName) { - Collection members = configuration.getMembersFromShardName(shardName); + Collection members = configuration.getMembersFromShardName(shardName); Map peerAddresses = new HashMap<>(); - String currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members) { - if(!currentMemberName.equals(memberName)) { + for (MemberName memberName : members) { + if (!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); String address = peerAddressResolver.getShardActorAddress(shardName, memberName); peerAddresses.put(shardId.toString(), address); @@ -1023,13 +1040,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); - return SupervisorStrategy.resume(); - } - } + (Function) t -> { + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); + return SupervisorStrategy.resume(); + } ); } @@ -1080,7 +1094,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor()); } @Override @@ -1228,12 +1242,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor()); } }); } @@ -1259,7 +1277,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); - String currentMember = cluster.getCurrentMemberName(); + final MemberName currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); for (String shard : currentSnapshot.getShardList()) { @@ -1285,6 +1303,140 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } + private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) { + LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus); + + String shardName = changeMembersVotingStatus.getShardName(); + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) { + serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(), + e.getValue()); + } + + ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap); + + findLocalShard(shardName, getSender(), + localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName, + localShardFound.getPath(), getSender())); + } + + private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); + + ActorRef sender = getSender(); + final String shardName = flipMembersVotingStatus.getShardName(); + findLocalShard(shardName, sender, localShardFound -> { + Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Timeout.apply(30, TimeUnit.SECONDS)); + + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to access local shard %s", shardName), failure)), self()); + return; + } + + OnDemandRaftState raftState = (OnDemandRaftState) response; + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + serverVotingStatusMap.put(e.getKey(), !e.getValue()); + } + + serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName). + toString(), !raftState.isVoting()); + + changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap), + shardName, localShardFound.getPath(), sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); + + } + + private void findLocalShard(final String shardName, final ActorRef sender, + final Consumer onLocalShardFound) { + Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); + + Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to find local shard %s", shardName), failure)), self()); + } else { + if(response instanceof LocalShardFound) { + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); + } else if(response instanceof LocalShardNotFound) { + String msg = String.format("Local shard %s does not exist", shardName); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + } else { + String msg = String.format("Failed to find local shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), self()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + final String shardName, final ActorRef shardActorRef, final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(), + changeServersVotingStatus, shardActorRef.path()); + + Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); + Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + shardReplicaOperationsInProgress.remove(shardName); + if (failure != null) { + String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()); + LOG.debug ("{}: {}", persistenceId(), msg, failure); + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); + + ServerChangeReply replyMsg = (ServerChangeReply) response; + if(replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName); + sender.tell(new Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) { + sender.tell(new Status.Failure(new IllegalArgumentException(String.format( + "The requested voting state change for shard %s is invalid. At least one member must be voting", + shardId.getShardName()))), getSelf()); + } else { + LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}", + persistenceId(), shardName, replyMsg.getStatus()); + + Exception error = getServerChangeException(ChangeServersVotingStatus.class, + replyMsg.getStatus(), shardActorRef.path().toString(), shardId); + sender.tell(new Status.Failure(error), getSelf()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private static final class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; @@ -1366,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private static interface RunnableMessage extends Runnable { + } + /** * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the * a remote or local find primary message is processed @@ -1447,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - /** - * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be - * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received - * as a successful response to find primary. - */ - private static class PrimaryShardFoundForContext { - private final String shardName; - private final Object contextMessage; - private final RemotePrimaryShardFound remotePrimaryShardFound; - private final LocalPrimaryShardFound localPrimaryShardFound; - - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, - @Nonnull Object primaryFoundMessage) { - this.shardName = Preconditions.checkNotNull(shardName); - this.contextMessage = Preconditions.checkNotNull(contextMessage); - Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? - (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? - (LocalPrimaryShardFound) primaryFoundMessage : null; - } - - @Nonnull - String getPrimaryPath(){ - if(remotePrimaryShardFound != null) { - return remotePrimaryShardFound.getPrimaryPath(); - } - return localPrimaryShardFound.getPrimaryPath(); - } - - @Nonnull - Object getContextMessage() { - return contextMessage; - } - - @Nullable - RemotePrimaryShardFound getRemotePrimaryShardFound() { - return remotePrimaryShardFound; - } - - @Nonnull - String getShardName() { - return shardName; - } - } - /** * The WrappedShardResponse class wraps a response from a Shard. */