X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=98a6090514c9549f2f506c82a85fce7376e35cf6;hb=refs%2Fchanges%2F70%2F29370%2F7;hp=eb39a34dc0dc508b3543e5d3f37976c749672bfe;hpb=b34452ce75563e360ae1d02a9f2aa6223d6208c3;p=controller.git

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index eb39a34dc0..98a6090514 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -8,18 +8,24 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
-import akka.japi.Creator;
+import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -31,6 +37,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,8 +54,10 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -59,11 +68,10 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -71,10 +79,16 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -106,9 +120,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
-    private DatastoreContext datastoreContext;
+    private DatastoreContextFactory datastoreContextFactory;
 
     private final CountDownLatch waitTillReadyCountdownLatch;
 
@@ -118,45 +132,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private SchemaContext schemaContext;
 
+    private DatastoreSnapshot restoreFromSnapshot;
+
     /**
      */
-    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
-            PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
-        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContext = datastoreContext;
-        this.type = datastoreContext.getDataStoreType();
+    protected ShardManager(Builder builder) {
+
+        this.cluster = builder.cluster;
+        this.configuration = builder.configuration;
+        this.datastoreContextFactory = builder.datastoreContextFactory;
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = builder.primaryShardInfoCache;
+        this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
-        this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
-                peerAddressResolver).build();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
-    }
-
-    public static Props props(
-        final ClusterWrapper cluster,
-        final Configuration configuration,
-        final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch,
-        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
-                waitTillReadyCountdownLatch, primaryShardInfoCache));
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
@@ -186,8 +188,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             memberUnreachable((ClusterEvent.UnreachableMember)message);
         } else if(message instanceof ClusterEvent.ReachableMember) {
             memberReachable((ClusterEvent.ReachableMember) message);
-        } else if(message instanceof DatastoreContext) {
-            onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof DatastoreContextFactory) {
+            onDatastoreContextFactory((DatastoreContextFactory)message);
         } else if(message instanceof RoleChangeNotification) {
             onRoleChangeNotification((RoleChangeNotification) message);
         } else if(message instanceof FollowerInitialSyncUpStatus){
@@ -204,12 +206,49 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onAddShardReplica((AddShardReplica)message);
         } else if(message instanceof RemoveShardReplica){
             onRemoveShardReplica((RemoveShardReplica)message);
+        } else if(message instanceof GetSnapshot) {
+            onGetSnapshot();
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                    persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
     }
 
+    private void onGetSnapshot() {
+        LOG.debug("{}: onGetSnapshot", persistenceId());
+
+        List<String> notInitialized = null;
+        for(ShardInformation shardInfo: localShards.values()) {
+            if(!shardInfo.isShardInitialized()) {
+                if(notInitialized == null) {
+                    notInitialized = new ArrayList<>();
+                }
+
+                notInitialized.add(shardInfo.getShardName());
+            }
+        }
+
+        if(notInitialized != null) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+                    "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+            return;
+        }
+
+        byte[] shardManagerSnapshot = null;
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot, getSender(), persistenceId(),
+                datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+        for(ShardInformation shardInfo: localShards.values()) {
+            shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+        }
+    }
+
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
@@ -230,14 +269,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
-                shardDatastoreContext = datastoreContext;
+                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
             } else {
                 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
                         peerAddressResolver).build();
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+                    shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
             localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
@@ -257,6 +296,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+        return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+                shardPeerAddressResolver(peerAddressResolver);
+    }
+
+    private DatastoreContext newShardDatastoreContext(String shardName) {
+        return newShardDatastoreContextBuilder(shardName).build();
+    }
+
     private void checkReady(){
         if (isReadyWithLeaderId()) {
             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
@@ -394,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
@@ -432,16 +483,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             shardInformation.addOnShardInitialized(onShardInitialized);
 
-            LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
-
-            FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+            FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
             if(shardInformation.isShardInitialized()) {
                 // If the shard is already initialized then we'll wait enough time for the shard to
                 // elect a leader, ie 2 times the election timeout.
-                timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                         .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
             }
 
+            LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+                    shardInformation.getShardName());
+
             Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                     timeout, getSelf(),
                     new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -465,11 +517,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
         return new NoShardLeaderException(null, shardId.toString());
     }
 
-    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
@@ -565,13 +617,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
-        datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
-                peerAddressResolver).build();
+    private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+        datastoreContextFactory = factory;
         for (ShardInformation info : localShards.values()) {
-            if (info.getActor() != null) {
-                info.getActor().tell(datastoreContext, getSelf());
-            }
+            info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
         }
     }
 
@@ -684,20 +733,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
-        ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
-        List<String> localShardActorNames = new ArrayList<>();
+        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+        if(restoreFromSnapshot != null) {
+            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+                shardSnapshots.put(snapshot.getName(), snapshot);
+            }
+        }
+
+        restoreFromSnapshot = null; // null out to GC
+
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator, peerAddressResolver));
+            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+                        shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
@@ -746,41 +799,175 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
+    private void checkLocalShardExists(final String shardName, final ActorRef sender) {
+        if (localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s already exists", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+        }
+    }
+
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
-        String shardName = shardReplicaMsg.getShardName();
+        final String shardName = shardReplicaMsg.getShardName();
 
         // verify the local shard replica is already available in the controller node
-        if (localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard {} already available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                    new IllegalArgumentException(String.format("Local shard %s already exists",
-                                                               shardName))), getSelf());
-            return;
-        }
+        LOG.debug("onAddShardReplica: {}", shardReplicaMsg);
+
+        checkLocalShardExists(shardName, getSender());
+
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
-            LOG.debug ("No module configuration exists for shard {}", shardName);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
-                    String.format("No module configuration exists for shard %s",
-                                  shardName))), getSelf());
+            String msg = String.format("No module configuration exists for shard %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
 
         // Create the localShard
-        getSender().tell(new akka.actor.Status.Success(true), getSelf());
+        if (schemaContext == null) {
+            String msg = String.format(
+                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        if (peerAddresses.isEmpty()) {
+            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+                getShardInitializationTimeout().duration().$times(2));
+
+        final ActorRef sender = getSender();
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                            String.format("Failed to find leader for shard %s", shardName), failure)),
+                            getSelf());
+                } else {
+                    if (!(response instanceof RemotePrimaryShardFound)) {
+                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug("{}: {}", persistenceId(), msg);
+                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
+                        return;
+                    }
+
+                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                    addShard(shardName, message, sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        checkLocalShardExists(shardName, sender);
+
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                DisableElectionsRaftPolicy.class.getName()).build();
+
+        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+                getPeerAddresses(shardName), datastoreContext,
+                Shard.builder(), peerAddressResolver);
+        shardInfo.setShardActiveMember(false);
+        localShards.put(shardName, shardInfo);
+        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+        // inform ShardLeader to add this shard as a replica by sending an AddServer message
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                response.getPrimaryPath(), shardId);
+
+        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+                new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure != null) {
+                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
+                            response.getPrimaryPath(), shardName, failure);
+
+                    // Remove the shard
+                    localShards.remove(shardName);
+                    if (shardInfo.getActor() != null) {
+                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+                    }
+
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                            String.format("AddServer request to leader %s for shard %s failed",
+                                    response.getPrimaryPath(), shardName), failure)), getSelf());
+                } else {
+                    AddServerReply reply = (AddServerReply)addServerResponse;
+                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).
+                getDispatcher(Dispatchers.DispatcherType.Client));
         return;
     }
 
+    private void onAddServerReply(String shardName, ShardInformation shardInfo,
+            AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setShardActiveMember(true);
+            persistShardList();
+
+            mBean.addLocalShard(shardInfo.getShardId().toString());
+            sender.tell(new akka.actor.Status.Success(true), getSelf());
+        } else {
+            LOG.warn("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+                    persistenceId(), shardName, replyMsg.getStatus());
+
+            // remove the local replica created
+            localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+            switch (replyMsg.getStatus()) {
+                case TIMEOUT:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                            String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                                    leaderPath, shardName))), getSelf());
+                    break;
+                case NO_LEADER:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                            "There is no shard leader available for shard %s", shardName))), getSelf());
+                    break;
+                default:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                            "AddServer request to leader %s for shard %s failed with status %s",
+                            leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+            }
+        }
+    }
+
     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
         String shardName = shardReplicaMsg.getShardName();
-        boolean deleteStatus = false;
 
         // verify the local shard replica is available in the controller node
         if (!localShards.containsKey(shardName)) {
-            LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
-            getSender().tell(new akka.actor.Status.Failure(
-                    new IllegalArgumentException(String.format("Local shard %s not available",
-                                                               shardName))), getSelf());
+            String msg = String.format("Local shard %s does not exist", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
             return;
         }
         // call RemoveShard for the shardName
@@ -788,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
+    private void persistShardList() {
+        List<String> shardList = new ArrayList<>(localShards.keySet());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardActiveMember()) {
+                shardList.remove(shardInfo.getShardName());
+            }
+        }
+        LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug("{}: in handleShardRecovery", persistenceId());
+        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+        String currentMember = cluster.getCurrentMemberName();
+        Set<String> configuredShardList =
+                new HashSet<>(configuration.getMemberShardNames(currentMember));
+        for (String shard : snapshot.getShardList()) {
+            if (!configuredShardList.contains(shard)) {
+                // add the current member as a replica for the shard
+                LOG.debug("{}: adding shard {}", persistenceId(), shard);
+                configuration.addMemberReplicaForShard(shard, currentMember);
+            } else {
+                configuredShardList.remove(shard);
+            }
+        }
+        for (String shard : configuredShardList) {
+            // remove the member as a replica for the shard
+            LOG.debug("{}: removing shard {}", persistenceId(), shard);
+            configuration.removeMemberReplicaForShard(shard, currentMember);
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -808,23 +1028,28 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private String leaderId;
         private short leaderVersion;
 
-        private final DatastoreContext datastoreContext;
-        private final ShardPropsCreator shardPropsCreator;
+        private DatastoreContext datastoreContext;
+        private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean shardActiveStatus = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
+                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
            this.shardName = shardName;
            this.shardId = shardId;
            this.initialPeerAddresses = initialPeerAddresses;
            this.datastoreContext = datastoreContext;
-           this.shardPropsCreator = shardPropsCreator;
+           this.builder = builder;
           this.addressResolver = addressResolver;
        }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
+            Preconditions.checkNotNull(builder);
+            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+                    schemaContext(schemaContext).props();
+            builder = null;
+            return props;
         }
 
         String getShardName() {
@@ -856,6 +1081,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor != null) {
+                LOG.debug("Sending new DatastoreContext to {}", shardId);
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
+
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
@@ -993,30 +1230,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
-    }
-
-    private static class ShardManagerCreator implements Creator<ShardManager> {
-        private static final long serialVersionUID = 1L;
-
-        final ClusterWrapper cluster;
-        final Configuration configuration;
-        final DatastoreContext datastoreContext;
-        private final CountDownLatch waitTillReadyCountdownLatch;
-        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
-                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            this.cluster = cluster;
-            this.configuration = configuration;
-            this.datastoreContext = datastoreContext;
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            this.primaryShardInfoCache = primaryShardInfoCache;
+        void setShardActiveMember(boolean flag) {
+            shardActiveStatus = flag;
         }
 
-        @Override
-        public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
-                    primaryShardInfoCache);
+        boolean isShardActiveMember() {
+            return shardActiveStatus;
         }
     }
 
@@ -1089,6 +1309,74 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return modules;
         }
     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        private DatastoreSnapshot restoreFromSnapshot;
+        private volatile boolean sealed;
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public Builder cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return this;
+        }
+
+        public Builder configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return this;
+        }
+
+        public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return this;
+        }
+
+        public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return this;
+        }
+
+        public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return this;
+        }
+
+        protected void verify() {
+            sealed = true;
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        public Props props() {
+            verify();
+            return Props.create(ShardManager.class, this);
+        }
+    }
 }
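
Usage sketch (not part of the patch): with this change a ShardManager is created through the new fluent Builder rather than the removed static props() factory. The dependency instances (clusterWrapper, configuration, contextFactory, readyLatch, primaryShardInfoCache), the actor system, and the actor name below are hypothetical placeholders for the parameter types the Builder declares above.

    // All identifiers here are illustrative; only the Builder API itself comes from the patch.
    Props props = ShardManager.builder()
            .cluster(clusterWrapper)                        // ClusterWrapper
            .configuration(configuration)                   // shard/module Configuration
            .datastoreContextFactory(contextFactory)        // replaces the old single DatastoreContext
            .waitTillReadyCountdownLatch(readyLatch)        // CountDownLatch
            .primaryShardInfoCache(primaryShardInfoCache)   // PrimaryShardInfoFutureCache
            .restoreFromSnapshot(null)                      // optional DatastoreSnapshot; null when not restoring
            .props();                                       // verify() seals the Builder, then Props.create(...)

    ActorRef shardManager = actorSystem.actorOf(props, "shardmanager-operational");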
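A second sketch, also hypothetical, of the new add-replica path from a caller's point of view: an AddShardReplica message is asked of the ShardManager, which internally resolves the shard leader via RemoteFindPrimary and sends it an AddServer message; the caller eventually receives akka.actor.Status.Success(true) or a Status.Failure wrapping the error. The helper name, shard name, AddShardReplica constructor arity, and the 30-second timeout are assumptions; the ask/OnComplete pattern mirrors the one the patch itself uses.

    import static akka.pattern.Patterns.ask;

    import akka.actor.ActorRef;
    import akka.dispatch.OnComplete;
    import akka.util.Timeout;
    import java.util.concurrent.TimeUnit;
    import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;
    import scala.concurrent.duration.Duration;

    // shardManager is the ShardManager ActorRef; ec is a dispatcher ExecutionContext
    // (the patch itself uses the "Client" dispatcher for its own onComplete callbacks).
    static void addReplica(ActorRef shardManager, ExecutionContext ec) {
        Timeout timeout = new Timeout(Duration.create(30, TimeUnit.SECONDS));
        Future<Object> reply = ask(shardManager, new AddShardReplica("inventory"), timeout);
        reply.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(Throwable failure, Object response) {
                if (failure != null) {
                    // FindPrimary failed, or the leader rejected AddServer (TIMEOUT, NO_LEADER, ...)
                    System.err.println("Failed to add replica: " + failure);
                } else {
                    // The ShardManager replied with akka.actor.Status.Success(true)
                    System.out.println("Replica added: " + response);
                }
            }
        }, ec);
    }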