Merge "BUG 3066 : Optimistic lock failed, on NetconfStateUpdate"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 4 May 2015 08:21:17 +0000 (08:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 4 May 2015 08:21:17 +0000 (08:21 +0000)
35 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java with 53% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java
opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java

index 47b2afe..885d9fd 100644 (file)
@@ -15,7 +15,7 @@
 
   <properties>
 
-    <akka.version>2.3.9</akka.version>
+    <akka.version>2.3.10</akka.version>
     <appauth.version>0.6.0-SNAPSHOT</appauth.version>
     <archetype-app-northbound>0.2.0-SNAPSHOT</archetype-app-northbound>
     <arphandler.version>0.7.0-SNAPSHOT</arphandler.version>
index 7b8816b..17663d9 100644 (file)
@@ -114,6 +114,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         if (LocalServerChannel.class.equals(channelClass) == false) {
             // makes no sense for LocalServer and produces warning
             b.childOption(ChannelOption.SO_KEEPALIVE, true);
+            b.childOption(ChannelOption.TCP_NODELAY , true);
         }
         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         customizeBootstrap(b);
index 1738cc5..d82528c 100644 (file)
@@ -298,7 +298,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
         if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
             if(roleChangeNotifier.isPresent()) {
-                roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+                roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
             }
 
             onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
@@ -311,6 +311,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new LeaderStateChanged(memberId, leaderId);
+    }
+
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
index be8e0ce..e5a0a2b 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.sal.binding.api;
 
 import java.util.EventListener;
-
 import org.opendaylight.yangtools.yang.binding.Notification;
 
 /**
@@ -17,7 +16,9 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * capture of this interface.
  *
  * @param <T> the interested notification type
+ * @deprecated Deprecated unused API.
  */
+@Deprecated
 public interface NotificationListener<T extends Notification> extends EventListener {
     /**
      * Invoked to deliver a notification.
index 00db80c..4b06e77 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.sal.binding.api;
 
 import java.util.EventListener;
 import java.util.concurrent.ExecutorService;
-
 import org.opendaylight.controller.md.sal.common.api.notify.NotificationPublishService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.Notification;
@@ -18,7 +17,10 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * Interface for a notification service that provides publish/subscribe capabilities for YANG
  * modeled notifications. This interface is a combination of the {@link NotificationService} and
  * {@link NotificationPublishService} interfaces.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationPublishService}.
  */
+@Deprecated
 public interface NotificationProviderService extends NotificationService, NotificationPublishService<Notification> {
 
     /**
index 335f55b..dd66aa6 100644 (file)
@@ -91,7 +91,10 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * </pre>
  * The <code>onStart</code> method will be invoked when someone publishes a <code>Start</code> notification and
  * the <code>onStop</code> method will be invoked when someone publishes a <code>Stop</code> notification.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationService} instead.
  */
+@Deprecated
 public interface NotificationService extends BindingAwareService {
     /**
      * Registers a generic listener implementation for a specified notification type.
index ec35b03..23c95ec 100644 (file)
@@ -7,29 +7,30 @@
  */
 package org.opendaylight.controller.cluster.notifications;
 
-import java.io.Serializable;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 
 /**
- * A message initiated internally from the RaftActor when some state of a leader has changed
+ * A local message initiated internally from the RaftActor when some state of a leader has changed.
  *
  * @author Thomas Pantelis
  */
-public class LeaderStateChanged implements Serializable {
-    private static final long serialVersionUID = 1L;
-
+public class LeaderStateChanged {
     private final String memberId;
     private final String leaderId;
 
-    public LeaderStateChanged(String memberId, String leaderId) {
-        this.memberId = memberId;
+    public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) {
+        this.memberId = Preconditions.checkNotNull(memberId);
         this.leaderId = leaderId;
     }
 
-    public String getMemberId() {
+    public @Nonnull String getMemberId() {
         return memberId;
     }
 
-    public String getLeaderId() {
+    public @Nullable String getLeaderId() {
         return leaderId;
     }
 
index e72f4b2..8d65e59 100644 (file)
@@ -56,6 +56,16 @@ odl-cluster-data {
       ]
 
     }
+
+    persistence {
+      # By default the snapshots/journal directories live in KARAF_HOME. You can choose to put it somewhere else by
+      # modifying the following two properties. The directory location specified may be a relative or absolute path. 
+      # The relative path is always relative to KARAF_HOME.
+
+      # snapshot-store.local.dir = "target/snapshots"
+      # journal.leveldb.dir = "target/journal"
+
+    }
   }
 }
 
index 62d3259..e62e918 100644 (file)
@@ -48,12 +48,14 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
@@ -63,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -265,6 +268,12 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    @Override
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+        return new ShardLeaderStateChanged(memberId, leaderId,
+                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
 
index cff44b1..f4fa7b3 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
@@ -54,17 +55,20 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -185,27 +189,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
-        } else if(message instanceof LeaderStateChanged) {
-            onLeaderStateChanged((LeaderStateChanged)message);
+        } else if(message instanceof ShardLeaderStateChanged) {
+            onLeaderStateChanged((ShardLeaderStateChanged)message);
         } else {
             unknownMessage(message);
         }
 
     }
 
-    private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+    private void checkReady(){
+        if (isReadyWithLeaderId()) {
+            LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                    persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+            waitTillReadyCountdownLatch.countDown();
+        }
+    }
+
+    private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
 
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
+            shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
         }
@@ -249,14 +257,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setRole(roleChanged.getNewRole());
-
-            if (isReadyWithLeaderId()) {
-                LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
-                        persistenceId(), type, waitTillReadyCountdownLatch.getCount());
-
-                waitTillReadyCountdownLatch.countDown();
-            }
-
+            checkReady();
             mBean.setSyncStatus(isInSync());
         }
     }
