From: Tony Tkacik Date: Mon, 4 May 2015 08:21:17 +0000 (+0000) Subject: Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate" X-Git-Tag: release/beryllium~621 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2890d49fb524bf060f7e95c83bc025df0b6980ed;hp=fec4c0cc25398abf710e562e61da42667da5dec4 Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate" --- diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 47b2afe92d..885d9fdb53 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -15,7 +15,7 @@ - 2.3.9 + 2.3.10 0.6.0-SNAPSHOT 0.2.0-SNAPSHOT 0.7.0-SNAPSHOT diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index 7b8816bd1b..17663d982c 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -114,6 +114,7 @@ public abstract class AbstractDispatcher, L extends if (LocalServerChannel.class.equals(channelClass) == false) { // makes no sense for LocalServer and produces warning b.childOption(ChannelOption.SO_KEEPALIVE, true); + b.childOption(ChannelOption.TCP_NODELAY , true); } b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); customizeBootstrap(b); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1738cc5fe0..d82528c48a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -298,7 +298,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional roleChangeNotifier = getRoleChangeNotifier(); if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { if(roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); } onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); @@ -311,6 +311,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + return new LeaderStateChanged(memberId, leaderId); + } + /** * When a derived RaftActor needs to persist something it must call * persistData. diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java index be8e0cefc1..e5a0a2bd6d 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.sal.binding.api; import java.util.EventListener; - import org.opendaylight.yangtools.yang.binding.Notification; /** @@ -17,7 +16,9 @@ import org.opendaylight.yangtools.yang.binding.Notification; * capture of this interface. * * @param the interested notification type + * @deprecated Deprecated unused API. */ +@Deprecated public interface NotificationListener extends EventListener { /** * Invoked to deliver a notification. diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java index 00db80c19f..4b06e77c44 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.sal.binding.api; import java.util.EventListener; import java.util.concurrent.ExecutorService; - import org.opendaylight.controller.md.sal.common.api.notify.NotificationPublishService; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.Notification; @@ -18,7 +17,10 @@ import org.opendaylight.yangtools.yang.binding.Notification; * Interface for a notification service that provides publish/subscribe capabilities for YANG * modeled notifications. This interface is a combination of the {@link NotificationService} and * {@link NotificationPublishService} interfaces. + * + * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationPublishService}. */ +@Deprecated public interface NotificationProviderService extends NotificationService, NotificationPublishService { /** diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java index 335f55bcbb..dd66aa67f8 100644 --- a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java +++ b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java @@ -91,7 +91,10 @@ import org.opendaylight.yangtools.yang.binding.Notification; * * The onStart method will be invoked when someone publishes a Start notification and * the onStop method will be invoked when someone publishes a Stop notification. + * + * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationService} instead. */ +@Deprecated public interface NotificationService extends BindingAwareService { /** * Registers a generic listener implementation for a specified notification type. diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java index ec35b03b0a..23c95ecc99 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -7,29 +7,30 @@ */ package org.opendaylight.controller.cluster.notifications; -import java.io.Serializable; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** - * A message initiated internally from the RaftActor when some state of a leader has changed + * A local message initiated internally from the RaftActor when some state of a leader has changed. * * @author Thomas Pantelis */ -public class LeaderStateChanged implements Serializable { - private static final long serialVersionUID = 1L; - +public class LeaderStateChanged { private final String memberId; private final String leaderId; - public LeaderStateChanged(String memberId, String leaderId) { - this.memberId = memberId; + public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) { + this.memberId = Preconditions.checkNotNull(memberId); this.leaderId = leaderId; } - public String getMemberId() { + public @Nonnull String getMemberId() { return memberId; } - public String getLeaderId() { + public @Nullable String getLeaderId() { return leaderId; } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index e72f4b2675..8d65e59dfe 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -56,6 +56,16 @@ odl-cluster-data { ] } + + persistence { + # By default the snapshots/journal directories live in KARAF_HOME. You can choose to put it somewhere else by + # modifying the following two properties. The directory location specified may be a relative or absolute path. + # The relative path is always relative to KARAF_HOME. + + # snapshot-store.local.dir = "target/snapshots" + # journal.leveldb.dir = "target/journal" + + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 62d3259a71..e62e918e5e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -48,12 +48,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; @@ -63,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; @@ -265,6 +268,12 @@ public class Shard extends RaftActor { return roleChangeNotifier; } + @Override + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + return new ShardLeaderStateChanged(memberId, leaderId, + isLeader() ? Optional.of(store.getDataTree()) : Optional.absent()); + } + private void onDatastoreContext(DatastoreContext context) { datastoreContext = context; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index cff44b13cb..f4fa7b3a97 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; @@ -54,17 +55,20 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; -import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -185,27 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); - } else if(message instanceof LeaderStateChanged) { - onLeaderStateChanged((LeaderStateChanged)message); + } else if(message instanceof ShardLeaderStateChanged) { + onLeaderStateChanged((ShardLeaderStateChanged)message); } else { unknownMessage(message); } } - private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + private void checkReady(){ + if (isReadyWithLeaderId()) { + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + + private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { + shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); } else { LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); } @@ -249,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); if(shardInformation != null) { shardInformation.setRole(roleChanged.getNewRole()); - - if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); - } - + checkReady(); mBean.setSyncStatus(isInSync()); } } @@ -439,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), getShardActorPath(shardName, memberName), getSelf()); } + + checkReady(); } private void onDatastoreContext(DatastoreContext context) { @@ -510,6 +513,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: In findPrimary: {}", persistenceId(), message); final String shardName = message.getShardName(); + final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); @@ -517,7 +521,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - Object found = new PrimaryFound(info.getSerializedLeaderActor()); + String primaryPath = info.getSerializedLeaderActor(); + Object found = canReturnLocalShardState && info.isLeader() ? + new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + new RemotePrimaryShardFound(primaryPath); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -537,7 +544,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(), shardName, path); - getContext().actorSelection(path).forward(message, getContext()); + getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName, + message.isWaitUntilReady()), getContext()); return; } } @@ -665,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; + private Optional localShardDataTree; // flag that determines if the actor is ready for business private boolean actorInitialized = false; @@ -703,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } + void setLocalDataTree(Optional localShardDataTree) { + this.localShardDataTree = localShardDataTree; + } + + Optional getLocalShardDataTree() { + return localShardDataTree; + } + Map getPeerAddresses() { return peerAddresses; } @@ -731,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardReadyWithLeaderId() { - return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null); } boolean isShardInitialized() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index 2c18eaa86f..0b7fcf0ed5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -21,7 +21,7 @@ public class FindPrimary implements Serializable { private final String shardName; private final boolean waitUntilReady; - public FindPrimary(String shardName, boolean waitUntilReady){ + public FindPrimary(String shardName, boolean waitUntilReady) { Preconditions.checkNotNull(shardName, "shardName should not be null"); @@ -40,8 +40,8 @@ public class FindPrimary implements Serializable { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady) - .append("]"); + builder.append(getClass().getName()).append(" [shardName=").append(shardName).append(", waitUntilReady=") + .append(waitUntilReady).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java new file mode 100644 index 0000000000..e19dcd65b3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.apache.commons.lang3.ObjectUtils; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Local message sent in reply to FindPrimaryShard to indicate the primary shard is local to the caller. + * + * @author Thomas Pantelis + */ +public class LocalPrimaryShardFound { + + private final String primaryPath; + private final DataTree localShardDataTree; + + public LocalPrimaryShardFound(@Nonnull String primaryPath, @Nonnull DataTree localShardDataTree) { + this.primaryPath = Preconditions.checkNotNull(primaryPath); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + public @Nonnull String getPrimaryPath() { + return primaryPath; + } + + public @Nonnull DataTree getLocalShardDataTree() { + return localShardDataTree; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("LocalPrimaryShardFound [primaryPath=").append(primaryPath).append(", localShardDataTree=") + .append(ObjectUtils.identityToString(localShardDataTree)).append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java new file mode 100644 index 0000000000..820512e096 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * A remote message sent to locate the primary shard. + * + * @author Thomas Pantelis + */ +public class RemoteFindPrimary extends FindPrimary { + private static final long serialVersionUID = 1L; + + public RemoteFindPrimary(String shardName, boolean waitUntilReady) { + super(shardName, waitUntilReady); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java similarity index 53% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java index 4c154d43ae..662eefd9d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java @@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages; import java.io.Serializable; -public class PrimaryFound implements Serializable { +/** + * Local or remote message sent in reply to FindPrimaryShard to indicate the primary shard is remote to the caller. + */ +public class RemotePrimaryShardFound implements Serializable { private static final long serialVersionUID = 1L; private final String primaryPath; - public PrimaryFound(final String primaryPath) { + public RemotePrimaryShardFound(final String primaryPath) { this.primaryPath = primaryPath; } @@ -23,33 +26,10 @@ public class PrimaryFound implements Serializable { return primaryPath; } - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - PrimaryFound that = (PrimaryFound) o; - - if (!primaryPath.equals(that.primaryPath)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return primaryPath.hashCode(); - } - @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]"); + builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java new file mode 100644 index 0000000000..d9a55ab1e9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * A local message derived from LeaderStateChanged containing additional Shard-specific info that is sent + * when some state of the shard leader has changed. This message is used by the ShardManager to maintain + * current Shard information. + * + * @author Thomas Pantelis + */ +public class ShardLeaderStateChanged extends LeaderStateChanged { + + private final Optional localShardDataTree; + + public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId, + @Nonnull Optional localShardDataTree) { + super(memberId, leaderId); + this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); + } + + public @Nonnull Optional getLocalShardDataTree() { + return localShardDataTree; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index afa773b461..73f1a8f328 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -42,10 +42,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.reporting.MetricsReporter; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -209,14 +210,13 @@ public class ActorContext { return future.transform(new Mapper() { @Override public PrimaryShardInfo checkedApply(Object response) throws Exception { - if(response instanceof PrimaryFound) { - PrimaryFound found = (PrimaryFound)response; - - LOG.debug("Primary found {}", found.getPrimaryPath()); - ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath()); - PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.absent()); - primaryShardInfoCache.put(shardName, Futures.successful(info)); - return info; + if(response instanceof RemotePrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null); + } else if(response instanceof LocalPrimaryShardFound) { + LOG.debug("findPrimaryShardAsync received: {}", response); + LocalPrimaryShardFound found = (LocalPrimaryShardFound)response; + return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree()); } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -231,6 +231,14 @@ public class ActorContext { }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher()); } + private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath, + DataTree localShardDataTree) { + ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree)); + primaryShardInfoCache.put(shardName, Futures.successful(info)); + return info; + } + /** * Finds a local shard given its shard name and return it's ActorRef * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b676cf225c..645890dcb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -21,6 +22,7 @@ import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; @@ -49,9 +51,11 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; @@ -63,6 +67,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; @@ -154,7 +159,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), @@ -162,9 +168,30 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @@ -182,11 +209,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent()), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -233,13 +260,15 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); }}; } @@ -266,11 +295,13 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; @@ -362,7 +393,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.tell(new ActorInitialized(), mockShardActor2); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; - shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class))), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -370,7 +402,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.tell(new FindPrimary("astronauts", false), getRef()); - PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); @@ -639,7 +671,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -650,7 +682,8 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)))); verify(ready, times(1)).countDown(); @@ -658,7 +691,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -669,13 +702,37 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); verify(ready, times(1)).countDown(); }}; } + @Test + public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + verify(ready, times(1)).countDown(); + + }}; + } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 4cbc121a95..1ecf0971c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -66,7 +67,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; @@ -83,8 +83,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -2092,14 +2094,27 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterRoleChangeListener(), listener); - // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore - // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary - // sleep. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); - List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", true, + leaderStateChanged.getLocalShardDataTree().isPresent()); + assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), + leaderStateChanged.getLocalShardDataTree().get()); - assertEquals(1, allMatching.size()); + MessageCollectorActor.clearMessages(listener); + + // Force a leader change + + shard.tell(new RequestVote(10000, "member2", 50, 50), getRef()); + + leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", false, + leaderStateChanged.getLocalShardDataTree().isPresent()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index bc80937897..031463b2b9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; @@ -39,10 +41,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -404,7 +408,7 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception { TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); @@ -418,11 +422,10 @@ public class ActorContextTest extends AbstractActorTest{ mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound(expPrimaryPath)); + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath)); } }; - Future foobar = actorContext.findPrimaryShardAsync("foobar"); PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); @@ -444,7 +447,50 @@ public class ActorContextTest extends AbstractActorTest{ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final DataTree mockDataTree = Mockito.mock(DataTree.class); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); + } + }; + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent()); + assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedInfo, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); + + assertNull(cached); } @Test @@ -521,8 +567,8 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); - shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString())); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString())); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index 810b270cfc..5d44033cd6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -74,7 +74,7 @@ public class MockClusterWrapper implements ClusterWrapper{ } - private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) { + public static ClusterEvent.MemberUp createMemberUp(String memberName, String address) { akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress( AddressFromURIString.parse(address), 55); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index f12fda0aa1..f3cb78a301 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -75,7 +75,7 @@ public class RpcManager extends AbstractUntypedActor { LOG.debug("Create rpc registry and broker actors"); rpcRegistry = - getContext().actorOf(Props.create(RpcRegistry.class). + getContext().actorOf(RpcRegistry.props(). withMailbox(config.getMailBoxName()), config.getRpcRegistryName()); rpcBroker = diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index f67657f692..fa93a3b83f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -13,6 +13,8 @@ import akka.japi.Pair; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Set; + import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; import org.opendaylight.controller.sal.connector.api.RpcRouter; @@ -41,6 +43,10 @@ public class RoutingTable implements Copier, Serializable { } } + public Set> getRoutes() { + return table.keySet(); + } + public void addRoute(RpcRouter.RouteIdentifier routeId){ table.put(routeId, System.currentTimeMillis()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 219646d847..1dcc4e1405 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; import akka.japi.Option; import akka.japi.Pair; import com.google.common.base.Preconditions; @@ -19,6 +21,8 @@ import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.Remo import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; @@ -34,6 +38,10 @@ public class RpcRegistry extends BucketStore { getLocalBucket().setData(new RoutingTable()); } + public static Props props() { + return Props.create(new RpcRegistryCreator()); + } + @Override protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message @@ -220,4 +228,15 @@ public class RpcRegistry extends BucketStore { } } } + + private static class RpcRegistryCreator implements Creator { + private static final long serialVersionUID = 1L; + + @Override + public RpcRegistry create() throws Exception { + RpcRegistry registry = new RpcRegistry(); + RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry); + return registry; + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 628deb4311..febff0bc92 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -13,7 +13,6 @@ import akka.actor.ActorRefProvider; import akka.actor.Address; import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; -import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -230,7 +229,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe } } - protected BucketImpl getLocalBucket() { + public BucketImpl getLocalBucket() { return localBucket; } @@ -239,12 +238,11 @@ public class BucketStore> extends AbstractUntypedActorWithMe versions.put(selfAddress, localBucket.getVersion()); } - protected Map> getRemoteBuckets() { + public Map> getRemoteBuckets() { return remoteBuckets; } - @VisibleForTesting - Map getVersions() { + public Map getVersions() { return versions; } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java new file mode 100644 index 0000000000..ddd33336ed --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java @@ -0,0 +1,22 @@ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + + +import java.util.Map; +import java.util.Set; + +/** + * JMX bean to check remote rpc registry + */ + +public interface RemoteRpcRegistryMXBean { + + Set getGlobalRpc(); + + String getBucketVersions(); + + Set getLocalRegisteredRoutedRpc(); + + Map findRpcByName(String name); + + Map findRpcByRoute(String route); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java new file mode 100644 index 0000000000..c7d9b9955d --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java @@ -0,0 +1,156 @@ +package org.opendaylight.controller.remote.rpc.registry.mbeans; + +import akka.actor.Address; +import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; +import org.opendaylight.controller.remote.rpc.registry.RoutingTable; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final String NULL_CONSTANT = "null"; + + private final String LOCAL_CONSTANT = "local"; + + private final String ROUTE_CONSTANT = "route:"; + + private final String NAME_CONSTANT = " | name:"; + + private final RpcRegistry rpcRegistry; + + public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) { + super("RemoteRpcRegistry", "RemoteRpcBroker", null); + this.rpcRegistry = rpcRegistry; + registerMBean(); + } + + @Override + public Set getGlobalRpc() { + RoutingTable table = rpcRegistry.getLocalBucket().getData(); + Set globalRpc = new HashSet<>(table.getRoutes().size()); + for(RpcRouter.RouteIdentifier route : table.getRoutes()){ + if(route.getRoute() == null) { + globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT); + } + } + if(log.isDebugEnabled()) { + log.debug("Locally registered global RPCs {}", globalRpc); + } + return globalRpc; + } + + @Override + public Set getLocalRegisteredRoutedRpc() { + RoutingTable table = rpcRegistry.getLocalBucket().getData(); + Set routedRpc = new HashSet<>(table.getRoutes().size()); + for(RpcRouter.RouteIdentifier route : table.getRoutes()){ + if(route.getRoute() != null) { + StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); + builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null ? + route.getType().toString() : NULL_CONSTANT); + routedRpc.add(builder.toString()); + } + } + if(log.isDebugEnabled()) { + log.debug("Locally registered routed RPCs {}", routedRpc); + } + return routedRpc; + } + + @Override + public Map findRpcByName(final String name) { + RoutingTable localTable = rpcRegistry.getLocalBucket().getData(); + // Get all RPCs from local bucket + Map rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT)); + + // Get all RPCs from remote bucket + Map> buckets = rpcRegistry.getRemoteBuckets(); + for(Address address : buckets.keySet()) { + RoutingTable table = buckets.get(address).getData(); + rpcMap.putAll(getRpcMemberMapByName(table, name, address.toString())); + } + if(log.isDebugEnabled()) { + log.debug("list of RPCs {} searched by name {}", rpcMap, name); + } + return rpcMap; + } + + @Override + public Map findRpcByRoute(String routeId) { + RoutingTable localTable = rpcRegistry.getLocalBucket().getData(); + Map rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT)); + + Map> buckets = rpcRegistry.getRemoteBuckets(); + for(Address address : buckets.keySet()) { + RoutingTable table = buckets.get(address).getData(); + rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, address.toString())); + + } + if(log.isDebugEnabled()) { + log.debug("list of RPCs {} searched by route {}", rpcMap, routeId); + } + return rpcMap; + } + + /** + * Search if the routing table route String contains routeName + */ + + private Map getRpcMemberMapByRoute(final RoutingTable table, final String routeName, + final String address) { + Set> routes = table.getRoutes(); + Map rpcMap = new HashMap<>(routes.size()); + for(RpcRouter.RouteIdentifier route : table.getRoutes()){ + if(route.getRoute() != null) { + String routeString = route.getRoute().toString(); + if(routeString.contains(routeName)) { + StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); + builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null ? + route.getType().toString() : NULL_CONSTANT); + rpcMap.put(builder.toString(), address); + } + } + } + return rpcMap; + } + + /** + * Search if the routing table route type contains name + */ + private Map getRpcMemberMapByName(final RoutingTable table, final String name, + final String address) { + Set> routes = table.getRoutes(); + Map rpcMap = new HashMap<>(routes.size()); + for(RpcRouter.RouteIdentifier route : routes){ + if(route.getType() != null) { + String type = route.getType().toString(); + if(type.contains(name)) { + StringBuilder builder = new StringBuilder(ROUTE_CONSTANT); + builder.append(route.getRoute() != null ? route.getRoute().toString(): NULL_CONSTANT) + .append(NAME_CONSTANT).append(type); + rpcMap.put(builder.toString(), address); + } + } + } + return rpcMap; + } + + + + @Override + public String getBucketVersions() { + return rpcRegistry.getVersions().toString(); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java index 4d8463adae..3be247a3bb 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java @@ -104,6 +104,8 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr result = partialResult; } return new NormalizedNodeContext(path,result); + } catch (final RestconfDocumentedException e) { + throw e; } catch (final Exception e) { LOG.debug("Error parsing json input", e); diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java index 74a9bd2d31..2a9c5bf190 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java @@ -104,6 +104,8 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro final NormalizedNode result = parse(path,doc); return new NormalizedNodeContext(path,result); + } catch (final RestconfDocumentedException e){ + throw e; } catch (final Exception e) { LOG.debug("Error parsing xml input", e); diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java index 2da58a3820..6cc62e859c 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java @@ -153,6 +153,11 @@ public class ControllerContext implements SchemaContextListener { final InstanceIdentifierBuilder builder = YangInstanceIdentifier.builder(); final Module latestModule = globalSchema.findModuleByName(startModule, null); + + if (latestModule == null) { + throw new RestconfDocumentedException("The module named '" + startModule + "' does not exist.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); + } + final InstanceIdentifierContext iiWithSchemaNode = collectPathArguments(builder, pathArgs, latestModule, null, toMountPointIdentifier); diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java index 84b092e10e..624d709c60 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java @@ -11,6 +11,7 @@ import java.math.BigInteger; import java.util.Collection; import java.util.Collections; import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config; +import org.opendaylight.controller.config.yang.md.sal.rest.connector.Delete; import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get; import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational; import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post; @@ -78,15 +79,31 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec @Override public Config getConfig() { final Config config = new Config(); + final Get get = new Get(); get.setReceivedRequests(stats.getConfigGet()); + get.setSuccessfulResponses(stats.getSuccessGetConfig()); + get.setFailedResponses(stats.getFailureGetConfig()); config.setGet(get); + final Post post = new Post(); post.setReceivedRequests(stats.getConfigPost()); + post.setSuccessfulResponses(stats.getSuccessPost()); + post.setFailedResponses(stats.getFailurePost()); config.setPost(post); + final Put put = new Put(); put.setReceivedRequests(stats.getConfigPut()); + put.setSuccessfulResponses(stats.getSuccessPut()); + put.setFailedResponses(stats.getFailurePut()); config.setPut(put); + + final Delete delete = new Delete(); + delete.setReceivedRequests(stats.getConfigDelete()); + delete.setSuccessfulResponses(stats.getSuccessDelete()); + delete.setFailedResponses(stats.getFailureDelete()); + config.setDelete(delete); + return config; } @@ -96,6 +113,8 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec final Operational operational = new Operational(); final Get get = new Get(); get.setReceivedRequests(opGet); + get.setSuccessfulResponses(stats.getSuccessGetOperational()); + get.setFailedResponses(stats.getFailureGetOperational()); operational.setGet(get); return operational; } @@ -105,6 +124,6 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec final BigInteger rpcInvoke = stats.getRpc(); final Rpcs rpcs = new Rpcs(); rpcs.setReceivedRequests(rpcInvoke); - return rpcs ; + return rpcs; } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java index 07178f5379..f4a5fbc926 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl; import java.math.BigInteger; import java.util.concurrent.atomic.AtomicLong; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriInfo; import org.opendaylight.controller.sal.rest.api.RestconfService; @@ -21,6 +22,16 @@ public class StatisticsRestconfServiceWrapper implements RestconfService { AtomicLong configPost = new AtomicLong(); AtomicLong configPut = new AtomicLong(); AtomicLong configDelete = new AtomicLong(); + AtomicLong successGetConfig = new AtomicLong(); + AtomicLong successGetOperational = new AtomicLong(); + AtomicLong successPost = new AtomicLong(); + AtomicLong successPut = new AtomicLong(); + AtomicLong successDelete = new AtomicLong(); + AtomicLong failureGetConfig = new AtomicLong(); + AtomicLong failureGetOperational = new AtomicLong(); + AtomicLong failurePost = new AtomicLong(); + AtomicLong failurePut = new AtomicLong(); + AtomicLong failureDelete = new AtomicLong(); private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance()); @@ -79,36 +90,115 @@ public class StatisticsRestconfServiceWrapper implements RestconfService { @Override public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) { configGet.incrementAndGet(); - return delegate.readConfigurationData(identifier, uriInfo); + NormalizedNodeContext normalizedNodeContext = null; + try { + normalizedNodeContext = delegate.readConfigurationData(identifier, uriInfo); + if (normalizedNodeContext.getData() != null) { + successGetConfig.incrementAndGet(); + } + else { + failureGetConfig.incrementAndGet(); + } + } catch (Exception e) { + failureGetConfig.incrementAndGet(); + throw e; + } + return normalizedNodeContext; } @Override public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) { operationalGet.incrementAndGet(); - return delegate.readOperationalData(identifier, uriInfo); + NormalizedNodeContext normalizedNodeContext = null; + try { + normalizedNodeContext = delegate.readOperationalData(identifier, uriInfo); + if (normalizedNodeContext.getData() != null) { + successGetOperational.incrementAndGet(); + } + else { + failureGetOperational.incrementAndGet(); + } + } catch (Exception e) { + failureGetOperational.incrementAndGet(); + throw e; + } + return normalizedNodeContext; } @Override public Response updateConfigurationData(final String identifier, final NormalizedNodeContext payload) { configPut.incrementAndGet(); - return delegate.updateConfigurationData(identifier, payload); + Response response = null; + try { + response = delegate.updateConfigurationData(identifier, payload); + if (response.getStatus() == Status.OK.getStatusCode()) { + successPut.incrementAndGet(); + } + else { + failurePut.incrementAndGet(); + } + } catch (Exception e) { + failurePut.incrementAndGet(); + throw e; + } + return response; } @Override public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) { configPost.incrementAndGet(); - return delegate.createConfigurationData(identifier, payload, uriInfo); + Response response = null; + try { + response = delegate.createConfigurationData(identifier, payload, uriInfo); + if (response.getStatus() == Status.OK.getStatusCode()) { + successPost.incrementAndGet(); + } + else { + failurePost.incrementAndGet(); + } + } catch (Exception e) { + failurePost.incrementAndGet(); + throw e; + } + return response; } @Override public Response createConfigurationData(final NormalizedNodeContext payload, final UriInfo uriInfo) { configPost.incrementAndGet(); - return delegate.createConfigurationData(payload, uriInfo); + Response response = null; + try { + response = delegate.createConfigurationData(payload, uriInfo); + if (response.getStatus() == Status.OK.getStatusCode()) { + successPost.incrementAndGet(); + } + else { + failurePost.incrementAndGet(); + } + }catch (Exception e) { + failurePost.incrementAndGet(); + throw e; + } + return response; } @Override public Response deleteConfigurationData(final String identifier) { - return delegate.deleteConfigurationData(identifier); + configDelete.incrementAndGet(); + Response response = null; + try { + response = delegate.deleteConfigurationData(identifier); + if (response.getStatus() == Status.OK.getStatusCode()) { + successDelete.incrementAndGet(); + } + else { + failureDelete.incrementAndGet(); + } + } catch (Exception e) { + failureDelete.incrementAndGet(); + throw e; + } + return response; } @Override @@ -144,4 +234,44 @@ public class StatisticsRestconfServiceWrapper implements RestconfService { public BigInteger getRpc() { return BigInteger.valueOf(rpc.get()); } -} + + public BigInteger getSuccessGetConfig() { + return BigInteger.valueOf(successGetConfig.get()); + } + + public BigInteger getSuccessGetOperational() { + return BigInteger.valueOf(successGetOperational.get()); + } + + public BigInteger getSuccessPost() { + return BigInteger.valueOf(successPost.get()); + } + + public BigInteger getSuccessPut() { + return BigInteger.valueOf(successPut.get()); + } + + public BigInteger getSuccessDelete() { + return BigInteger.valueOf(successDelete.get()); + } + + public BigInteger getFailureGetConfig() { + return BigInteger.valueOf(failureGetConfig.get()); + } + + public BigInteger getFailureGetOperational() { + return BigInteger.valueOf(failureGetOperational.get()); + } + + public BigInteger getFailurePost() { + return BigInteger.valueOf(failurePost.get()); + } + + public BigInteger getFailurePut() { + return BigInteger.valueOf(failurePut.get()); + } + + public BigInteger getFailureDelete() { + return BigInteger.valueOf(failureDelete.get()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang index 6d2add6ff1..6fa9c86ec1 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang +++ b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang @@ -31,6 +31,14 @@ module opendaylight-rest-connector { leaf received-requests { type uint64; } + + leaf successful-responses { + type uint64; + } + + leaf failed-responses { + type uint64; + } } augment "/config:modules/config:module/config:configuration" { @@ -70,6 +78,10 @@ module opendaylight-rest-connector { container put { uses statistics; } + + container delete { + uses statistics; + } } container operational { diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java index 2961662092..ac3873e6b8 100644 --- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java +++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java @@ -137,15 +137,15 @@ public class YangStoreService implements YangStoreContext { } public AutoCloseable registerCapabilityListener(final CapabilityListener listener) { - final SoftReference yangStoreSnapshotSoftReference = ref.get(); - YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null; - if(ret == null) { - ret = getYangStoreSnapshot(); + YangStoreContext context = ref.get().get(); + + if(context == null) { + context = getYangStoreSnapshot(); } this.listeners.add(listener); - listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret)); + listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context)); return new AutoCloseable() { @Override diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java index 6648bd4b52..7203107901 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java @@ -52,6 +52,9 @@ public class Parameters { @Arg(dest = "msg-timeout") public long msgTimeout; + @Arg(dest = "tcp-header") + public String tcpHeader; + static ArgumentParser getParser() { final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client"); @@ -122,6 +125,11 @@ public class Parameters { .setDefault(false) .dest("ssh"); + parser.addArgument("--tcp-header") + .type(String.class) + .required(false) + .dest("tcp-header"); + // TODO add get-config option instead of edit + commit // TODO different edit config content diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java index fe0a0bcd52..6bf50d2c5a 100644 --- a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java +++ b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java @@ -10,7 +10,11 @@ package org.opendaylight.controller.netconf.test.tool.client.stress; import ch.qos.logback.classic.Level; import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import io.netty.channel.nio.NioEventLoopGroup; @@ -19,6 +23,7 @@ import io.netty.util.Timer; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -30,6 +35,7 @@ import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl; import org.opendaylight.controller.netconf.client.NetconfClientSession; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.controller.sal.connect.api.RemoteDevice; import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator; @@ -82,18 +88,18 @@ public final class StressClient { } } - private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}"; private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}"; + private static final String PHYS_ADDR_PLACEHOLDER_REGEX = "\\{PHYS_ADDR\\}"; public static void main(final String[] args) { final Parameters params = parseArgs(args, Parameters.getParser()); params.validate(); - // TODO remove + // Wait 5 seconds to allow for debugging/profiling try { - Thread.sleep(10000); + Thread.sleep(5000); } catch (final InterruptedException e) { -// e.printStackTrace(); + throw new RuntimeException(e); } final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); @@ -104,12 +110,8 @@ public final class StressClient { final List preparedMessages = Lists.newArrayListWithCapacity(params.editCount); final String editContentString; - boolean needsModification = false; try { editContentString = Files.toString(params.editContent, Charsets.UTF_8); - if(editContentString.contains(MSG_ID_PLACEHOLDER)) { - needsModification = true; - }; } catch (IOException e) { throw new IllegalArgumentException("Cannot read content of " + params.editContent); } @@ -122,9 +124,12 @@ public final class StressClient { final Element editContentElement; try { // Insert message id where needed - final String specificEditContent = needsModification ? - editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) : - editContentString; + String specificEditContent = + editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)); + + // Insert physical address where needed + specificEditContent = + specificEditContent.replaceAll(PHYS_ADDR_PLACEHOLDER_REGEX, getMac(i)); editContentElement = XmlUtil.readXmlToElement(specificEditContent); final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)). @@ -176,6 +181,23 @@ public final class StressClient { } } + private static String getMac(final int i) { + final String hex = Integer.toHexString(i); + final Iterable macGroups = Splitter.fixedLength(2).split(hex); + + final int additional = 6 - Iterables.size(macGroups); + final ArrayList additionalGroups = Lists.newArrayListWithCapacity(additional); + for (int j = 0; j < additional; j++) { + additionalGroups.add("00"); + } + return Joiner.on(':').join(Iterables.concat(Iterables.transform(macGroups, new Function() { + @Override + public String apply(final String input) { + return input.length() == 1 ? input + "0" : input; + } + }), additionalGroups)); + } + private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List preparedMessages, final NetconfDeviceCommunicator sessionListener) { if(params.async) { return new AsyncExecutionStrategy(params, preparedMessages, sessionListener); @@ -206,6 +228,16 @@ public final class StressClient { final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create(); netconfClientConfigurationBuilder.withSessionListener(sessionListener); netconfClientConfigurationBuilder.withAddress(params.getInetAddress()); + if(params.tcpHeader != null) { + final String header = params.tcpHeader.replaceAll("\"", "").trim() + "\n"; + netconfClientConfigurationBuilder.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader(null, null, null, null, null) { + @Override + public String toFormattedString() { + LOG.debug("Sending TCP header {}", header); + return header; + } + }); + } netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP); netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L); netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));