X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=8386e669b83f46cb102771f7c12f1b9458f5e485;hp=e314e1c894242d802bb242d034de86a403de52b0;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hpb=768a0cd726150bdf29ad92b12c62ac5652433ef8 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index e314e1c894..8386e669b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -14,10 +14,12 @@ import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; -import akka.actor.Props; import akka.actor.Status; import akka.actor.SupervisorStrategy; +import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberWeaklyUp; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; @@ -27,15 +29,9 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.base.Supplier; -import com.google.common.collect.Sets; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.ArrayList; @@ -43,16 +39,17 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.commons.lang3.SerializationUtils; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -67,16 +64,15 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PeerDown; -import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; @@ -86,19 +82,21 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,7 +156,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDdatastoreContextFactory(); + this.datastoreContextFactory = builder.getDatastoreContextFactory(); this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); @@ -174,12 +172,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - List localShardActorNames = new ArrayList<>(); - mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(), - "shard-manager-" + this.type, - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), - localShardActorNames); - mBean.setShardManager(this); + mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type, + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); + mBean.registerMBean(); } @Override @@ -201,6 +196,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberWeaklyUp){ + memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); } else if (message instanceof ClusterEvent.MemberExited){ memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -232,9 +229,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof PrimaryShardFoundForContext) { - PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; - onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); } else if(message instanceof WrappedShardResponse){ @@ -243,6 +237,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); + } else if(message instanceof ChangeShardMembersVotingStatus){ + onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); + } else if(message instanceof FlipShardMembersVotingStatus){ + onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if(message instanceof SaveSnapshotFailure) { @@ -250,6 +248,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { persistenceId(), ((SaveSnapshotFailure) message).cause()); } else if(message instanceof Shutdown) { onShutDown(); + } else if (message instanceof GetLocalShardIds) { + onGetLocalShardIds(); + } else if(message instanceof RunnableMessage) { + ((RunnableMessage)message).run(); } else { unknownMessage(message); } @@ -305,31 +307,20 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { - shardReplicaOperationsInProgress.remove(shardId); + shardReplicaOperationsInProgress.remove(shardId.getShardName()); LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); if (replyMsg.getStatus() == ServerChangeStatus.OK) { LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), shardId.getShardName()); - originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + originalSender.tell(new Status.Success(null), getSelf()); } else { LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", persistenceId(), shardId, replyMsg.getStatus()); - Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), - leaderPath, shardId); - originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); - } - } - - private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { - if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), - getSender()); - } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ - removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), - primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId); + originalSender.tell(new Status.Failure(failure), getSelf()); } } @@ -358,6 +349,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); @@ -402,7 +394,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } if(notInitialized != null) { - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format( + getSender().tell(new Status.Failure(new IllegalStateException(String.format( "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf()); return; } @@ -429,14 +421,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = createShard.getModuleShardConfig().getShardName(); if(localShards.containsKey(shardName)) { LOG.debug("{}: Shard {} already exists", persistenceId(), shardName); - reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); + reply = new Status.Success(String.format("Shard with name %s already exists", shardName)); } else { doCreateShard(createShard); - reply = new akka.actor.Status.Success(null); + reply = new Status.Success(null); } } catch (Exception e) { LOG.error("{}: onCreateShard failed", persistenceId(), e); - reply = new akka.actor.Status.Failure(e); + reply = new Status.Failure(e); } if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { @@ -489,8 +481,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); - mBean.addLocalShard(shardId.toString()); - if(schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } @@ -541,10 +531,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(!shardInfo.isShardInitialized()) { LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); - message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); + message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf()); } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); - message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf()); } } @@ -614,9 +604,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId - ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build(); - if (shardId.getShardName() == null) { + final ShardIdentifier shardId; + try { + shardId = ShardIdentifier.fromShardIdString(actorName); + } catch (IllegalArgumentException e) { + LOG.debug("{}: ignoring actor {}", actorName, e); return; } @@ -675,12 +668,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { - @Override - public Object get() { - return new LocalShardFound(shardInformation.getActor()); - } - }); + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor())); } private void sendResponse(ShardInformation shardInformation, boolean doWait, @@ -690,12 +678,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); final ActorRef self = self(); - Runnable replyRunnable = new Runnable() { - @Override - public void run() { - sender.tell(messageSupplier.get(), self); - } - }; + Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self); OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : new OnShardInitialized(replyRunnable); @@ -723,11 +706,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if (!shardInformation.isShardInitialized()) { LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInformation.getShardName()); - getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); + getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf()); } else { LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInformation.getShardName()); - getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); + getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf()); } return; @@ -745,10 +728,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); } + @VisibleForTesting + static MemberName memberToName(final Member member) { + return MemberName.forName(member.roles().iterator().next()); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -759,9 +747,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberExited(ClusterEvent.MemberExited message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); peerAddressResolver.removePeerAddress(memberName); @@ -772,17 +760,29 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUp(ClusterEvent.MemberUp message) { - String memberName = message.member().roles().iterator().next(); + MemberName memberName = memberToName(message.member()); - LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); - addPeerAddress(memberName, message.member().address()); + memberUp(memberName, message.member().address()); + } + private void memberUp(MemberName memberName, Address address) { + addPeerAddress(memberName, address); checkReady(); } - private void addPeerAddress(String memberName, Address address) { + private void memberWeaklyUp(MemberWeaklyUp message) { + MemberName memberName = memberToName(message.member()); + + LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberUp(memberName, message.member().address()); + } + + private void addPeerAddress(MemberName memberName, Address address) { peerAddressResolver.addPeerAddress(memberName, address); for(ShardInformation info : localShards.values()){ @@ -795,8 +795,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberReachable(ClusterEvent.ReachableMember message) { - String memberName = message.member().roles().iterator().next(); - LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + MemberName memberName = memberToName(message.member()); + LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -804,16 +804,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUnreachable(ClusterEvent.UnreachableMember message) { - String memberName = message.member().roles().iterator().next(); - LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + MemberName memberName = memberToName(message.member()); + LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); } - private void markMemberUnavailable(final String memberName) { - for(ShardInformation info : localShards.values()){ + private void markMemberUnavailable(final MemberName memberName) { + final String memberStr = memberName.getName(); + for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - if(leaderId != null && leaderId.contains(memberName)) { + // XXX: why are we using String#contains() here? + if (leaderId != null && leaderId.contains(memberStr)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -824,10 +826,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void markMemberAvailable(final String memberName) { - for(ShardInformation info : localShards.values()){ + private void markMemberAvailable(final MemberName memberName) { + final String memberStr = memberName.getName(); + for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - if(leaderId != null && leaderId.contains(memberName)) { + // XXX: why are we using String#contains() here? + if (leaderId != null && leaderId.contains(memberStr)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } @@ -843,17 +847,44 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onSwitchShardBehavior(SwitchShardBehavior message) { - ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + private void onGetLocalShardIds() { + final List response = new ArrayList<>(localShards.size()); + + for (ShardInformation info : localShards.values()) { + response.add(info.getShardId().toString()); + } + + getSender().tell(new Status.Success(response), getSelf()); + } + + private void onSwitchShardBehavior(final SwitchShardBehavior message) { + final ShardIdentifier identifier = message.getShardId(); - ShardInformation shardInformation = localShards.get(identifier.getShardName()); + if (identifier != null) { + final ShardInformation info = localShards.get(identifier.getShardName()); + if (info == null) { + getSender().tell(new Status.Failure( + new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf()); + return; + } - if(shardInformation != null && shardInformation.getActor() != null) { - shardInformation.getActor().tell( - new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf()); + switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm())); } else { + for (ShardInformation info : localShards.values()) { + switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm())); + } + } + + getSender().tell(new Status.Success(null), getSelf()); + } + + private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) { + final ActorRef actor = info.getActor(); + if (actor != null) { + actor.tell(switchBehavior, getSelf()); + } else { LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", - message.getShardName(), message.getNewState()); + info.getShardName(), switchBehavior.getNewState()); } } @@ -897,20 +928,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null && info.isActiveMember()) { - sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { - @Override - public Object get() { - String primaryPath = info.getSerializedLeaderActor(); - Object found = canReturnLocalShardState && info.isLeader() ? - new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : - new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); - } + sendResponse(info, message.isWaitUntilReady(), true, () -> { + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } - return found; - } + return found; }); return; @@ -952,7 +980,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private ShardIdentifier getShardIdentifier(String memberName, String shardName){ + private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){ return peerAddressResolver.getShardIdentifier(memberName, shardName); } @@ -962,7 +990,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * */ private void createLocalShards() { - String memberName = this.cluster.getCurrentMemberName(); + MemberName memberName = this.cluster.getCurrentMemberName(); Collection memberShardNames = this.configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); @@ -984,7 +1012,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( shardSnapshots.get(shardName)), peerAddressResolver)); - mBean.addLocalShard(shardId.toString()); } } @@ -994,13 +1021,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName */ private Map getPeerAddresses(String shardName) { - Collection members = configuration.getMembersFromShardName(shardName); + Collection members = configuration.getMembersFromShardName(shardName); Map peerAddresses = new HashMap<>(); - String currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members) { - if(!currentMemberName.equals(memberName)) { + for (MemberName memberName : members) { + if (!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); String address = peerAddressResolver.getShardActorAddress(shardName, memberName); peerAddresses.put(shardId.toString(), address); @@ -1013,13 +1040,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), - new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); - return SupervisorStrategy.resume(); - } - } + (Function) t -> { + LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); + return SupervisorStrategy.resume(); + } ); } @@ -1038,7 +1062,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardReplicaOperationsInProgress.contains(shardName)) { String msg = String.format("A shard replica operation for %s is already in progress", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); return true; } @@ -1054,7 +1078,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (!(this.configuration.isShardConfigured(shardName))) { String msg = String.format("No module configuration exists for shard %s", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf()); return; } @@ -1063,14 +1087,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String msg = String.format( "No SchemaContext is available in order to create a local shard instance for %s", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); return; } findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor()); } @Override @@ -1084,7 +1108,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { String msg = String.format("Local shard %s already exists", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf()); + sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); } private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { @@ -1154,7 +1178,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - sender.tell(new akka.actor.Status.Failure(message == null ? failure : + sender.tell(new Status.Failure(message == null ? failure : new RuntimeException(message, failure)), getSelf()); } @@ -1173,8 +1197,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo.setActiveMember(true); persistShardList(); - mBean.addLocalShard(shardInfo.getShardId().toString()); - sender.tell(new akka.actor.Status.Success(null), getSelf()); + sender.tell(new Status.Success(null), getSelf()); } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) { sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { @@ -1219,12 +1242,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor()); } }); } @@ -1250,7 +1277,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); - String currentMember = cluster.getCurrentMemberName(); + final MemberName currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); for (String shard : currentSnapshot.getShardList()) { @@ -1276,268 +1303,171 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } - private static class ForwardedAddServerReply { - ShardInformation shardInfo; - AddServerReply addServerReply; - String leaderPath; - boolean removeShardOnFailure; + private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) { + LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus); - ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, - boolean removeShardOnFailure) { - this.shardInfo = shardInfo; - this.addServerReply = addServerReply; - this.leaderPath = leaderPath; - this.removeShardOnFailure = removeShardOnFailure; + String shardName = changeMembersVotingStatus.getShardName(); + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) { + serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(), + e.getValue()); } - } - private static class ForwardedAddServerFailure { - String shardName; - String failureMessage; - Throwable failure; - boolean removeShardOnFailure; + ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap); - ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, - boolean removeShardOnFailure) { - this.shardName = shardName; - this.failureMessage = failureMessage; - this.failure = failure; - this.removeShardOnFailure = removeShardOnFailure; - } + findLocalShard(shardName, getSender(), + localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName, + localShardFound.getPath(), getSender())); } - @VisibleForTesting - protected static class ShardInformation { - private final ShardIdentifier shardId; - private final String shardName; - private ActorRef actor; - private final Map initialPeerAddresses; - private Optional localShardDataTree; - private boolean leaderAvailable = false; - - // flag that determines if the actor is ready for business - private boolean actorInitialized = false; - - private boolean followerSyncStatus = false; - - private final Set onShardInitializedSet = Sets.newHashSet(); - private String role ; - private String leaderId; - private short leaderVersion; - - private DatastoreContext datastoreContext; - private Shard.AbstractBuilder builder; - private final ShardPeerAddressResolver addressResolver; - private boolean isActiveMember = true; + private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); - private ShardInformation(String shardName, ShardIdentifier shardId, - Map initialPeerAddresses, DatastoreContext datastoreContext, - Shard.AbstractBuilder builder, ShardPeerAddressResolver addressResolver) { - this.shardName = shardName; - this.shardId = shardId; - this.initialPeerAddresses = initialPeerAddresses; - this.datastoreContext = datastoreContext; - this.builder = builder; - this.addressResolver = addressResolver; - } - - Props newProps(SchemaContext schemaContext) { - Preconditions.checkNotNull(builder); - Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). - schemaContext(schemaContext).props(); - builder = null; - return props; - } - - String getShardName() { - return shardName; - } + ActorRef sender = getSender(); + final String shardName = flipMembersVotingStatus.getShardName(); + findLocalShard(shardName, sender, localShardFound -> { + Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Timeout.apply(30, TimeUnit.SECONDS)); - @Nullable - ActorRef getActor(){ - return actor; - } - - void setActor(ActorRef actor) { - this.actor = actor; - } - - ShardIdentifier getShardId() { - return shardId; - } - - void setLocalDataTree(Optional localShardDataTree) { - this.localShardDataTree = localShardDataTree; - } - - Optional getLocalShardDataTree() { - return localShardDataTree; - } - - DatastoreContext getDatastoreContext() { - return datastoreContext; - } + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to access local shard %s", shardName), failure)), self()); + return; + } - void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { - this.datastoreContext = datastoreContext; - if (actor != null) { - LOG.debug ("Sending new DatastoreContext to {}", shardId); - actor.tell(this.datastoreContext, sender); - } - } + OnDemandRaftState raftState = (OnDemandRaftState) response; + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + serverVotingStatusMap.put(e.getKey(), !e.getValue()); + } - void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ - LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName). + toString(), !raftState.isVoting()); - if(actor != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); + changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap), + shardName, localShardFound.getPath(), sender); } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); - actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); - } - - notifyOnShardInitializedCallbacks(); - } - - void peerDown(String memberName, String peerId, ActorRef sender) { - if(actor != null) { - actor.tell(new PeerDown(memberName, peerId), sender); - } - } - - void peerUp(String memberName, String peerId, ActorRef sender) { - if(actor != null) { - actor.tell(new PeerUp(memberName, peerId), sender); - } - } - - boolean isShardReady() { - return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); - } - - boolean isShardReadyWithLeaderId() { - return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && - (isLeader() || addressResolver.resolve(leaderId) != null); - } - - boolean isShardInitialized() { - return getActor() != null && actorInitialized; - } - - boolean isLeader() { - return Objects.equal(leaderId, shardId.toString()); - } - - String getSerializedLeaderActor() { - if(isLeader()) { - return Serialization.serializedActorPath(getActor()); - } else { - return addressResolver.resolve(leaderId); - } - } - - void setActorInitialized() { - LOG.debug("Shard {} is initialized", shardId); - - this.actorInitialized = true; - - notifyOnShardInitializedCallbacks(); - } - - private void notifyOnShardInitializedCallbacks() { - if(onShardInitializedSet.isEmpty()) { - return; - } - - boolean ready = isShardReadyWithLeaderId(); + } - if(LOG.isDebugEnabled()) { - LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, - ready ? "ready" : "initialized", onShardInitializedSet.size()); - } + private void findLocalShard(final String shardName, final ActorRef sender, + final Consumer onLocalShardFound) { + Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); - Iterator iter = onShardInitializedSet.iterator(); - while(iter.hasNext()) { - OnShardInitialized onShardInitialized = iter.next(); - if(!(onShardInitialized instanceof OnShardReady) || ready) { - iter.remove(); - onShardInitialized.getTimeoutSchedule().cancel(); - onShardInitialized.getReplyRunnable().run(); + Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to find local shard %s", shardName), failure)), self()); + } else { + if(response instanceof LocalShardFound) { + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); + } else if(response instanceof LocalShardNotFound) { + String msg = String.format("Local shard %s does not exist", shardName); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + } else { + String msg = String.format("Failed to find local shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), self()); + } } } - } - - void addOnShardInitialized(OnShardInitialized onShardInitialized) { - onShardInitializedSet.add(onShardInitialized); - } - - void removeOnShardInitialized(OnShardInitialized onShardInitialized) { - onShardInitializedSet.remove(onShardInitialized); - } - - void setRole(String newRole) { - this.role = newRole; - - notifyOnShardInitializedCallbacks(); - } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } - void setFollowerSyncStatus(boolean syncStatus){ - this.followerSyncStatus = syncStatus; + private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + final String shardName, final ActorRef shardActorRef, final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; } - boolean isInSync(){ - if(RaftState.Follower.name().equals(this.role)){ - return followerSyncStatus; - } else if(RaftState.Leader.name().equals(this.role)){ - return true; - } - - return false; - } + shardReplicaOperationsInProgress.add(shardName); - boolean setLeaderId(String leaderId) { - boolean changed = !Objects.equal(this.leaderId, leaderId); - this.leaderId = leaderId; - if(leaderId != null) { - this.leaderAvailable = true; - } - notifyOnShardInitializedCallbacks(); + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - return changed; - } + LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(), + changeServersVotingStatus, shardActorRef.path()); - String getLeaderId() { - return leaderId; - } + Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); + Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); - void setLeaderAvailable(boolean leaderAvailable) { - this.leaderAvailable = leaderAvailable; + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + shardReplicaOperationsInProgress.remove(shardName); + if (failure != null) { + String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()); + LOG.debug ("{}: {}", persistenceId(), msg, failure); + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); + + ServerChangeReply replyMsg = (ServerChangeReply) response; + if(replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName); + sender.tell(new Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) { + sender.tell(new Status.Failure(new IllegalArgumentException(String.format( + "The requested voting state change for shard %s is invalid. At least one member must be voting", + shardId.getShardName()))), getSelf()); + } else { + LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}", + persistenceId(), shardName, replyMsg.getStatus()); - if(leaderAvailable) { - notifyOnShardInitializedCallbacks(); + Exception error = getServerChangeException(ChangeServersVotingStatus.class, + replyMsg.getStatus(), shardActorRef.path().toString(), shardId); + sender.tell(new Status.Failure(error), getSelf()); + } + } } - } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } - short getLeaderVersion() { - return leaderVersion; - } + private static final class ForwardedAddServerReply { + ShardInformation shardInfo; + AddServerReply addServerReply; + String leaderPath; + boolean removeShardOnFailure; - void setLeaderVersion(short leaderVersion) { - this.leaderVersion = leaderVersion; + ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, + boolean removeShardOnFailure) { + this.shardInfo = shardInfo; + this.addServerReply = addServerReply; + this.leaderPath = leaderPath; + this.removeShardOnFailure = removeShardOnFailure; } + } - boolean isActiveMember() { - return isActiveMember; - } + private static final class ForwardedAddServerFailure { + String shardName; + String failureMessage; + Throwable failure; + boolean removeShardOnFailure; - void setActiveMember(boolean isActiveMember) { - this.isActiveMember = isActiveMember; + ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, + boolean removeShardOnFailure) { + this.shardName = shardName; + this.failureMessage = failureMessage; + this.failure = failure; + this.removeShardOnFailure = removeShardOnFailure; } } - private static class OnShardInitialized { + static class OnShardInitialized { private final Runnable replyRunnable; private Cancellable timeoutSchedule; @@ -1558,36 +1488,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private static class OnShardReady extends OnShardInitialized { + static class OnShardReady extends OnShardInitialized { OnShardReady(Runnable replyRunnable) { super(replyRunnable); } } - private static class ShardNotInitializedTimeout { - private final ActorRef sender; - private final ShardInformation shardInfo; - private final OnShardInitialized onShardInitialized; - - ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { - this.sender = sender; - this.shardInfo = shardInfo; - this.onShardInitialized = onShardInitialized; - } - - ActorRef getSender() { - return sender; - } - - ShardInformation getShardInfo() { - return shardInfo; - } - - OnShardInitialized getOnShardInitialized() { - return onShardInitialized; - } - } - private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) { Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). getShardInitializationTimeout().duration().$times(2)); @@ -1612,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private static interface RunnableMessage extends Runnable { + } + /** * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the * a remote or local find primary message is processed @@ -1679,7 +1588,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onFailure(Throwable failure) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); - targetActor.tell(new akka.actor.Status.Failure(new RuntimeException( + targetActor.tell(new Status.Failure(new RuntimeException( String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor); } @@ -1688,67 +1597,20 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String msg = String.format("Failed to find leader for shard %s: received response: %s", shardName, response); LOG.debug ("{}: {}", persistenceId, msg); - targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response : + targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : new RuntimeException(msg)), shardManagerActor); } } - - /** - * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be - * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received - * as a successful response to find primary. - */ - private static class PrimaryShardFoundForContext { - private final String shardName; - private final Object contextMessage; - private final RemotePrimaryShardFound remotePrimaryShardFound; - private final LocalPrimaryShardFound localPrimaryShardFound; - - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, - @Nonnull Object primaryFoundMessage) { - this.shardName = Preconditions.checkNotNull(shardName); - this.contextMessage = Preconditions.checkNotNull(contextMessage); - Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? - (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? - (LocalPrimaryShardFound) primaryFoundMessage : null; - } - - @Nonnull - String getPrimaryPath(){ - if(remotePrimaryShardFound != null) { - return remotePrimaryShardFound.getPrimaryPath(); - } - return localPrimaryShardFound.getPrimaryPath(); - } - - @Nonnull - Object getContextMessage() { - return contextMessage; - } - - @Nullable - RemotePrimaryShardFound getRemotePrimaryShardFound() { - return remotePrimaryShardFound; - } - - @Nonnull - String getShardName() { - return shardName; - } - } - /** * The WrappedShardResponse class wraps a response from a Shard. */ - private static class WrappedShardResponse { + private static final class WrappedShardResponse { private final ShardIdentifier shardId; private final Object response; private final String leaderPath; - private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { this.shardId = shardId; this.response = response; this.leaderPath = leaderPath; @@ -1766,6 +1628,30 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return leaderPath; } } + + private static final class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } }