X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=d3fb58e9d15e66e5c25ca40ce60c546cfb6d866e;hb=0fdb21f91e0e61e8ee911beb4cda9053537a0ccd;hp=dadbeeb4309703f80a823ca8ac3774e9b0e7a7e7;hpb=0a2ed6b43f45b92f09c291e99e2e66a7fa18085f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index dadbeeb430..d3fb58e9d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -51,6 +51,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -88,7 +89,6 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; -import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -112,7 +112,6 @@ import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHan import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -306,7 +305,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), ((DeleteSnapshotsFailure) message).cause()); } else if (message instanceof DeleteSnapshotsSuccess) { - LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message); + LOG.debug("{}: Successfully deleted prior snapshots", persistenceId()); } else if (message instanceof RegisterRoleChangeListenerReply) { LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId()); } else if (message instanceof ClusterEvent.MemberEvent) { @@ -336,7 +335,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onInitConfigListener() { LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = + final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = org.opendaylight.mdsal.common.api.LogicalDatastoreType .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); @@ -345,7 +344,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, type); + configUpdateHandler.initListener(dataStore, datastoreType); } private void onShutDown() { @@ -442,13 +441,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); - String msg = String.format("RemoveServer request to leader %s for shard %s failed", - primaryPath, shardName); - LOG.debug("{}: {}", persistenceId(), msg, failure); + LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, + shardName, failure); // FAILURE - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + sender.tell(new Status.Failure(new RuntimeException( + String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), + failure)), self()); } else { // SUCCESS self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); @@ -482,13 +482,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); - String msg = String.format("RemoveServer request to leader %s for shard %s failed", - primaryPath, shardName); - - LOG.debug("{}: {}", persistenceId(), msg, failure); + LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, + shardName, failure); // FAILURE - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + sender.tell(new Status.Failure(new RuntimeException( + String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), + failure)), self()); } else { // SUCCESS self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); @@ -531,6 +531,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } self().tell((RunnableMessage) () -> { + // At any rate, invalidate primaryShardInfo cache + primaryShardInfoCache.remove(shardName); + shardActorsStopping.remove(shardName); notifyOnCompleteTasks(failure, result); }, ActorRef.noSender()); @@ -650,7 +653,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { configuration.addPrefixShardConfiguration(config); final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) + builder.logicalStoreType(config.getPrefix().getDatastoreType()) .storeRoot(config.getPrefix().getRootIdentifier()); DatastoreContext shardDatastoreContext = builder.build(); @@ -846,7 +849,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); if (sender == null) { - return; //why is a non-actor sending this message? Just ignore. + // why is a non-actor sending this message? Just ignore. + return; } String actorName = sender.path().name(); @@ -856,7 +860,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { try { shardId = ShardIdentifier.fromShardIdString(actorName); } catch (IllegalArgumentException e) { - LOG.debug("{}: ignoring actor {}", actorName, e); + LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e); return; } @@ -1040,11 +1044,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberUnavailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -1056,11 +1058,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberAvailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } @@ -1125,7 +1125,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void updateSchemaContext(final Object message) { schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); - LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); + LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); for (ShardInformation info : localShards.values()) { info.setSchemaContext(schemaContext); @@ -1133,6 +1133,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); info.setActor(newShardActor(info)); + // Update peer address for every existing peer memeber to avoid missing sending + // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp. + String shardName = info.getShardName(); + for (MemberName memberName : peerAddressResolver.getPeerMembers()) { + String peerId = getShardIdentifier(memberName, shardName).toString() ; + String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); + info.updatePeerAddress(peerId, peerAddress, getSelf()); + info.peerUp(memberName, peerId, getSelf()); + LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", + persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); + } } else { info.getActor().tell(message, getSelf()); } @@ -1249,7 +1260,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - restoreFromSnapshot = null; // null out to GC + // null out to GC + restoreFromSnapshot = null; for (String shardName : memberShardNames) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); @@ -1309,9 +1321,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) { if (shardReplicaOperationsInProgress.contains(shardName)) { - String msg = String.format("A shard replica operation for %s is already in progress", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName); + sender.tell(new Status.Failure(new IllegalStateException( + String.format("A shard replica operation for %s is already in progress", shardName))), getSelf()); return true; } @@ -1327,10 +1339,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Create the localShard if (schemaContext == null) { - String msg = String.format( - "No SchemaContext is available in order to create a local shard instance for %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", + persistenceId(), shardName); + getSender().tell(new Status.Failure(new IllegalStateException( + "No SchemaContext is available in order to create a local shard instance for " + shardName)), + getSelf()); return; } @@ -1359,18 +1372,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // verify the shard with the specified name is present in the cluster configuration if (!this.configuration.isShardConfigured(shardName)) { - String msg = String.format("No module configuration exists for shard %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf()); + LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); + getSender().tell(new Status.Failure(new IllegalArgumentException( + "No module configuration exists for shard " + shardName)), getSelf()); return; } // Create the localShard if (schemaContext == null) { - String msg = String.format( - "No SchemaContext is available in order to create a local shard instance for %s", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", + persistenceId(), shardName); + getSender().tell(new Status.Failure(new IllegalStateException( + "No SchemaContext is available in order to create a local shard instance for " + shardName)), + getSelf()); return; } @@ -1393,9 +1407,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { - String msg = String.format("Local shard %s already exists", shardName); - LOG.debug("{}: {}", persistenceId(), msg); - sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); + LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName); + sender.tell(new Status.Failure(new AlreadyExistsException( + String.format("Local shard %s already exists", shardName))), getSelf()); } private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, @@ -1747,15 +1761,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender); } else if (response instanceof LocalShardNotFound) { - String msg = String.format("Local shard %s does not exist", shardName); - LOG.debug("{}: {}", persistenceId, msg); - sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); + sender.tell(new Status.Failure(new IllegalArgumentException( + String.format("Local shard %s does not exist", shardName))), self()); } else { - String msg = String.format("Failed to find local shard %s: received response: %s", - shardName, response); - LOG.debug("{}: {}", persistenceId, msg); - sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : - new RuntimeException(msg)), self()); + LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, + response); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response + : new RuntimeException( + String.format("Failed to find local shard %s: received response: %s", shardName, + response))), self()); } } } @@ -1784,10 +1799,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { public void onComplete(final Throwable failure, final Object response) { shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { - String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", - shardActorRef.path()); - LOG.debug("{}: {}", persistenceId(), msg, failure); - sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(), + shardActorRef.path(), failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()), failure)), self()); } else { LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); @@ -1949,11 +1965,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onUnknownResponse(final Object response) { - String msg = String.format("Failed to find leader for shard %s: received response: %s", - shardName, response); - LOG.debug("{}: {}", persistenceId, msg); - targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : - new RuntimeException(msg)), shardManagerActor); + LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName, + response); + targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response + : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s", + shardName, response))), shardManagerActor); } }