X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=f8237b56a72cf3e1cdc0ef8c497b3f0d9c2896bc;hp=775cae35e22843cafd223bce22eb5232a230868f;hb=02f738dec4a31bdad04e42b2c19ecf09aacc0b87;hpb=f78b7f15e24efdb5dd9f91b487bc63dad7517b1c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 775cae35e2..f8237b56a7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,50 +11,70 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; import akka.japi.Creator; import akka.japi.Function; -import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; -import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import org.opendaylight.controller.cluster.DataPersistenceProvider; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; +import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; /** * The ShardManager has the following jobs, @@ -67,7 +87,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -83,6 +103,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; + private final String shardManagerIdentifierString; + private final ClusterWrapper cluster; private final Configuration configuration; @@ -93,22 +115,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContext datastoreContext; - private Collection knownModules = Collections.emptySet(); + private final CountDownLatch waitTillReadyCountdownLatch; + + private final PrimaryShardInfoFutureCache primaryShardInfoCache; - private final DataPersistenceProvider dataPersistenceProvider; + private SchemaContext schemaContext; /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, + PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; - this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -116,19 +143,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); - } - public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext) { + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch, + final PrimaryShardInfoFutureCache primaryShardInfoCache) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); + Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, + waitTillReadyCountdownLatch, primaryShardInfoCache)); } @Override @@ -140,8 +168,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { - findPrimary(FindPrimary.fromSerializable(message)); + if (message instanceof FindPrimary) { + findPrimary((FindPrimary)message); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -150,18 +178,174 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); + } else if (message instanceof ClusterEvent.MemberExited){ + memberExited((ClusterEvent.MemberExited) message); } else if(message instanceof ClusterEvent.MemberRemoved) { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { - ignoreMessage(message); + memberUnreachable((ClusterEvent.UnreachableMember)message); + } else if(message instanceof ClusterEvent.ReachableMember) { + memberReachable((ClusterEvent.ReachableMember) message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else{ + } else if(message instanceof RoleChangeNotification) { + onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); + } else if(message instanceof ShardNotInitializedTimeout) { + onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); + } else if(message instanceof ShardLeaderStateChanged) { + onLeaderStateChanged((ShardLeaderStateChanged) message); + } else if(message instanceof SwitchShardBehavior){ + onSwitchShardBehavior((SwitchShardBehavior) message); + } else if(message instanceof CreateShard) { + onCreateShard((CreateShard)message); + } else { unknownMessage(message); } } + private void onCreateShard(CreateShard createShard) { + Object reply; + try { + if(localShards.containsKey(createShard.getShardName())) { + throw new IllegalStateException(String.format("Shard with name %s already exists", + createShard.getShardName())); + } + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName()); + Map peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames()); + + LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId, + createShard.getMemberNames(), peerAddresses); + + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = datastoreContext; + } + + ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses, + shardDatastoreContext, createShard.getShardPropsCreator()); + localShards.put(createShard.getShardName(), info); + + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + + reply = new CreateShardReply(); + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } + + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + + private void checkReady(){ + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + + private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { + LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); + + ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); + if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { + primaryShardInfoCache.remove(shardInformation.getShardName()); + } + + checkReady(); + } else { + LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); + } + } + + private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + ShardInformation shardInfo = message.getShardInfo(); + + LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), + shardInfo.getShardName()); + + shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); + + if(!shardInfo.isShardInitialized()) { + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); + } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + } + } + + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), + status.getName(), status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { + LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setRole(roleChanged.getNewRole()); + checkReady(); + mBean.setSyncStatus(isInSync()); + } + } + + + private ShardInformation findShardInformation(String memberId) { + for(ShardInformation info : localShards.values()){ + if(info.getShardId().toString().equals(memberId)){ + return info; + } + } + + return null; + } + + private boolean isReadyWithLeaderId() { + boolean isReady = true; + for (ShardInformation info : localShards.values()) { + if(!info.isShardReadyWithLeaderId()){ + isReady = false; + break; + } + } + return isReady; + } + + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -176,39 +360,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardId.getShardName() == null) { return; } + markShardAsInitialized(shardId.getShardName()); } private void markShardAsInitialized(String shardName) { - LOG.debug("Initializing shard [{}]", shardName); + LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName); + ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { shardInformation.setActorInitialized(); + + shardInformation.getActor().tell(new RegisterRoleChangeListener(), self()); } } @Override protected void handleRecover(Object message) throws Exception { - if(dataPersistenceProvider.isRecoveryApplicable()) { - if (message instanceof SchemaContextModules) { - SchemaContextModules msg = (SchemaContextModules) message; - knownModules = ImmutableSet.copyOf(msg.getModules()); - } else if (message instanceof RecoveryFailure) { - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error("Recovery failed", failure.cause()); - } else if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); - } - } else { - if (message instanceof RecoveryCompleted) { - LOG.info("Recovery complete : {}", persistenceId()); + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); - // Delete all the messages from the akka journal - deleteMessages(lastSequenceNr()); - } + // We no longer persist SchemaContext modules so delete all the prior messages from the akka + // journal on upgrade from Helium. + deleteMessages(lastSequenceNr()); } } @@ -220,7 +394,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -228,20 +402,50 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, - final Supplier messageSupplier) { - if (!shardInformation.isShardInitialized()) { - if(waitUntilInitialized) { + private void sendResponse(ShardInformation shardInformation, boolean doWait, + boolean wantShardReady, final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) { + if(doWait) { final ActorRef sender = getSender(); final ActorRef self = self(); - shardInformation.addRunnableOnInitialized(new Runnable() { + + Runnable replyRunnable = new Runnable() { @Override public void run() { sender.tell(messageSupplier.get(), self); } - }); + }; + + OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : + new OnShardInitialized(replyRunnable); + + shardInformation.addOnShardInitialized(onShardInitialized); + + LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + + FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration(); + if(shardInformation.isShardInitialized()) { + // If the shard is already initialized then we'll wait enough time for the shard to + // elect a leader, ie 2 times the election timeout. + timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig() + .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS); + } + + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( + timeout, getSelf(), + new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), + getContext().dispatcher(), getSelf()); + + onShardInitialized.setTimeoutSchedule(timeoutSchedule); + + } else if (!shardInformation.isShardInitialized()) { + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); } else { - getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } return; @@ -250,19 +454,97 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } + private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + return new NoShardLeaderException(null, shardId.toString()); + } + + private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + return new NotInitializedException(String.format( + "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { - memberNameToAddress.remove(message.member().roles().head()); + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberNameToAddress.remove(memberName); + + for(ShardInformation info : localShards.values()){ + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + } + } + + private void memberExited(ClusterEvent.MemberExited message) { + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + + memberNameToAddress.remove(memberName); + + for(ShardInformation info : localShards.values()){ + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + } } private void memberUp(ClusterEvent.MemberUp message) { String memberName = message.member().roles().head(); + LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.put(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName), - getShardActorPath(shardName, memberName)); + String peerId = getShardIdentifier(memberName, shardName).toString(); + info.updatePeerAddress(peerId, getShardActorPath(shardName, memberName), getSelf()); + + info.peerUp(memberName, peerId, getSelf()); + } + + checkReady(); + } + + private void memberReachable(ClusterEvent.ReachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberAvailable(memberName); + } + + private void memberUnreachable(ClusterEvent.UnreachableMember message) { + String memberName = message.member().roles().head(); + LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); + + markMemberUnavailable(memberName); + } + + private void markMemberUnavailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as unavailable.", leaderId); + info.setLeaderAvailable(false); + + primaryShardInfoCache.remove(info.getShardName()); + } + + info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); + } + } + + private void markMemberAvailable(final String memberName) { + for(ShardInformation info : localShards.values()){ + String leaderId = info.getLeaderId(); + if(leaderId != null && leaderId.contains(memberName)) { + LOG.debug("Marking Leader {} as available.", leaderId); + info.setLeaderAvailable(true); + } + + info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -275,100 +557,109 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSwitchShardBehavior(SwitchShardBehavior message) { + ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + + ShardInformation shardInformation = localShards.get(identifier.getShardName()); + + if(shardInformation != null && shardInformation.getActor() != null) { + shardInformation.getActor().tell( + new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf()); + } else { + LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", + message.getShardName(), message.getNewState()); + } + } + /** * Notifies all the local shards of a change in the schema context * * @param message */ private void updateSchemaContext(final Object message) { - final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); - Set allModuleIdentifiers = schemaContext.getAllModuleIdentifiers(); - Set newModules = new HashSet<>(128); + LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); - for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){ - String s = moduleIdentifier.getNamespace().toString(); - newModules.add(s); + for (ShardInformation info : localShards.values()) { + if (info.getActor() == null) { + LOG.debug("Creating Shard {}", info.getShardId()); + info.setActor(newShardActor(schemaContext, info)); + } else { + info.getActor().tell(message, getSelf()); + } } + } - if(newModules.containsAll(knownModules)) { - - LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); - - knownModules = ImmutableSet.copyOf(newModules); - - dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { - - @Override - public void apply(SchemaContextModules param) throws Exception { - LOG.debug("Sending new SchemaContext to Shards"); - for (ShardInformation info : localShards.values()) { - if (info.getActor() == null) { - info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString())); - } else { - info.getActor().tell(message, getSelf()); - } - } - } - - }); - } else { - LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", - newModules, knownModules); - } + @VisibleForTesting + protected ClusterWrapper getCluster() { + return cluster; + } + @VisibleForTesting + protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { + return getContext().actorOf(info.newProps(schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } private void findPrimary(FindPrimary message) { - String shardName = message.getShardName(); + LOG.debug("{}: In findPrimary: {}", persistenceId(), message); + + final String shardName = message.getShardName(); + final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); + } + + return found; } }); return; } - List members = configuration.getMembersFromShardName(shardName); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!cluster.getCurrentMemberName().equals(entry.getKey())) { + String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - if(cluster.getCurrentMemberName() != null) { - members.remove(cluster.getCurrentMemberName()); - } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, path); - /** - * FIXME: Instead of sending remote shard actor path back to sender, - * forward FindPrimary message to remote shard manager - */ - // There is no way for us to figure out the primary (for now) so assume - // that one of the remote nodes is a primary - for(String memberName : members) { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - getShardActorPath(shardName, memberName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + + LOG.debug("{}: No shard found for {}", persistenceId(), shardName); + + getSender().tell(new PrimaryNotFoundException( + String.format("No primary shard found for %s.", shardName)), getSelf()); + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); + return builder; } private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()) - .append("/user/") - .append(ShardManagerIdentifier.builder().type(type).build().toString()) - .append("/") + StringBuilder builder = getShardManagerActorPathBuilder(address); + builder.append("/") .append(getShardIdentifier(memberName, shardName)); return builder.toString(); } @@ -394,19 +685,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { */ private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); - List memberShardNames = - this.configuration.getMemberShardNames(memberName); + Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, + shardPropsCreator)); } - mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + + mBean.setShardManager(this); } /** @@ -415,22 +709,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName) { + return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName)); + } - Map peerAddresses = new HashMap<>(); + private Map getPeerAddresses(String shardName, Collection members) { - List members = - this.configuration.getMembersFromShardName(shardName); + Map peerAddresses = new HashMap<>(); String currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members){ - if(!currentMemberName.equals(memberName)){ - ShardIdentifier shardId = getShardIdentifier(memberName, - shardName); - String path = - getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId, path); + for(String memberName : members) { + if(!currentMemberName.equals(memberName)) { + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + String path = getShardActorPath(shardName, memberName); + peerAddresses.put(shardId.toString(), path); } } return peerAddresses; @@ -457,32 +750,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @VisibleForTesting - Collection getKnownModules() { - return knownModules; + ShardManagerInfoMBean getMBean(){ + return mBean; } @VisibleForTesting - DataPersistenceProvider getDataPersistenceProvider() { - return dataPersistenceProvider; - } - - private class ShardInformation { + protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; + private Optional localShardDataTree; + private boolean leaderAvailable = false; // flag that determines if the actor is ready for business private boolean actorInitialized = false; - private final List runnablesOnInitialized = Lists.newArrayList(); + private boolean followerSyncStatus = false; + + private final Set onShardInitializedSet = Sets.newHashSet(); + private String role ; + private String leaderId; + private short leaderVersion; + + private final DatastoreContext datastoreContext; + private final ShardPropsCreator shardPropsCreator; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; + this.datastoreContext = datastoreContext; + this.shardPropsCreator = shardPropsCreator; + } + + Props newProps(SchemaContext schemaContext) { + return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext); } String getShardName() { @@ -506,11 +812,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } - Map getPeerAddresses() { + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + + Map getPeerAddresses() { return peerAddresses; } - void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ @@ -522,27 +836,134 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { peerId, peerAddress, actor.path()); } - actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + actor.tell(new PeerAddressResolved(peerId, peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); + } + } + + void peerDown(String memberName, String peerId, ActorRef sender) { + if(peerAddresses.containsKey(peerId) && actor != null) { + actor.tell(new PeerDown(memberName, peerId), sender); } } + void peerUp(String memberName, String peerId, ActorRef sender) { + if(peerAddresses.containsKey(peerId) && actor != null) { + actor.tell(new PeerUp(memberName, peerId), sender); + } + } + + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && + (isLeader() || peerAddresses.get(leaderId) != null); + } + boolean isShardInitialized() { return getActor() != null && actorInitialized; } + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return peerAddresses.get(leaderId); + } + } + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + this.actorInitialized = true; - for(Runnable runnable: runnablesOnInitialized) { - runnable.run(); + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; + } + + boolean ready = isShardReadyWithLeaderId(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? "ready" : "initialized", onShardInitializedSet.size()); + } + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if(!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } } + } - runnablesOnInitialized.clear(); + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); } - void addRunnableOnInitialized(Runnable runnable) { - runnablesOnInitialized.add(runnable); + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); + } + + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); + } + + void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + + boolean setLeaderId(String leaderId) { + boolean changed = !Objects.equal(this.leaderId, leaderId); + this.leaderId = leaderId; + if(leaderId != null) { + this.leaderAvailable = true; + } + notifyOnShardInitializedCallbacks(); + + return changed; + } + + String getLeaderId() { + return leaderId; + } + + void setLeaderAvailable(boolean leaderAvailable) { + this.leaderAvailable = leaderAvailable; + } + + short getLeaderVersion() { + return leaderVersion; + } + + void setLeaderVersion(short leaderVersion) { + this.leaderVersion = leaderVersion; } } @@ -552,20 +973,81 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; + private final CountDownLatch waitTillReadyCountdownLatch; + private final PrimaryShardInfoFutureCache primaryShardInfoCache; - ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { + ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, + CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; + this.primaryShardInfoCache = primaryShardInfoCache; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, + primaryShardInfoCache); + } + } + + private static class OnShardInitialized { + private final Runnable replyRunnable; + private Cancellable timeoutSchedule; + + OnShardInitialized(Runnable replyRunnable) { + this.replyRunnable = replyRunnable; + } + + Runnable getReplyRunnable() { + return replyRunnable; + } + + Cancellable getTimeoutSchedule() { + return timeoutSchedule; + } + + void setTimeoutSchedule(Cancellable timeoutSchedule) { + this.timeoutSchedule = timeoutSchedule; } } + private static class OnShardReady extends OnShardInitialized { + OnShardReady(Runnable replyRunnable) { + super(replyRunnable); + } + } + + private static class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } + + /** + * We no longer persist SchemaContextModules but keep this class around for now for backwards + * compatibility so we don't get de-serialization failures on upgrade from Helium. + */ + @Deprecated static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L;