@@ -439,6 +440,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
                 getShardActorPath(shardName, memberName), getSelf());
         }
+
+        checkReady();
     }
 
     private void onDatastoreContext(DatastoreContext context) {
@@ -510,6 +513,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 
         final String shardName = message.getShardName();
+        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
@@ -517,7 +521,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
+                    String primaryPath = info.getSerializedLeaderActor();
+                    Object found = canReturnLocalShardState && info.isLeader() ?
+                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+                                new RemotePrimaryShardFound(primaryPath);
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -537,7 +544,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
                         shardName, path);
 
-                getContext().actorSelection(path).forward(message, getContext());
+                getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+                        message.isWaitUntilReady()), getContext());
                 return;
             }
         }
@@ -665,6 +673,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
+        private Optional<DataTree> localShardDataTree;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -703,6 +712,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return shardId;
         }
 
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
         Map<String, String> getPeerAddresses() {
             return peerAddresses;
         }
@@ -731,7 +748,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
index 2c18eaa..0b7fcf0 100644 (file)
@@ -21,7 +21,7 @@ public class FindPrimary implements Serializable {
     private final String shardName;
     private final boolean waitUntilReady;
 
-    public FindPrimary(String shardName, boolean waitUntilReady){
+    public FindPrimary(String shardName, boolean waitUntilReady) {
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
@@ -40,8 +40,8 @@ public class FindPrimary implements Serializable {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
-                .append("]");
+        builder.append(getClass().getName()).append(" [shardName=").append(shardName).append(", waitUntilReady=")
+               .append(waitUntilReady).append("]");
         return builder.toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/LocalPrimaryShardFound.java
new file mode 100644 (file)
index 0000000..e19dcd6
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.apache.commons.lang3.ObjectUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message sent in reply to FindPrimaryShard to indicate the primary shard is local to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalPrimaryShardFound {
+
+    private final String primaryPath;
+    private final DataTree localShardDataTree;
+
+    public LocalPrimaryShardFound(@Nonnull String primaryPath, @Nonnull DataTree localShardDataTree) {
+        this.primaryPath = Preconditions.checkNotNull(primaryPath);
+        this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+    }
+
+    public @Nonnull String getPrimaryPath() {
+        return primaryPath;
+    }
+
+    public @Nonnull DataTree getLocalShardDataTree() {
+        return localShardDataTree;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("LocalPrimaryShardFound [primaryPath=").append(primaryPath).append(", localShardDataTree=")
+                .append(ObjectUtils.identityToString(localShardDataTree)).append("]");
+        return builder.toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemoteFindPrimary.java
new file mode 100644 (file)
index 0000000..820512e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * A remote message sent to locate the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteFindPrimary extends FindPrimary {
+    private static final long serialVersionUID = 1L;
+
+    public RemoteFindPrimary(String shardName, boolean waitUntilReady) {
+        super(shardName, waitUntilReady);
+    }
+}
@@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import java.io.Serializable;
 
-public class PrimaryFound implements Serializable {
+/**
+ * Local or remote message sent in reply to FindPrimaryShard to indicate the primary shard is remote to the caller.
+ */
+public class RemotePrimaryShardFound implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String primaryPath;
 
-    public PrimaryFound(final String primaryPath) {
+    public RemotePrimaryShardFound(final String primaryPath) {
         this.primaryPath = primaryPath;
     }
 
@@ -23,33 +26,10 @@ public class PrimaryFound implements Serializable {
         return primaryPath;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        PrimaryFound that = (PrimaryFound) o;
-
-        if (!primaryPath.equals(that.primaryPath)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return primaryPath.hashCode();
-    }
-
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+        builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]");
         return builder.toString();
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java
new file mode 100644 (file)
index 0000000..d9a55ab
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * A local message derived from LeaderStateChanged containing additional Shard-specific info that is sent
+ * when some state of the shard leader has changed. This message is used by the ShardManager to maintain
+ * current Shard information.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardLeaderStateChanged extends LeaderStateChanged {
+
+    private final Optional<DataTree> localShardDataTree;
+
+    public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId,
+            @Nonnull Optional<DataTree> localShardDataTree) {
+        super(memberId, leaderId);
+        this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+    }
+
+    public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+        return localShardDataTree;
+    }
+}
index afa773b..73f1a8f 100644 (file)
@@ -42,10 +42,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -209,14 +210,13 @@ public class ActorContext {
         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
             public PrimaryShardInfo checkedApply(Object response) throws Exception {
-                if(response instanceof PrimaryFound) {
-                    PrimaryFound found = (PrimaryFound)response;
-
-                    LOG.debug("Primary found {}", found.getPrimaryPath());
-                    ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
-                    primaryShardInfoCache.put(shardName, Futures.successful(info));
-                    return info;
+                if(response instanceof RemotePrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+                } else if(response instanceof LocalPrimaryShardFound) {
+                    LOG.debug("findPrimaryShardAsync received: {}", response);
+                    LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -231,6 +231,14 @@ public class ActorContext {
         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
     }
 
+    private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
+            DataTree localShardDataTree) {
+        ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
+        PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+        primaryShardInfoCache.put(shardName, Futures.successful(info));
+        return info;
+    }
+
     /**
      * Finds a local shard given its shard name and return it's ActorRef
      *
index b676cf2..645890d 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -21,6 +22,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -49,9 +51,11 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -63,6 +67,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
@@ -154,7 +159,8 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef());
 
             MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
@@ -162,9 +168,30 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
+        }};
+    }
+
+    @Test
+    public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+            String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(memberId1,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
+
+            expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
     }
 
@@ -182,11 +209,11 @@ public class ShardManagerTest extends AbstractActorTest {
             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
             shardManager.tell(new RoleChangeNotification(memberId1,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
-            shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+            shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent()), mockShardActor);
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
@@ -233,13 +260,15 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
         }};
     }
 
@@ -266,11 +295,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
-            shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+            DataTree mockDataTree = mock(DataTree.class);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor);
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+            assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() );
 
             expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
         }};
@@ -362,7 +393,8 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager2.tell(new ActorInitialized(), mockShardActor2);
 
             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
-            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+            shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
+                    Optional.of(mock(DataTree.class))), mockShardActor2);
             shardManager2.tell(new RoleChangeNotification(memberId2,
                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
 
@@ -370,7 +402,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager1.tell(new FindPrimary("astronauts", false), getRef());
 
-            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path = found.getPrimaryPath();
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
 
@@ -639,7 +671,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
+    public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -650,7 +682,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId,
+                        Optional.of(mock(DataTree.class))));
 
                 verify(ready, times(1)).countDown();
 
@@ -658,7 +691,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+    public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
         new JavaTestKit(getSystem()) {
             {
                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
@@ -669,13 +702,37 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 verify(ready, never()).countDown();
 
-                shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
+                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
 
                 verify(ready, times(1)).countDown();
 
             }};
     }
 
+    @Test
+    public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+
+                String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                        memberId, null, RaftState.Follower.name()));
+
+                verify(ready, never()).countDown();
+
+                shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId,
+                        "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class))));
+
+                shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString()));
+
+                verify(ready, times(1)).countDown();
+
+            }};
+    }
 
     @Test
     public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
index 4cbc121..1ecf097 100644 (file)
@@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
@@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +59,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -66,7 +67,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -83,8 +83,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -2092,14 +2094,27 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new RegisterRoleChangeListener(), listener);
 
-                // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
-                // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
-                // sleep.
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+                MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
 
-                List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+                ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+                        ShardLeaderStateChanged.class);
+                assertEquals("getLocalShardDataTree present", true,
+                        leaderStateChanged.getLocalShardDataTree().isPresent());
+                assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
+                        leaderStateChanged.getLocalShardDataTree().get());
 
-                assertEquals(1, allMatching.size());
+                MessageCollectorActor.clearMessages(listener);
+
+                // Force a leader change
+
+                shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
+
+                leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
+                        ShardLeaderStateChanged.class);
+                assertEquals("getLocalShardDataTree present", false,
+                        leaderStateChanged.getLocalShardDataTree().isPresent());
+
+                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
             }
         };
     }
index bc80937..031463b 100644 (file)
@@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -39,10 +41,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -404,7 +408,7 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+    public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
 
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
@@ -418,11 +422,10 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
+                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
                         }
                     };
 
-
             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
@@ -444,7 +447,50 @@ public class ActorContextTest extends AbstractActorTest{
             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
+            final DataTree mockDataTree = Mockito.mock(DataTree.class);
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+                        }
+                    };
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+            assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedInfo, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
     }
 
     @Test
@@ -521,8 +567,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index 810b270..5d44033 100644 (file)
@@ -74,7 +74,7 @@ public class MockClusterWrapper implements ClusterWrapper{
     }
 
 
-    private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
+    public static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
         akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
             AddressFromURIString.parse(address), 55);
 
index f12fda0..f3cb78a 100644 (file)
@@ -75,7 +75,7 @@ public class RpcManager extends AbstractUntypedActor {
         LOG.debug("Create rpc registry and broker actors");
 
         rpcRegistry =
-                getContext().actorOf(Props.create(RpcRegistry.class).
+                getContext().actorOf(RpcRegistry.props().
                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
         rpcBroker =
index f67657f..fa93a3b 100644 (file)
@@ -13,6 +13,8 @@ import akka.japi.Pair;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
@@ -41,6 +43,10 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         }
     }
 
+    public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
+        return table.keySet();
+    }
+
     public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
         table.put(routeId, System.currentTimeMillis());
     }
index 219646d..1dcc4e1 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
@@ -19,6 +21,8 @@ import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.Remo
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
@@ -34,6 +38,10 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         getLocalBucket().setData(new RoutingTable());
     }
 
+    public static Props props() {
+        return Props.create(new RpcRegistryCreator());
+    }
+
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
@@ -220,4 +228,15 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             }
         }
     }
+
+    private static class RpcRegistryCreator implements Creator<RpcRegistry> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public RpcRegistry create() throws Exception {
+            RpcRegistry registry =  new RpcRegistry();
+            RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
+            return registry;
+        }
+    }
 }
index 628deb4..febff0b 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
-import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -230,7 +229,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         }
     }
 
-    protected BucketImpl<T> getLocalBucket() {
+    public BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
@@ -239,12 +238,11 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         versions.put(selfAddress, localBucket.getVersion());
     }
 
-    protected Map<Address, Bucket<T>> getRemoteBuckets() {
+    public Map<Address, Bucket<T>> getRemoteBuckets() {
         return remoteBuckets;
     }
 
-    @VisibleForTesting
-    Map<Address, Long> getVersions() {
+    public Map<Address, Long> getVersions() {
         return versions;
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java
new file mode 100644 (file)
index 0000000..ddd3333
--- /dev/null
@@ -0,0 +1,22 @@
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JMX bean to check remote rpc registry
+ */
+
+public interface RemoteRpcRegistryMXBean {
+
+    Set<String> getGlobalRpc();
+
+    String getBucketVersions();
+
+    Set<String> getLocalRegisteredRoutedRpc();
+
+    Map<String,String> findRpcByName(String name);
+
+    Map<String,String> findRpcByRoute(String route);
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
new file mode 100644 (file)
index 0000000..c7d9b99
--- /dev/null
@@ -0,0 +1,156 @@
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.Address;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String NULL_CONSTANT = "null";
+
+    private final String LOCAL_CONSTANT = "local";
+
+    private final String ROUTE_CONSTANT = "route:";
+
+    private final String NAME_CONSTANT = " | name:";
+
+    private final RpcRegistry rpcRegistry;
+
+    public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+        super("RemoteRpcRegistry", "RemoteRpcBroker", null);
+        this.rpcRegistry = rpcRegistry;
+        registerMBean();
+    }
+
+    @Override
+    public Set<String> getGlobalRpc() {
+        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() == null) {
+                globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
+            }
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("Locally registered global RPCs {}", globalRpc);
+        }
+        return globalRpc;
+    }
+
+    @Override
+    public Set<String> getLocalRegisteredRoutedRpc() {
+        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() != null) {
+                StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null ?
+                    route.getType().toString() : NULL_CONSTANT);
+                routedRpc.add(builder.toString());
+            }
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("Locally registered routed RPCs {}", routedRpc);
+        }
+        return routedRpc;
+    }
+
+    @Override
+    public Map<String, String> findRpcByName(final String name) {
+        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        // Get all RPCs from local bucket
+        Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
+
+        // Get all RPCs from remote bucket
+        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        for(Address address : buckets.keySet()) {
+            RoutingTable table = buckets.get(address).getData();
+            rpcMap.putAll(getRpcMemberMapByName(table, name, address.toString()));
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("list of RPCs {} searched by name {}", rpcMap, name);
+        }
+        return rpcMap;
+    }
+
+    @Override
+    public Map<String, String> findRpcByRoute(String routeId) {
+        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
+
+        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        for(Address address : buckets.keySet()) {
+            RoutingTable table = buckets.get(address).getData();
+            rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, address.toString()));
+
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
+        }
+        return rpcMap;
+    }
+
+    /**
+     * Search if the routing table route String contains routeName
+     */
+
+    private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+                                                      final String address) {
+        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Map<String, String> rpcMap = new HashMap<>(routes.size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() != null) {
+                String routeString = route.getRoute().toString();
+                if(routeString.contains(routeName)) {
+                    StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                    builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null ?
+                        route.getType().toString() : NULL_CONSTANT);
+                    rpcMap.put(builder.toString(), address);
+                }
+            }
+        }
+        return rpcMap;
+    }
+
+    /**
+     * Search if the routing table route type contains name
+     */
+    private Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
+                                                       final String address) {
+        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Map<String, String> rpcMap = new HashMap<>(routes.size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : routes){
+            if(route.getType() != null) {
+                String type = route.getType().toString();
+                if(type.contains(name)) {
+                    StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                    builder.append(route.getRoute() != null ? route.getRoute().toString(): NULL_CONSTANT)
+                        .append(NAME_CONSTANT).append(type);
+                    rpcMap.put(builder.toString(), address);
+                }
+            }
+        }
+        return rpcMap;
+    }
+
+
+
+    @Override
+    public String getBucketVersions() {
+        return rpcRegistry.getVersions().toString();
+    }
+
+}
\ No newline at end of file
index 4d8463a..3be247a 100644 (file)
@@ -104,6 +104,8 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
                 result = partialResult;
             }
             return new NormalizedNodeContext(path,result);
