X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=bbac0e3e25da0cac560b26fe639a50a250d743a9;hp=9bb2ea8f79d2773124a4fde1b35e865897e21e72;hb=769ef0f950f2ed6cfc14d274e6a8edc583a36a96;hpb=e970feb618c5e3793454f7f1a3974797a61c7c17 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 9bb2ea8f79..bbac0e3e25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,9 +19,11 @@ import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.dispatch.OnComplete; -import akka.japi.Creator; import akka.japi.Function; import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -34,16 +36,20 @@ import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -53,7 +59,7 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -116,7 +122,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String shardDispatcherPath; - private ShardManagerInfo mBean; + private final ShardManagerInfo mBean; private DatastoreContextFactory datastoreContextFactory; @@ -128,43 +134,40 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private SchemaContext schemaContext; + private DatastoreSnapshot restoreFromSnapshot; + + private final Set shardReplicaOperationsInProgress = new HashSet<>(); + + private final String persistenceId; + /** */ - protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, - PrimaryShardInfoFutureCache primaryShardInfoCache) { - - this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); - this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); - this.datastoreContextFactory = datastoreContextFactory; - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); + protected ShardManager(Builder builder) { + + this.cluster = builder.cluster; + this.configuration = builder.configuration; + this.datastoreContextFactory = builder.datastoreContextFactory; + this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; - this.primaryShardInfoCache = primaryShardInfoCache; + this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch; + this.primaryShardInfoCache = builder.primaryShardInfoCache; + this.restoreFromSnapshot = builder.restoreFromSnapshot; + + String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); + persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName()); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - createLocalShards(); - } - - public static Props props( - final ClusterWrapper cluster, - final Configuration configuration, - final DatastoreContextFactory datastoreContextFactory, - final CountDownLatch waitTillReadyCountdownLatch, - final PrimaryShardInfoFutureCache primaryShardInfoCache) { - - Preconditions.checkNotNull(cluster, "cluster should not be null"); - Preconditions.checkNotNull(configuration, "configuration should not be null"); - Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory, - waitTillReadyCountdownLatch, primaryShardInfoCache)); + List localShardActorNames = new ArrayList<>(); + mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(), + "shard-manager-" + this.type, + datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), + localShardActorNames); + mBean.setShardManager(this); } @Override @@ -210,14 +213,28 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if(message instanceof AddShardReplica){ onAddShardReplica((AddShardReplica)message); + } else if(message instanceof ForwardedAddServerReply) { + ForwardedAddServerReply msg = (ForwardedAddServerReply)message; + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, + msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerFailure) { + ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; + onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); + } else if(message instanceof ForwardedAddServerPrimaryShardFound) { + ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message; + addShard(msg.shardName, msg.primaryFound, getSender()); } else if(message instanceof RemoveShardReplica){ onRemoveShardReplica((RemoveShardReplica)message); } else if(message instanceof GetSnapshot) { onGetSnapshot(); + } else if (message instanceof SaveSnapshotSuccess) { + LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId()); + } else if (message instanceof SaveSnapshotFailure) { + LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards", + persistenceId(), ((SaveSnapshotFailure)message).cause()); } else { unknownMessage(message); } - } private void onGetSnapshot() { @@ -253,47 +270,66 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onCreateShard(CreateShard createShard) { Object reply; try { - ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); - if(localShards.containsKey(moduleShardConfig.getShardName())) { - throw new IllegalStateException(String.format("Shard with name %s already exists", - moduleShardConfig.getShardName())); + String shardName = createShard.getModuleShardConfig().getShardName(); + if(localShards.containsKey(shardName)) { + reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName)); + } else { + doCreateShard(createShard); + reply = new akka.actor.Status.Success(null); } + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } - configuration.addModuleShardConfiguration(moduleShardConfig); + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName()); - Map peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*, - moduleShardConfig.getShardMemberNames()*/); + private void doCreateShard(CreateShard createShard) { + ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); + String shardName = moduleShardConfig.getShardName(); - LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, - moduleShardConfig.getShardMemberNames(), peerAddresses); + configuration.addModuleShardConfiguration(moduleShardConfig); - DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); - if(shardDatastoreContext == null) { - shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName()); - } else { - shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( - peerAddressResolver).build(); - } + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = newShardDatastoreContext(shardName); + } else { + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + } - ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses, - shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); - localShards.put(info.getShardName(), info); + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - mBean.addLocalShard(shardId.toString()); + Map peerAddresses; + boolean isActiveMember; + if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) { + peerAddresses = getPeerAddresses(shardName); + isActiveMember = true; + } else { + // The local member is not in the given shard member configuration. In this case we'll create + // the shard with no peers and with elections disabled so it stays as follower. A + // subsequent AddServer request will be needed to make it an active member. + isActiveMember = false; + peerAddresses = Collections.emptyMap(); + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext). + customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); + } - if(schemaContext != null) { - info.setActor(newShardActor(schemaContext, info)); - } + LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, + moduleShardConfig.getShardMemberNames(), peerAddresses); - reply = new CreateShardReply(); - } catch (Exception e) { - LOG.error("onCreateShard failed", e); - reply = new akka.actor.Status.Failure(e); - } + ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, + shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver); + info.setActiveMember(isActiveMember); + localShards.put(info.getShardName(), info); - if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { - getSender().tell(reply, getSelf()); + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); } } @@ -443,6 +479,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // We no longer persist SchemaContext modules so delete all the prior messages from the akka // journal on upgrade from Helium. deleteMessages(lastSequenceNr()); + createLocalShards(); + } else if (message instanceof SnapshotOffer) { + handleShardRecovery((SnapshotOffer) message); } } @@ -675,7 +714,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); - if (info != null) { + if (info != null && info.isActiveMember()) { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { @@ -731,19 +770,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String memberName = this.cluster.getCurrentMemberName(); Collection memberShardNames = this.configuration.getMemberShardNames(memberName); - List localShardActorNames = new ArrayList<>(); + Map shardSnapshots = new HashMap<>(); + if(restoreFromSnapshot != null) + { + for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) { + shardSnapshots.put(snapshot.getName(), snapshot); + } + } + + restoreFromSnapshot = null; // null out to GC + for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, - newShardDatastoreContext(shardName), Shard.builder(), peerAddressResolver)); + newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( + shardSnapshots.get(shardName)), peerAddressResolver)); + mBean.addLocalShard(shardId.toString()); } - - mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, - datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames); - - mBean.setShardManager(this); } /** @@ -784,7 +828,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public String persistenceId() { - return "shard-manager-" + type; + return persistenceId; } @VisibleForTesting @@ -792,21 +836,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private void checkLocalShardExists(final String shardName, final ActorRef sender) { - if (localShards.containsKey(shardName)) { - String msg = String.format("Local shard %s already exists", shardName); + private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) { + if (shardReplicaOperationsInProgress.contains(shardName)) { + String msg = String.format("A shard replica operation for %s is already in progress", shardName); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf()); + sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); + return true; } + + return false; } private void onAddShardReplica (AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); - // verify the local shard replica is already available in the controller node - LOG.debug ("onAddShardReplica: {}", shardReplicaMsg); - - checkLocalShardExists(shardName, getSender()); + LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration if (!(this.configuration.isShardConfigured(shardName))) { @@ -825,65 +869,79 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - Map peerAddresses = getPeerAddresses(shardName); - if (peerAddresses.isEmpty()) { - String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName); - LOG.debug ("{}: {}", persistenceId(), msg); - getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf()); - return; - } - Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). getShardInitializationTimeout().duration().$times(2)); final ActorRef sender = getSender(); - Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout); + Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); futureObj.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure); sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("Failed to find leader for shard %s", shardName), failure)), - getSelf()); + String.format("Failed to find leader for shard %s", shardName), failure)), getSelf()); } else { - if (!(response instanceof RemotePrimaryShardFound)) { + if(response instanceof RemotePrimaryShardFound) { + self().tell(new ForwardedAddServerPrimaryShardFound(shardName, + (RemotePrimaryShardFound)response), sender); + } else if(response instanceof LocalPrimaryShardFound) { + sendLocalReplicaAlreadyExistsReply(shardName, sender); + } else { String msg = String.format("Failed to find leader for shard %s: received response: %s", shardName, response); LOG.debug ("{}: {}", persistenceId(), msg); - sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf()); - return; + sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response : + new RuntimeException(msg)), getSelf()); } - - RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; - addShard (shardName, message, sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { + String msg = String.format("Local shard %s already exists", shardName); + LOG.debug ("{}: {}", persistenceId(), msg); + sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf()); + } + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { - checkLocalShardExists(shardName, sender); + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + shardReplicaOperationsInProgress.add(shardName); + + final ShardInformation shardInfo; + final boolean removeShardOnFailure; + ShardInformation existingShardInfo = localShards.get(shardName); + if(existingShardInfo == null) { + removeShardOnFailure = true; + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation( - DisableElectionsRaftPolicy.class.getName()).build(); + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation( + DisableElectionsRaftPolicy.class.getName()).build(); + + shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, + Shard.builder(), peerAddressResolver); + shardInfo.setActiveMember(false); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + } else { + removeShardOnFailure = false; + shardInfo = existingShardInfo; + } - final ShardInformation shardInfo = new ShardInformation(shardName, shardId, - getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); //inform ShardLeader to add this shard as a replica by sending an AddServer message LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), - response.getPrimaryPath(), shardId); + response.getPrimaryPath(), shardInfo.getShardId()); - Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4)); + Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout(). + duration()); Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), - new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); + new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete() { @Override @@ -892,27 +950,37 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - // Remove the shard - localShards.remove(shardName); - if (shardInfo.getActor() != null) { - shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); - } - - sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("AddServer request to leader %s for shard %s failed", - response.getPrimaryPath(), shardName), failure)), getSelf()); + String msg = String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName); + self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { - AddServerReply reply = (AddServerReply)addServerResponse; - onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath()); + self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse, + response.getPrimaryPath(), removeShardOnFailure), sender); } } - }, new Dispatchers(context().system().dispatchers()). - getDispatcher(Dispatchers.DispatcherType.Client)); - return; + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void onAddServerReply (String shardName, ShardInformation shardInfo, - AddServerReply replyMsg, ActorRef sender, String leaderPath) { + private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender, + boolean removeShardOnFailure) { + shardReplicaOperationsInProgress.remove(shardName); + + if(removeShardOnFailure) { + ShardInformation shardInfo = localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + } + + sender.tell(new akka.actor.Status.Failure(message == null ? failure : + new RuntimeException(message, failure)), getSelf()); + } + + private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, + String leaderPath, boolean removeShardOnFailure) { + String shardName = shardInfo.getShardName(); + shardReplicaOperationsInProgress.remove(shardName); + LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); if (replyMsg.getStatus() == ServerChangeStatus.OK) { @@ -920,33 +988,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Make the local shard voting capable shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); + shardInfo.setActiveMember(true); + persistShardList(); mBean.addLocalShard(shardInfo.getShardId().toString()); - sender.tell(new akka.actor.Status.Success(true), getSelf()); + sender.tell(new akka.actor.Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) { + sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { - LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard", + LOG.warn ("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); - //remove the local replica created - localShards.remove(shardName); - if (shardInfo.getActor() != null) { - shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); - } + Exception failure; switch (replyMsg.getStatus()) { case TIMEOUT: - sender.tell(new akka.actor.Status.Failure(new RuntimeException( - String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", - leaderPath, shardName))), getSelf()); + failure = new TimeoutException(String.format( + "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + + "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardName)); break; case NO_LEADER: - sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( - "There is no shard leader available for shard %s", shardName))), getSelf()); + failure = createNoShardLeaderException(shardInfo.getShardId()); break; default : - sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( - "AddServer request to leader %s for shard %s failed with status %s", - leaderPath, shardName, replyMsg.getStatus()))), getSelf()); + failure = new RuntimeException(String.format( + "AddServer request to leader %s for shard %s failed with status %s", + leaderPath, shardName, replyMsg.getStatus())); } + + onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } } @@ -965,6 +1035,79 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } + private void persistShardList() { + List shardList = new ArrayList<>(localShards.keySet()); + for (ShardInformation shardInfo : localShards.values()) { + if (!shardInfo.isActiveMember()) { + shardList.remove(shardInfo.getShardName()); + } + } + LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList); + saveSnapshot(new ShardManagerSnapshot(shardList)); + } + + private void handleShardRecovery(SnapshotOffer offer) { + LOG.debug ("{}: in handleShardRecovery", persistenceId()); + ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot(); + String currentMember = cluster.getCurrentMemberName(); + Set configuredShardList = + new HashSet<>(configuration.getMemberShardNames(currentMember)); + for (String shard : snapshot.getShardList()) { + if (!configuredShardList.contains(shard)) { + // add the current member as a replica for the shard + LOG.debug ("{}: adding shard {}", persistenceId(), shard); + configuration.addMemberReplicaForShard(shard, currentMember); + } else { + configuredShardList.remove(shard); + } + } + for (String shard : configuredShardList) { + // remove the member as a replica for the shard + LOG.debug ("{}: removing shard {}", persistenceId(), shard); + configuration.removeMemberReplicaForShard(shard, currentMember); + } + } + + private static class ForwardedAddServerPrimaryShardFound { + String shardName; + RemotePrimaryShardFound primaryFound; + + ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) { + this.shardName = shardName; + this.primaryFound = primaryFound; + } + } + + private static class ForwardedAddServerReply { + ShardInformation shardInfo; + AddServerReply addServerReply; + String leaderPath; + boolean removeShardOnFailure; + + ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, + boolean removeShardOnFailure) { + this.shardInfo = shardInfo; + this.addServerReply = addServerReply; + this.leaderPath = leaderPath; + this.removeShardOnFailure = removeShardOnFailure; + } + } + + private static class ForwardedAddServerFailure { + String shardName; + String failureMessage; + Throwable failure; + boolean removeShardOnFailure; + + ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, + boolean removeShardOnFailure) { + this.shardName = shardName; + this.failureMessage = failureMessage; + this.failure = failure; + this.removeShardOnFailure = removeShardOnFailure; + } + } + @VisibleForTesting protected static class ShardInformation { private final ShardIdentifier shardId; @@ -986,8 +1129,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private short leaderVersion; private DatastoreContext datastoreContext; - private final Shard.AbstractBuilder builder; + private Shard.AbstractBuilder builder; private final ShardPeerAddressResolver addressResolver; + private boolean isActiveMember = true; private ShardInformation(String shardName, ShardIdentifier shardId, Map initialPeerAddresses, DatastoreContext datastoreContext, @@ -1001,8 +1145,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } Props newProps(SchemaContext schemaContext) { - return builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). + Preconditions.checkNotNull(builder); + Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext). schemaContext(schemaContext).props(); + builder = null; + return props; } String getShardName() { @@ -1183,31 +1330,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } - } - - private static class ShardManagerCreator implements Creator { - private static final long serialVersionUID = 1L; - final ClusterWrapper cluster; - final Configuration configuration; - final DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; - private final PrimaryShardInfoFutureCache primaryShardInfoCache; - - ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, - DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch, - PrimaryShardInfoFutureCache primaryShardInfoCache) { - this.cluster = cluster; - this.configuration = configuration; - this.datastoreContextFactory = datastoreContextFactory; - this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; - this.primaryShardInfoCache = primaryShardInfoCache; + boolean isActiveMember() { + return isActiveMember; } - @Override - public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch, - primaryShardInfoCache); + void setActiveMember(boolean isActiveMember) { + this.isActiveMember = isActiveMember; } } @@ -1280,6 +1409,74 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return modules; } } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private ClusterWrapper cluster; + private Configuration configuration; + private DatastoreContextFactory datastoreContextFactory; + private CountDownLatch waitTillReadyCountdownLatch; + private PrimaryShardInfoFutureCache primaryShardInfoCache; + private DatastoreSnapshot restoreFromSnapshot; + private volatile boolean sealed; + + protected void checkSealed() { + Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); + } + + public Builder cluster(ClusterWrapper cluster) { + checkSealed(); + this.cluster = cluster; + return this; + } + + public Builder configuration(Configuration configuration) { + checkSealed(); + this.configuration = configuration; + return this; + } + + public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) { + checkSealed(); + this.datastoreContextFactory = datastoreContextFactory; + return this; + } + + public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) { + checkSealed(); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + return this; + } + + public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) { + checkSealed(); + this.primaryShardInfoCache = primaryShardInfoCache; + return this; + } + + public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) { + checkSealed(); + this.restoreFromSnapshot = restoreFromSnapshot; + return this; + } + + protected void verify() { + sealed = true; + Preconditions.checkNotNull(cluster, "cluster should not be null"); + Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); + Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); + } + + public Props props() { + verify(); + return Props.create(ShardManager.class, this); + } + } }