X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=0b3768f39527783cdca2453e56914063a605f9a8;hp=6d26b8419c3561dd070623821e62cfca116674f2;hb=282a8afa481508d1c749d87502c6224932c8b36a;hpb=60b94e13e2c92ea7d3ef39d4aa6b96674a9415b2 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 6d26b8419c..0b3768f395 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -336,7 +336,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onInitConfigListener() { LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = + final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = org.opendaylight.mdsal.common.api.LogicalDatastoreType .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); @@ -345,7 +345,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, type); + configUpdateHandler.initListener(dataStore, datastoreType); } private void onShutDown() { @@ -531,6 +531,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } self().tell((RunnableMessage) () -> { + // At any rate, invalidate primaryShardInfo cache + primaryShardInfoCache.remove(shardName); + shardActorsStopping.remove(shardName); notifyOnCompleteTasks(failure, result); }, ActorRef.noSender()); @@ -846,7 +849,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); if (sender == null) { - return; //why is a non-actor sending this message? Just ignore. + // why is a non-actor sending this message? Just ignore. + return; } String actorName = sender.path().name(); @@ -1040,11 +1044,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberUnavailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -1056,11 +1058,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberAvailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } @@ -1133,6 +1133,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); info.setActor(newShardActor(info)); + // Update peer address for every existing peer memeber to avoid missing sending + // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp. + String shardName = info.getShardName(); + for (MemberName memberName : peerAddressResolver.getPeerMembers()) { + String peerId = getShardIdentifier(memberName, shardName).toString() ; + String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); + info.updatePeerAddress(peerId, peerAddress, getSelf()); + info.peerUp(memberName, peerId, getSelf()); + LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", + persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); + } } else { info.getActor().tell(message, getSelf()); } @@ -1249,7 +1260,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - restoreFromSnapshot = null; // null out to GC + // null out to GC + restoreFromSnapshot = null; for (String shardName : memberShardNames) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName);