X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=f4fa7b3a97e8617f8ac474b494a08deb330a0b6d;hb=34e38a415bc299403657315a5b61afd432dcbbee;hp=bc4c825351cc72148f5276fc28d5a94e2e64f79d;hpb=468b9523001807db03b3a545328ac9bf819278c7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index bc4c825351..f4fa7b3a97 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; @@ -41,28 +42,33 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -96,6 +102,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // A data store could be of type config/operational private final String type; + private final String shardManagerIdentifierString; + private final ClusterWrapper cluster; private final Configuration configuration; @@ -122,6 +130,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.datastoreContext = datastoreContext; this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); this.type = datastoreContext.getDataStoreType(); + this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; @@ -133,7 +142,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); + return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider(); } public static Props props( @@ -158,8 +167,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { - if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) { - findPrimary(FindPrimary.fromSerializable(message)); + if (message instanceof FindPrimary) { + findPrimary((FindPrimary)message); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -180,20 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); - } else if(message instanceof LeaderStateChanged) { - onLeaderStateChanged((LeaderStateChanged)message); + } else if(message instanceof ShardLeaderStateChanged) { + onLeaderStateChanged((ShardLeaderStateChanged)message); } else { unknownMessage(message); } } - private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + private void checkReady(){ + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + + private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { + shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + checkReady(); } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); } @@ -203,13 +223,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInfo = message.getShardInfo(); LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), - shardInfo.getShardId()); + shardInfo.getShardName()); shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); if(!shardInfo.isShardInitialized()) { - message.getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName()); + message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf()); } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName()); message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); } } @@ -235,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - - if (isReady()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); mBean.setSyncStatus(isInSync()); } } @@ -258,10 +273,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return null; } - private boolean isReady() { + private boolean isReadyWithLeaderId() { boolean isReady = true; for (ShardInformation info : localShards.values()) { - if(!info.isShardReady()){ + if(!info.isShardReadyWithLeaderId()){ isReady = false; break; } @@ -297,7 +312,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markShardAsInitialized(String shardName) { - LOG.debug("Initializing shard [{}]", shardName); + LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { @@ -367,6 +382,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.addOnShardInitialized(onShardInitialized); + LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName()); + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( datastoreContext.getShardInitializationTimeout().duration(), getSelf(), new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), @@ -375,8 +392,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShardInitialized.setTimeoutSchedule(timeoutSchedule); } else if (!shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), + shardInformation.getShardName()); + getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf()); } else { + LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), + shardInformation.getShardName()); getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } @@ -392,13 +413,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { "recovering and a leader is being elected. Try again later.", shardId)); } + private NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + return new NotInitializedException(String.format( + "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { + String memberName = message.member().roles().head(); + + LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.remove(message.member().roles().head()); } private void memberUp(ClusterEvent.MemberUp message) { String memberName = message.member().roles().head(); + LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, + message.member().address()); + memberNameToAddress.put(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ @@ -406,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), getShardActorPath(shardName, memberName), getSelf()); } + + checkReady(); } private void onDatastoreContext(DatastoreContext context) { @@ -461,6 +497,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } + @VisibleForTesting + protected ClusterWrapper getCluster() { + return cluster; + } + @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { return getContext().actorOf(Shard.props(info.getShardId(), @@ -469,7 +510,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findPrimary(FindPrimary message) { + LOG.debug("{}: In findPrimary: {}", persistenceId(), message); + final String shardName = message.getShardName(); + final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); @@ -477,10 +521,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath); if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found primary for {}: {}", shardName, found); + LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); } return found; @@ -490,38 +537,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - List members = configuration.getMembersFromShardName(shardName); + for(Map.Entry entry: memberNameToAddress.entrySet()) { + if(!cluster.getCurrentMemberName().equals(entry.getKey())) { + String path = getShardManagerActorPathBuilder(entry.getValue()).toString(); - if(cluster.getCurrentMemberName() != null) { - members.remove(cluster.getCurrentMemberName()); - } + LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), + shardName, path); - /** - * FIXME: Instead of sending remote shard actor path back to sender, - * forward FindPrimary message to remote shard manager - */ - // There is no way for us to figure out the primary (for now) so assume - // that one of the remote nodes is a primary - for(String memberName : members) { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - getShardActorPath(shardName, memberName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + + LOG.debug("{}: No shard found for {}", persistenceId(), shardName); + + getSender().tell(new PrimaryNotFoundException( + String.format("No primary shard found for %s.", shardName)), getSelf()); + } + + private StringBuilder getShardManagerActorPathBuilder(Address address) { + StringBuilder builder = new StringBuilder(); + builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString); + return builder; } private String getShardActorPath(String shardName, String memberName) { Address address = memberNameToAddress.get(memberName); if(address != null) { - StringBuilder builder = new StringBuilder(); - builder.append(address.toString()) - .append("/user/") - .append(ShardManagerIdentifier.builder().type(type).build().toString()) - .append("/") + StringBuilder builder = getShardManagerActorPathBuilder(address); + builder.append("/") .append(getShardIdentifier(memberName, shardName)); return builder.toString(); } @@ -628,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; + private Optional localShardDataTree; // flag that determines if the actor is ready for business private boolean actorInitialized = false; @@ -666,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + Map getPeerAddresses() { return peerAddresses; } @@ -694,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() {