X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=33b3810447532741a30e2e0f7084888646346a6a;hp=fabfd096104dcab91ee6a4ffabd5758a50466e64;hb=7161b121b21aeea325fe33485c841af31f9f0cfd;hpb=733636ec5f1b4caecd130a6a26f6d196af6ff854 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index fabfd09610..33b3810447 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -9,21 +9,24 @@ package org.opendaylight.controller.cluster.datastore; import static akka.pattern.Patterns.ask; -import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; +import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -33,6 +36,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.Sets; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +53,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -84,8 +90,11 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -93,6 +102,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -139,6 +149,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreSnapshot restoreFromSnapshot; + private ShardManagerSnapshot currentSnapshot; + private final Set shardReplicaOperationsInProgress = new HashSet<>(); private final String persistenceId; @@ -150,7 +162,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.cluster = builder.cluster; this.configuration = builder.configuration; this.datastoreContextFactory = builder.datastoreContextFactory; - this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); + this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch; @@ -175,7 +187,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void postStop() { - LOG.info("Stopping ShardManager"); + LOG.info("Stopping ShardManager {}", persistenceId()); mBean.unregisterMBean(); } @@ -226,26 +238,142 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof PrimaryShardFoundForContext) { PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; onPrimaryShardFoundContext(primaryShardFoundContext); - } else if(message instanceof RemoveShardReplica){ - onRemoveShardReplica((RemoveShardReplica)message); + } else if(message instanceof RemoveShardReplica) { + onRemoveShardReplica((RemoveShardReplica) message); + } else if(message instanceof WrappedShardResponse){ + onWrappedShardResponse((WrappedShardResponse) message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); - } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{} saved ShardManager snapshot successfully", persistenceId()); - } else if (message instanceof SaveSnapshotFailure) { + } else if(message instanceof SaveSnapshotSuccess) { + onSaveSnapshotSuccess((SaveSnapshotSuccess)message); + } else if(message instanceof SaveSnapshotFailure) { LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), ((SaveSnapshotFailure) message).cause()); + } else if(message instanceof Shutdown) { + onShutDown(); } else { unknownMessage(message); } } + private void onShutDown() { + List> stopFutures = new ArrayList<>(localShards.size()); + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId()); + + FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2); + stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE)); + } + } + + LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size()); + + ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client); + Future> combinedFutures = Futures.sequence(stopFutures, dispatcher); + + combinedFutures.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable results) { + LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId()); + + self().tell(PoisonPill.getInstance(), self()); + + if(failure != null) { + LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure); + } else { + int nfailed = 0; + for(Boolean r: results) { + if(!r) { + nfailed++; + } + } + + if(nfailed > 0) { + LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed); + } + } + } + }, dispatcher); + } + + private void onWrappedShardResponse(WrappedShardResponse message) { + if (message.getResponse() instanceof RemoveServerReply) { + onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(), + message.getLeaderPath()); + } + } + + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, + String leaderPath) { + shardReplicaOperationsInProgress.remove(shardId); + + LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); + + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(), + shardId.getShardName()); + originalSender.tell(new akka.actor.Status.Success(null), getSelf()); + } else { + LOG.warn ("{}: Leader failed to remove shard replica {} with status {}", + persistenceId(), shardId, replyMsg.getStatus()); + + Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), + leaderPath, shardId); + originalSender.tell(new akka.actor.Status.Failure(failure), getSelf()); + } + } + private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender()); + addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), + getSender()); + } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ + removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), + primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); + } + } + + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, + final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout(). + duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug ("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } private void onShardReplicaRemoved(ServerRemoved message) { @@ -255,8 +383,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; } else if(shardInformation.getActor() != null) { - LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(PoisonPill.getInstance(), self()); + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); + shardInformation.getActor().tell(Shutdown.INSTANCE, self()); } LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); persistShardList(); @@ -283,6 +411,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; + if(currentSnapshot != null) { + shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot); + } + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); @@ -293,17 +425,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onCreateShard(CreateShard createShard) { + LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); + Object reply; try { String shardName = createShard.getModuleShardConfig().getShardName(); if(localShards.containsKey(shardName)) { + LOG.debug("{}: Shard {} already exists", persistenceId(), shardName); reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); } else { doCreateShard(createShard); reply = new akka.actor.Status.Success(null); } } catch (Exception e) { - LOG.error("onCreateShard failed", e); + LOG.error("{}: onCreateShard failed", persistenceId(), e); reply = new akka.actor.Status.Failure(e); } @@ -328,13 +463,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + boolean shardWasInRecoveredSnapshot = currentSnapshot != null && + currentSnapshot.getShardList().contains(shardName); + Map peerAddresses; boolean isActiveMember; - if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) { + if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName). + contains(cluster.getCurrentMemberName())) { peerAddresses = getPeerAddresses(shardName); isActiveMember = true; } else { - // The local member is not in the given shard member configuration. In this case we'll create + // The local member is not in the static shard member configuration and the shard did not + // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; @@ -343,8 +483,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } - LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, - moduleShardConfig.getShardMemberNames(), peerAddresses); + LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses, + isActiveMember); ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); @@ -499,15 +640,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override protected void handleRecover(Object message) throws Exception { if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // We no longer persist SchemaContext modules so delete all the prior messages from the akka - // journal on upgrade from Helium. - deleteMessages(lastSequenceNr()); - createLocalShards(); + onRecoveryCompleted(); } else if (message instanceof SnapshotOffer) { - handleShardRecovery((SnapshotOffer) message); + applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } + } + + private void onRecoveryCompleted() { + LOG.info("Recovery complete : {}", persistenceId()); + + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); + + if(currentSnapshot == null && restoreFromSnapshot != null && + restoreFromSnapshot.getShardManagerSnapshot() != null) { + try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( + restoreFromSnapshot.getShardManagerSnapshot()))) { + ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject(); + + LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot); + + applyShardManagerSnapshot(snapshot); + } catch(Exception e) { + LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e); + } } + + createLocalShards(); } private void findLocalShard(FindLocalShard message) { @@ -589,7 +749,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberRemoved(ClusterEvent.MemberRemoved message) { - String memberName = message.member().roles().head(); + String memberName = message.member().roles().iterator().next(); LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); @@ -602,7 +762,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberExited(ClusterEvent.MemberExited message) { - String memberName = message.member().roles().head(); + String memberName = message.member().roles().iterator().next(); LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); @@ -615,7 +775,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUp(ClusterEvent.MemberUp message) { - String memberName = message.member().roles().head(); + String memberName = message.member().roles().iterator().next(); LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, message.member().address()); @@ -638,7 +798,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberReachable(ClusterEvent.ReachableMember message) { - String memberName = message.member().roles().head(); + String memberName = message.member().roles().iterator().next(); LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); addPeerAddress(memberName, message.member().address()); @@ -647,7 +807,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void memberUnreachable(ClusterEvent.UnreachableMember message) { - String memberName = message.member().roles().head(); + String memberName = message.member().roles().iterator().next(); LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); markMemberUnavailable(memberName); @@ -693,7 +853,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(shardInformation != null && shardInformation.getActor() != null) { shardInformation.getActor().tell( - new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf()); + new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf()); } else { LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", message.getShardName(), message.getNewState()); @@ -759,12 +919,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + Collection visitedAddresses; + if(message instanceof RemoteFindPrimary) { + visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses(); + } else { + visitedAddresses = new ArrayList<>(); + } + + visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString()); + for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) { - LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), - shardName, address); + if(visitedAddresses.contains(address)) { + continue; + } + + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}", + persistenceId(), shardName, address, visitedAddresses); getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName, - message.isWaitUntilReady()), getContext()); + message.isWaitUntilReady(), visitedAddresses), getContext()); return; } @@ -807,6 +980,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + + LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId); + Map peerAddresses = getPeerAddresses(shardName); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( @@ -898,7 +1074,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); - } @Override @@ -1009,40 +1184,52 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.warn ("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); - Exception failure; - switch (replyMsg.getStatus()) { - case TIMEOUT: - failure = new TimeoutException(String.format( - "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + - "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", - leaderPath, shardName)); - break; - case NO_LEADER: - failure = createNoShardLeaderException(shardInfo.getShardId()); - break; - default : - failure = new RuntimeException(String.format( - "AddServer request to leader %s for shard %s failed with status %s", - leaderPath, shardName, replyMsg.getStatus())); - } + Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId()); onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } } - private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { - String shardName = shardReplicaMsg.getShardName(); - - // verify the local shard replica is available in the controller node - if (!localShards.containsKey(shardName)) { - String msg = String.format("Local shard %s does not", shardName); - LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); - return; + private static Exception getServerChangeException(Class serverChange, ServerChangeStatus serverChangeStatus, + String leaderPath, ShardIdentifier shardId) { + Exception failure; + switch (serverChangeStatus) { + case TIMEOUT: + failure = new TimeoutException(String.format( + "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + + "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardId.getShardName())); + break; + case NO_LEADER: + failure = createNoShardLeaderException(shardId); + break; + case NOT_SUPPORTED: + failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s", + serverChange.getSimpleName(), shardId.getShardName())); + break; + default : + failure = new RuntimeException(String.format( + "%s request to leader %s for shard %s failed with status %s", + serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); } - // call RemoveShard for the shardName - getSender().tell(new akka.actor.Status.Success(true), getSelf()); - return; + return failure; + } + + private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) { + LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg); + + findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(), + shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + } + }); } private void persistShardList() { @@ -1053,16 +1240,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(new ShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private void handleShardRecovery(SnapshotOffer offer) { - LOG.debug ("{}: in handleShardRecovery", persistenceId()); - ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); + return currentSnapshot; + } + + private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) { + currentSnapshot = snapshot; + + LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); + String currentMember = cluster.getCurrentMemberName(); Set configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember)); - for (String shard : snapshot.getShardList()) { + for (String shard : currentSnapshot.getShardList()) { if (!configuredShardList.contains(shard)) { // add the current member as a replica for the shard LOG.debug ("{}: adding shard {}", persistenceId(), shard); @@ -1078,6 +1272,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) { + LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", + persistenceId()); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1, + 0, 0)); + } + private static class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; @@ -1113,7 +1314,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; - private ActorPath actorPath; private final Map initialPeerAddresses; private Optional localShardDataTree; private boolean leaderAvailable = false; @@ -1161,13 +1361,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return actor; } - ActorPath getActorPath() { - return actorPath; - } - void setActor(ActorRef actor) { this.actor = actor; - this.actorPath = actor.path(); } ShardIdentifier getShardId() { @@ -1322,6 +1517,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderAvailable(boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; + + if(leaderAvailable) { + notifyOnShardInitializedCallbacks(); + } } short getLeaderVersion() { @@ -1575,10 +1774,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardName; } - public ActorRef getShardManagerActor() { - return shardManagerActor; - } - @Override public void onFailure(Throwable failure) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); @@ -1608,44 +1803,65 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final RemotePrimaryShardFound remotePrimaryShardFound; private final LocalPrimaryShardFound localPrimaryShardFound; - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) { + public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, + @Nonnull Object primaryFoundMessage) { this.shardName = Preconditions.checkNotNull(shardName); this.contextMessage = Preconditions.checkNotNull(contextMessage); Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null; + this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? + (RemotePrimaryShardFound) primaryFoundMessage : null; + this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? + (LocalPrimaryShardFound) primaryFoundMessage : null; } @Nonnull - public String getPrimaryPath(){ - if(remotePrimaryShardFound != null){ + String getPrimaryPath(){ + if(remotePrimaryShardFound != null) { return remotePrimaryShardFound.getPrimaryPath(); } return localPrimaryShardFound.getPrimaryPath(); } @Nonnull - public Object getContextMessage() { + Object getContextMessage() { return contextMessage; } @Nullable - public RemotePrimaryShardFound getRemotePrimaryShardFound(){ + RemotePrimaryShardFound getRemotePrimaryShardFound() { return remotePrimaryShardFound; } - @Nullable - public LocalPrimaryShardFound getLocalPrimaryShardFound(){ - return localPrimaryShardFound; + @Nonnull + String getShardName() { + return shardName; } + } + + /** + * The WrappedShardResponse class wraps a response from a Shard. + */ + private static class WrappedShardResponse { + private final ShardIdentifier shardId; + private final Object response; + private final String leaderPath; - boolean isPrimaryLocal(){ - return (remotePrimaryShardFound == null); + private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + this.shardId = shardId; + this.response = response; + this.leaderPath = leaderPath; } - @Nonnull - public String getShardName() { - return shardName; + ShardIdentifier getShardId() { + return shardId; + } + + Object getResponse() { + return response; + } + + String getLeaderPath() { + return leaderPath; } } }