X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=187d2522de9e7df810a878b8c4605a3e3a44e2eb;hp=dadbeeb4309703f80a823ca8ac3774e9b0e7a7e7;hb=20a32e6459fd1e27e7669bf1ebc7742b96787b94;hpb=0a2ed6b43f45b92f09c291e99e2e66a7fa18085f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index dadbeeb430..187d2522de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -51,6 +51,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -88,7 +89,6 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -336,7 +336,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onInitConfigListener() { LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = + final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = org.opendaylight.mdsal.common.api.LogicalDatastoreType .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); @@ -345,7 +345,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, type); + configUpdateHandler.initListener(dataStore, datastoreType); } private void onShutDown() { @@ -531,6 +531,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } self().tell((RunnableMessage) () -> { + // At any rate, invalidate primaryShardInfo cache + primaryShardInfoCache.remove(shardName); + shardActorsStopping.remove(shardName); notifyOnCompleteTasks(failure, result); }, ActorRef.noSender()); @@ -846,7 +849,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); if (sender == null) { - return; //why is a non-actor sending this message? Just ignore. + // why is a non-actor sending this message? Just ignore. + return; } String actorName = sender.path().name(); @@ -1040,11 +1044,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberUnavailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -1056,11 +1058,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberAvailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } @@ -1125,7 +1125,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void updateSchemaContext(final Object message) { schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); - LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); + LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); for (ShardInformation info : localShards.values()) { info.setSchemaContext(schemaContext); @@ -1133,6 +1133,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); info.setActor(newShardActor(info)); + // Update peer address for every existing peer memeber to avoid missing sending + // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp. + String shardName = info.getShardName(); + for (MemberName memberName : peerAddressResolver.getPeerMembers()) { + String peerId = getShardIdentifier(memberName, shardName).toString() ; + String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); + info.updatePeerAddress(peerId, peerAddress, getSelf()); + info.peerUp(memberName, peerId, getSelf()); + LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", + persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); + } } else { info.getActor().tell(message, getSelf()); } @@ -1249,7 +1260,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - restoreFromSnapshot = null; // null out to GC + // null out to GC + restoreFromSnapshot = null; for (String shardName : memberShardNames) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName);