X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=1632d5646640101ed47a8d1b0e50417b39686a0d;hp=3dbac003b9d66b74520d431c2d7ebb45da488fbc;hb=1fbf74e08377dd5e003274be1a43df42d1678845;hpb=03cbc1c8d5e08fd1f3001f8d0c30c6348c8c5778 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 3dbac003b9..1632d56466 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; @@ -20,22 +21,28 @@ import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; @@ -50,6 +57,12 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -67,7 +80,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); // Stores a mapping between a member name and the address of the member // Member names look like "member-1", "member-2" etc and are as specified @@ -87,24 +100,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; - private ShardManagerInfoMBean mBean; + private final String shardDispatcherPath; - private final DatastoreContext datastoreContext; + private ShardManagerInfo mBean; + + private DatastoreContext datastoreContext; private Collection knownModules = Collections.emptySet(); private final DataPersistenceProvider dataPersistenceProvider; + private final CountDownLatch waitTillReadyCountdownLatch; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardDispatcherPath = + new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -119,17 +139,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext) { + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); + } + + @Override + public void postStop() { + LOG.info("Stopping ShardManager"); + + mBean.unregisterMBean(); } @Override public void handleCommand(Object message) throws Exception { - if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); @@ -143,12 +172,119 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberRemoved((ClusterEvent.MemberRemoved) message); } else if(message instanceof ClusterEvent.UnreachableMember) { ignoreMessage(message); - } else{ + } else if(message instanceof DatastoreContext) { + onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RoleChangeNotification) { + onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); + } else if(message instanceof ShardNotInitializedTimeout) { + onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); + } else if(message instanceof LeaderStateChanged) { + onLeaderStateChanged((LeaderStateChanged)message); + } else { unknownMessage(message); } } + private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); + + ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + + } else { + LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); + } + } + + private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + ShardInformation shardInfo = message.getShardInfo(); + + LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), + shardInfo.getShardId()); + + shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); + + if(!shardInfo.isShardInitialized()) { + message.getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + } + } + + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), + status.getName(), status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { + LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setRole(roleChanged.getNewRole()); + + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + + mBean.setSyncStatus(isInSync()); + } + } + + + private ShardInformation findShardInformation(String memberId) { + for(ShardInformation info : localShards.values()){ + if(info.getShardId().toString().equals(memberId)){ + return info; + } + } + + return null; + } + + private boolean isReadyWithLeaderId() { + boolean isReady = true; + for (ShardInformation info : localShards.values()) { + if(!info.isShardReadyWithLeaderId()){ + isReady = false; + break; + } + } + return isReady; + } + + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -163,14 +299,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (shardId.getShardName() == null) { return; } + markShardAsInitialized(shardId.getShardName()); } private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); + ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { shardInformation.setActorInitialized(); + + shardInformation.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -207,7 +347,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -215,20 +355,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, - final Supplier messageSupplier) { - if (!shardInformation.isShardInitialized()) { - if(waitUntilInitialized) { + private void sendResponse(ShardInformation shardInformation, boolean doWait, + boolean wantShardReady, final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) { + if(doWait) { final ActorRef sender = getSender(); final ActorRef self = self(); - shardInformation.addRunnableOnInitialized(new Runnable() { + + Runnable replyRunnable = new Runnable() { @Override public void run() { sender.tell(messageSupplier.get(), self); } - }); - } else { + }; + + OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : + new OnShardInitialized(replyRunnable); + + shardInformation.addOnShardInitialized(onShardInitialized); + + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( + datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), + getContext().dispatcher(), getSelf()); + + onShardInitialized.setTimeoutSchedule(timeoutSchedule); + + } else if (!shardInformation.isShardInitialized()) { getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } return; @@ -237,6 +393,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } + private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + return new NoShardLeaderException(String.format( + "Could not find a leader for shard %s. This typically happens when the system is coming up or " + + "recovering and a leader is being elected. Try again later.", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { memberNameToAddress.remove(message.member().roles().head()); } @@ -248,8 +410,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName), - getShardActorPath(shardName, memberName)); + info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), + getShardActorPath(shardName, memberName), getSelf()); + } + } + + private void onDatastoreContext(DatastoreContext context) { + datastoreContext = context; + for (ShardInformation info : localShards.values()) { + if (info.getActor() != null) { + info.getActor().tell(datastoreContext, getSelf()); + } } } @@ -271,7 +442,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if(newModules.containsAll(knownModules)) { - LOG.info("New SchemaContext has a super set of current knownModules - persisting info"); + LOG.debug("New SchemaContext has a super set of current knownModules - persisting info"); knownModules = ImmutableSet.copyOf(newModules); @@ -279,12 +450,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void apply(SchemaContextModules param) throws Exception { - LOG.info("Sending new SchemaContext to Shards"); + LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { - info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext), - info.getShardId().toString())); + info.setActor(newShardActor(schemaContext, info)); } else { info.getActor().tell(message, getSelf()); } @@ -293,21 +462,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } else { - LOG.info("Rejecting schema context update because it is not a super set of previously known modules"); + LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}", + newModules, knownModules); } } + @VisibleForTesting + protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { + return getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + } + private void findPrimary(FindPrimary message) { - String shardName = message.getShardName(); + final String shardName = message.getShardName(); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", shardName, found); + } + + return found; } }); @@ -377,7 +560,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } @@ -392,22 +575,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); - List members = - this.configuration.getMembersFromShardName(shardName); + List members = this.configuration.getMembersFromShardName(shardName); String currentMemberName = this.cluster.getCurrentMemberName(); for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - ShardIdentifier shardId = getShardIdentifier(memberName, - shardName); - String path = - getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId, path); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + String path = getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardId.toString(), path); } } return peerAddresses; @@ -443,20 +623,30 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return dataPersistenceProvider; } - private class ShardInformation { + @VisibleForTesting + ShardManagerInfoMBean getMBean(){ + return mBean; + } + + @VisibleForTesting + protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; // flag that determines if the actor is ready for business private boolean actorInitialized = false; - private final List runnablesOnInitialized = Lists.newArrayList(); + private boolean followerSyncStatus = false; + + private final Set onShardInitializedSet = Sets.newHashSet(); + private String role ; + private String leaderId; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; @@ -483,11 +673,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } - Map getPeerAddresses() { + Map getPeerAddresses() { return peerAddresses; } - void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ @@ -499,27 +689,100 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { peerId, peerAddress, actor.path()); } - actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); } } + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + } + boolean isShardInitialized() { return getActor() != null && actorInitialized; } + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return peerAddresses.get(leaderId); + } + } + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + this.actorInitialized = true; - for(Runnable runnable: runnablesOnInitialized) { - runnable.run(); + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; + } + + boolean ready = isShardReadyWithLeaderId(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? "ready" : "initialized", onShardInitializedSet.size()); + } + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if(!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } + } + } + + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); + } + + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); + } + + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); + } + + void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; } - runnablesOnInitialized.clear(); + return false; } - void addRunnableOnInitialized(Runnable runnable) { - runnablesOnInitialized.add(runnable); + void setLeaderId(String leaderId) { + this.leaderId = leaderId; + + notifyOnShardInitializedCallbacks(); } } @@ -529,17 +792,70 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; + private final CountDownLatch waitTillReadyCountdownLatch; ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { + Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + } + } + + private static class OnShardInitialized { + private final Runnable replyRunnable; + private Cancellable timeoutSchedule; + + OnShardInitialized(Runnable replyRunnable) { + this.replyRunnable = replyRunnable; + } + + Runnable getReplyRunnable() { + return replyRunnable; + } + + Cancellable getTimeoutSchedule() { + return timeoutSchedule; + } + + void setTimeoutSchedule(Cancellable timeoutSchedule) { + this.timeoutSchedule = timeoutSchedule; + } + } + + private static class OnShardReady extends OnShardInitialized { + OnShardReady(Runnable replyRunnable) { + super(replyRunnable); + } + } + + private static class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; } }