+        } catch (final RestconfDocumentedException e) {
+            throw e;
         } catch (final Exception e) {
             LOG.debug("Error parsing json input", e);
 
index 74a9bd2..2a9c5bf 100644 (file)
@@ -104,6 +104,8 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
 
             final NormalizedNode<?, ?> result = parse(path,doc);
             return new NormalizedNodeContext(path,result);
+        } catch (final RestconfDocumentedException e){
+            throw e;
         } catch (final Exception e) {
             LOG.debug("Error parsing xml input", e);
 
index 2da58a3..6cc62e8 100644 (file)
@@ -153,6 +153,11 @@ public class ControllerContext implements SchemaContextListener {
 
         final InstanceIdentifierBuilder builder = YangInstanceIdentifier.builder();
         final Module latestModule = globalSchema.findModuleByName(startModule, null);
+
+        if (latestModule == null) {
+            throw new RestconfDocumentedException("The module named '" + startModule + "' does not exist.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+        }
+
         final InstanceIdentifierContext<?> iiWithSchemaNode = collectPathArguments(builder, pathArgs, latestModule, null,
                 toMountPointIdentifier);
 
index 84b092e..624d709 100644 (file)
@@ -11,6 +11,7 @@ import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Delete;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
@@ -78,15 +79,31 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
     @Override
     public Config getConfig() {
         final Config config = new Config();
+
         final Get get = new Get();
         get.setReceivedRequests(stats.getConfigGet());
+        get.setSuccessfulResponses(stats.getSuccessGetConfig());
+        get.setFailedResponses(stats.getFailureGetConfig());
         config.setGet(get);
+
         final Post post = new Post();
         post.setReceivedRequests(stats.getConfigPost());
+        post.setSuccessfulResponses(stats.getSuccessPost());
+        post.setFailedResponses(stats.getFailurePost());
         config.setPost(post);
+
         final Put put = new Put();
         put.setReceivedRequests(stats.getConfigPut());
+        put.setSuccessfulResponses(stats.getSuccessPut());
+        put.setFailedResponses(stats.getFailurePut());
         config.setPut(put);
+
+        final Delete delete = new Delete();
+        delete.setReceivedRequests(stats.getConfigDelete());
+        delete.setSuccessfulResponses(stats.getSuccessDelete());
+        delete.setFailedResponses(stats.getFailureDelete());
+        config.setDelete(delete);
+
         return config;
     }
 
@@ -96,6 +113,8 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         final Operational operational = new Operational();
         final Get get = new Get();
         get.setReceivedRequests(opGet);
+        get.setSuccessfulResponses(stats.getSuccessGetOperational());
+        get.setFailedResponses(stats.getFailureGetOperational());
         operational.setGet(get);
         return operational;
     }
@@ -105,6 +124,6 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         final BigInteger rpcInvoke = stats.getRpc();
         final Rpcs rpcs = new Rpcs();
         rpcs.setReceivedRequests(rpcInvoke);
-        return rpcs ;
+        return rpcs;
     }
-}
+}
\ No newline at end of file
index 07178f5..f4a5fbc 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl;
 import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.opendaylight.controller.sal.rest.api.RestconfService;
 
@@ -21,6 +22,16 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     AtomicLong configPost = new AtomicLong();
     AtomicLong configPut = new AtomicLong();
     AtomicLong configDelete = new AtomicLong();
+    AtomicLong successGetConfig = new AtomicLong();
+    AtomicLong successGetOperational = new AtomicLong();
+    AtomicLong successPost = new AtomicLong();
+    AtomicLong successPut = new AtomicLong();
+    AtomicLong successDelete = new AtomicLong();
+    AtomicLong failureGetConfig = new AtomicLong();
+    AtomicLong failureGetOperational = new AtomicLong();
+    AtomicLong failurePost = new AtomicLong();
+    AtomicLong failurePut = new AtomicLong();
+    AtomicLong failureDelete = new AtomicLong();
 
     private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
 
@@ -79,36 +90,115 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     @Override
     public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) {
         configGet.incrementAndGet();
-        return delegate.readConfigurationData(identifier, uriInfo);
+        NormalizedNodeContext normalizedNodeContext = null;
+        try {
+            normalizedNodeContext = delegate.readConfigurationData(identifier, uriInfo);
+            if (normalizedNodeContext.getData() != null) {
+                successGetConfig.incrementAndGet();
+            }
+            else {
+                failureGetConfig.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureGetConfig.incrementAndGet();
+            throw e;
+        }
+        return normalizedNodeContext;
     }
 
     @Override
     public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) {
         operationalGet.incrementAndGet();
-        return delegate.readOperationalData(identifier, uriInfo);
+        NormalizedNodeContext normalizedNodeContext = null;
+        try {
+            normalizedNodeContext = delegate.readOperationalData(identifier, uriInfo);
+            if (normalizedNodeContext.getData() != null) {
+                successGetOperational.incrementAndGet();
+            }
+            else {
+                failureGetOperational.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureGetOperational.incrementAndGet();
+            throw e;
+        }
+        return normalizedNodeContext;
     }
 
     @Override
     public Response updateConfigurationData(final String identifier, final NormalizedNodeContext payload) {
         configPut.incrementAndGet();
-        return delegate.updateConfigurationData(identifier, payload);
+        Response response = null;
+        try {
+            response = delegate.updateConfigurationData(identifier, payload);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPut.incrementAndGet();
+            }
+            else {
+                failurePut.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failurePut.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) {
         configPost.incrementAndGet();
-        return delegate.createConfigurationData(identifier, payload, uriInfo);
+        Response response = null;
+        try {
+            response = delegate.createConfigurationData(identifier, payload, uriInfo);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPost.incrementAndGet();
+            }
+            else {
+                failurePost.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failurePost.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response createConfigurationData(final NormalizedNodeContext payload, final UriInfo uriInfo) {
         configPost.incrementAndGet();
-        return delegate.createConfigurationData(payload, uriInfo);
+        Response response = null;
+        try {
+            response = delegate.createConfigurationData(payload, uriInfo);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPost.incrementAndGet();
+            }
+            else {
+                failurePost.incrementAndGet();
+            }
+        }catch (Exception e) {
+            failurePost.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response deleteConfigurationData(final String identifier) {
-        return delegate.deleteConfigurationData(identifier);
+        configDelete.incrementAndGet();
+        Response response = null;
+        try {
+            response = delegate.deleteConfigurationData(identifier);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successDelete.incrementAndGet();
+            }
+            else {
+                failureDelete.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureDelete.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
@@ -144,4 +234,44 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     public BigInteger getRpc() {
         return BigInteger.valueOf(rpc.get());
     }
-}
+
+    public BigInteger getSuccessGetConfig() {
+        return BigInteger.valueOf(successGetConfig.get());
+    }
+
+    public BigInteger getSuccessGetOperational() {
+        return BigInteger.valueOf(successGetOperational.get());
+    }
+
+    public BigInteger getSuccessPost() {
+        return BigInteger.valueOf(successPost.get());
+    }
+
+    public BigInteger getSuccessPut() {
+        return BigInteger.valueOf(successPut.get());
+    }
+
+    public BigInteger getSuccessDelete() {
+        return BigInteger.valueOf(successDelete.get());
+    }
+
+    public BigInteger getFailureGetConfig() {
+        return BigInteger.valueOf(failureGetConfig.get());
+    }
+
+    public BigInteger getFailureGetOperational() {
+        return BigInteger.valueOf(failureGetOperational.get());
+    }
+
+    public BigInteger getFailurePost() {
+        return BigInteger.valueOf(failurePost.get());
+    }
+
+    public BigInteger getFailurePut() {
+        return BigInteger.valueOf(failurePut.get());
+    }
+
+    public BigInteger getFailureDelete() {
+        return BigInteger.valueOf(failureDelete.get());
+    }
+}
\ No newline at end of file
index 6d2add6..6fa9c86 100644 (file)
@@ -31,6 +31,14 @@ module opendaylight-rest-connector {
         leaf received-requests {
            type uint64;
         }
+
+        leaf successful-responses {
+            type uint64;
+        }
+
+        leaf failed-responses {
+            type uint64;
+        }
     }
 
     augment "/config:modules/config:module/config:configuration" {
@@ -70,6 +78,10 @@ module opendaylight-rest-connector {
                 container put {
                     uses statistics;
                 }
+                
+                container delete {
+                    uses statistics;
+                }
             }
 
             container operational {
index 2961662..ac3873e 100644 (file)
@@ -137,15 +137,15 @@ public class YangStoreService implements YangStoreContext {
     }
 
     public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
-        final SoftReference<YangStoreSnapshot> yangStoreSnapshotSoftReference = ref.get();
 
-        YangStoreContext ret = yangStoreSnapshotSoftReference != null ? yangStoreSnapshotSoftReference.get() : null;
-        if(ret == null) {
-            ret = getYangStoreSnapshot();
+        YangStoreContext context = ref.get().get();
+
+        if(context == null) {
+            context = getYangStoreSnapshot();
         }
 
         this.listeners.add(listener);
-        listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(ret));
+        listener.onCapabilitiesAdded(NetconfOperationServiceFactoryImpl.setupCapabilities(context));
 
         return new AutoCloseable() {
             @Override
index 6648bd4..7203107 100644 (file)
@@ -52,6 +52,9 @@ public class Parameters {
     @Arg(dest = "msg-timeout")
     public long msgTimeout;
 
+    @Arg(dest = "tcp-header")
+    public String tcpHeader;
+
     static ArgumentParser getParser() {
         final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
 
@@ -122,6 +125,11 @@ public class Parameters {
                 .setDefault(false)
                 .dest("ssh");
 
+        parser.addArgument("--tcp-header")
+                .type(String.class)
+                .required(false)
+                .dest("tcp-header");
+
         // TODO add get-config option instead of edit + commit
         // TODO different edit config content
 
index fe0a0bc..6bf50d2 100644 (file)
@@ -10,7 +10,11 @@ package org.opendaylight.controller.netconf.test.tool.client.stress;
 
 import ch.qos.logback.classic.Level;
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -19,6 +23,7 @@ import io.netty.util.Timer;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +35,7 @@ import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
@@ -82,18 +88,18 @@ public final class StressClient {
         }
     }
 
-    private static final String MSG_ID_PLACEHOLDER = "{MSG_ID}";
     private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
+    private static final String PHYS_ADDR_PLACEHOLDER_REGEX = "\\{PHYS_ADDR\\}";
 
     public static void main(final String[] args) {
         final Parameters params = parseArgs(args, Parameters.getParser());
         params.validate();
 
-        // TODO remove
+        // Wait 5 seconds to allow for debugging/profiling
         try {
-            Thread.sleep(10000);
+            Thread.sleep(5000);
         } catch (final InterruptedException e) {
-//            e.printStackTrace();
+            throw new RuntimeException(e);
         }
 
         final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
@@ -104,12 +110,8 @@ public final class StressClient {
         final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
 
         final String editContentString;
-        boolean needsModification = false;
         try {
             editContentString = Files.toString(params.editContent, Charsets.UTF_8);
-            if(editContentString.contains(MSG_ID_PLACEHOLDER)) {
-                needsModification = true;
-            };
         } catch (IOException e) {
             throw new IllegalArgumentException("Cannot read content of " + params.editContent);
         }
@@ -122,9 +124,12 @@ public final class StressClient {
             final Element editContentElement;
             try {
                 // Insert message id where needed
-                final String specificEditContent = needsModification ?
-                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i)) :
-                        editContentString;
+                String specificEditContent =
+                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i));
+
+                // Insert physical address where needed
+                specificEditContent =
+                        specificEditContent.replaceAll(PHYS_ADDR_PLACEHOLDER_REGEX, getMac(i));
 
                 editContentElement = XmlUtil.readXmlToElement(specificEditContent);
                 final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
@@ -176,6 +181,23 @@ public final class StressClient {
         }
     }
 
+    private static String getMac(final int i) {
+        final String hex = Integer.toHexString(i);
+        final Iterable<String> macGroups = Splitter.fixedLength(2).split(hex);
+
+        final int additional = 6 - Iterables.size(macGroups);
+        final ArrayList<String> additionalGroups = Lists.newArrayListWithCapacity(additional);
+        for (int j = 0; j < additional; j++) {
+            additionalGroups.add("00");
+        }
+        return Joiner.on(':').join(Iterables.concat(Iterables.transform(macGroups, new Function<String, String>() {
+            @Override
+            public String apply(final String input) {
+                return input.length() == 1 ? input + "0" : input;
+            }
+        }), additionalGroups));
+    }
+
     private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
         if(params.async) {
             return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
@@ -206,6 +228,16 @@ public final class StressClient {
         final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
         netconfClientConfigurationBuilder.withSessionListener(sessionListener);
         netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+        if(params.tcpHeader != null) {
+            final String header = params.tcpHeader.replaceAll("\"", "").trim() + "\n";
+            netconfClientConfigurationBuilder.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader(null, null, null, null, null) {
+                @Override
+                public String toFormattedString() {
+                    LOG.debug("Sending TCP header {}", header);
+                    return header;
+                }
+            });
+        }
         netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
         netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
         netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.