Merge "Make TCP netconf endpoint configurable"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 4 May 2015 08:23:06 +0000 (08:23 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 4 May 2015 08:23:06 +0000 (08:23 +0000)
23 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationListener.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationProviderService.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/NotificationService.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfProviderImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/StatisticsRestconfServiceWrapper.java
opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang
opendaylight/netconf/mdsal-netconf-monitoring/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/monitoring/MonitoringToMdsalWriter.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/AsyncExecutionStrategy.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/Parameters.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClient.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClientCallable.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/SyncExecutionStrategy.java

index 47b2afe92d6ea276c3b01d94a0c825c2a0b62403..885d9fdb532910586cb66468eae16a1d6dfd306d 100644 (file)
@@ -15,7 +15,7 @@
 
   <properties>
 
-    <akka.version>2.3.9</akka.version>
+    <akka.version>2.3.10</akka.version>
     <appauth.version>0.6.0-SNAPSHOT</appauth.version>
     <archetype-app-northbound>0.2.0-SNAPSHOT</archetype-app-northbound>
     <arphandler.version>0.7.0-SNAPSHOT</arphandler.version>
index 7b8816bd1b38843f2b0440c2bc565beaa13144fa..17663d982c5e188f7c898f9575571fb478e61bc5 100644 (file)
@@ -114,6 +114,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
         if (LocalServerChannel.class.equals(channelClass) == false) {
             // makes no sense for LocalServer and produces warning
             b.childOption(ChannelOption.SO_KEEPALIVE, true);
+            b.childOption(ChannelOption.TCP_NODELAY , true);
         }
         b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         customizeBootstrap(b);
index be8e0cefc1a3115329ed3391d99bbc8a38810dcd..e5a0a2bd6dda74aa8565757f2acea4679bbb1300 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.sal.binding.api;
 
 import java.util.EventListener;
-
 import org.opendaylight.yangtools.yang.binding.Notification;
 
 /**
@@ -17,7 +16,9 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * capture of this interface.
  *
  * @param <T> the interested notification type
+ * @deprecated Deprecated unused API.
  */
+@Deprecated
 public interface NotificationListener<T extends Notification> extends EventListener {
     /**
      * Invoked to deliver a notification.
index 00db80c19f7fd3fc9399dd303df74f7987eb2ce7..4b06e77c44b7c7b3ff061623d4e16003f17fb8f2 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.sal.binding.api;
 
 import java.util.EventListener;
 import java.util.concurrent.ExecutorService;
-
 import org.opendaylight.controller.md.sal.common.api.notify.NotificationPublishService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.Notification;
@@ -18,7 +17,10 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * Interface for a notification service that provides publish/subscribe capabilities for YANG
  * modeled notifications. This interface is a combination of the {@link NotificationService} and
  * {@link NotificationPublishService} interfaces.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationPublishService}.
  */
+@Deprecated
 public interface NotificationProviderService extends NotificationService, NotificationPublishService<Notification> {
 
     /**
index 335f55bcbbade9ce84898eafee05ff146ef2d382..dd66aa67f8a78451e9c31b851987455678306ea7 100644 (file)
@@ -91,7 +91,10 @@ import org.opendaylight.yangtools.yang.binding.Notification;
  * </pre>
  * The <code>onStart</code> method will be invoked when someone publishes a <code>Start</code> notification and
  * the <code>onStop</code> method will be invoked when someone publishes a <code>Stop</code> notification.
+ *
+ * @deprecated Please use {@link org.opendaylight.controller.md.sal.binding.api.NotificationService} instead.
  */
+@Deprecated
 public interface NotificationService extends BindingAwareService {
     /**
      * Registers a generic listener implementation for a specified notification type.
index f12fda0aa11a91f93bfffd2d6eb002ec76fb7b5e..f3cb78a30148e0a0de638120cfb6f973c822bc0f 100644 (file)
@@ -75,7 +75,7 @@ public class RpcManager extends AbstractUntypedActor {
         LOG.debug("Create rpc registry and broker actors");
 
         rpcRegistry =
-                getContext().actorOf(Props.create(RpcRegistry.class).
+                getContext().actorOf(RpcRegistry.props().
                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
 
         rpcBroker =
index f67657f6927801931fae2fd0434481169f2f3de8..fa93a3b83f03153d4dd031ceda1f67bb6f3d4929 100644 (file)
@@ -13,6 +13,8 @@ import akka.japi.Pair;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+
 import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
@@ -41,6 +43,10 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         }
     }
 
+    public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
+        return table.keySet();
+    }
+
     public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
         table.put(routeId, System.currentTimeMillis());
     }
index 219646d8478ade824d22589842c4d4ddf1edccaa..1dcc4e140595250a0414eba706eada3e776f4ece 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
@@ -19,6 +21,8 @@ import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.Remo
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
+import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
@@ -34,6 +38,10 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
         getLocalBucket().setData(new RoutingTable());
     }
 
+    public static Props props() {
+        return Props.create(new RpcRegistryCreator());
+    }
+
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
@@ -220,4 +228,15 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
             }
         }
     }
+
+    private static class RpcRegistryCreator implements Creator<RpcRegistry> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public RpcRegistry create() throws Exception {
+            RpcRegistry registry =  new RpcRegistry();
+            RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
+            return registry;
+        }
+    }
 }
index 628deb4311cebe1da5ff1b44deb715d933b8b8b2..febff0bc92efc4c8ad896a92e3d367156c6fd345 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
-import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -230,7 +229,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         }
     }
 
-    protected BucketImpl<T> getLocalBucket() {
+    public BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
@@ -239,12 +238,11 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         versions.put(selfAddress, localBucket.getVersion());
     }
 
-    protected Map<Address, Bucket<T>> getRemoteBuckets() {
+    public Map<Address, Bucket<T>> getRemoteBuckets() {
         return remoteBuckets;
     }
 
-    @VisibleForTesting
-    Map<Address, Long> getVersions() {
+    public Map<Address, Long> getVersions() {
         return versions;
     }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBean.java
new file mode 100644 (file)
index 0000000..ddd3333
--- /dev/null
@@ -0,0 +1,22 @@
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JMX bean to check remote rpc registry
+ */
+
+public interface RemoteRpcRegistryMXBean {
+
+    Set<String> getGlobalRpc();
+
+    String getBucketVersions();
+
+    Set<String> getLocalRegisteredRoutedRpc();
+
+    Map<String,String> findRpcByName(String name);
+
+    Map<String,String> findRpcByRoute(String route);
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
new file mode 100644 (file)
index 0000000..c7d9b99
--- /dev/null
@@ -0,0 +1,156 @@
+package org.opendaylight.controller.remote.rpc.registry.mbeans;
+
+import akka.actor.Address;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String NULL_CONSTANT = "null";
+
+    private final String LOCAL_CONSTANT = "local";
+
+    private final String ROUTE_CONSTANT = "route:";
+
+    private final String NAME_CONSTANT = " | name:";
+
+    private final RpcRegistry rpcRegistry;
+
+    public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+        super("RemoteRpcRegistry", "RemoteRpcBroker", null);
+        this.rpcRegistry = rpcRegistry;
+        registerMBean();
+    }
+
+    @Override
+    public Set<String> getGlobalRpc() {
+        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() == null) {
+                globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
+            }
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("Locally registered global RPCs {}", globalRpc);
+        }
+        return globalRpc;
+    }
+
+    @Override
+    public Set<String> getLocalRegisteredRoutedRpc() {
+        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() != null) {
+                StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null ?
+                    route.getType().toString() : NULL_CONSTANT);
+                routedRpc.add(builder.toString());
+            }
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("Locally registered routed RPCs {}", routedRpc);
+        }
+        return routedRpc;
+    }
+
+    @Override
+    public Map<String, String> findRpcByName(final String name) {
+        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        // Get all RPCs from local bucket
+        Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
+
+        // Get all RPCs from remote bucket
+        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        for(Address address : buckets.keySet()) {
+            RoutingTable table = buckets.get(address).getData();
+            rpcMap.putAll(getRpcMemberMapByName(table, name, address.toString()));
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("list of RPCs {} searched by name {}", rpcMap, name);
+        }
+        return rpcMap;
+    }
+
+    @Override
+    public Map<String, String> findRpcByRoute(String routeId) {
+        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
+
+        Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+        for(Address address : buckets.keySet()) {
+            RoutingTable table = buckets.get(address).getData();
+            rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, address.toString()));
+
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
+        }
+        return rpcMap;
+    }
+
+    /**
+     * Search if the routing table route String contains routeName
+     */
+
+    private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+                                                      final String address) {
+        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Map<String, String> rpcMap = new HashMap<>(routes.size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
+            if(route.getRoute() != null) {
+                String routeString = route.getRoute().toString();
+                if(routeString.contains(routeName)) {
+                    StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                    builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null ?
+                        route.getType().toString() : NULL_CONSTANT);
+                    rpcMap.put(builder.toString(), address);
+                }
+            }
+        }
+        return rpcMap;
+    }
+
+    /**
+     * Search if the routing table route type contains name
+     */
+    private Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
+                                                       final String address) {
+        Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
+        Map<String, String> rpcMap = new HashMap<>(routes.size());
+        for(RpcRouter.RouteIdentifier<?, ?, ?> route : routes){
+            if(route.getType() != null) {
+                String type = route.getType().toString();
+                if(type.contains(name)) {
+                    StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
+                    builder.append(route.getRoute() != null ? route.getRoute().toString(): NULL_CONSTANT)
+                        .append(NAME_CONSTANT).append(type);
+                    rpcMap.put(builder.toString(), address);
+                }
+            }
+        }
+        return rpcMap;
+    }
+
+
+
+    @Override
+    public String getBucketVersions() {
+        return rpcRegistry.getVersions().toString();
+    }
+
+}
\ No newline at end of file
index 4d8463adae66cb1e53ba06b2014b0bc8702fc4b4..3be247a3bbdc71a911570d431dc679c86e1222d4 100644 (file)
@@ -104,6 +104,8 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
                 result = partialResult;
             }
             return new NormalizedNodeContext(path,result);
+        } catch (final RestconfDocumentedException e) {
+            throw e;
         } catch (final Exception e) {
             LOG.debug("Error parsing json input", e);
 
index 74a9bd2d313618be1b090e49c472bd7894457c50..2a9c5bf190cbed62c4756940942436ac8f88682e 100644 (file)
@@ -104,6 +104,8 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
 
             final NormalizedNode<?, ?> result = parse(path,doc);
             return new NormalizedNodeContext(path,result);
+        } catch (final RestconfDocumentedException e){
+            throw e;
         } catch (final Exception e) {
             LOG.debug("Error parsing xml input", e);
 
index 2da58a38203de35102e0cc5e7e667ec83adedeec..6cc62e859c9bade70e4125d2fea72a8addc6f875 100644 (file)
@@ -153,6 +153,11 @@ public class ControllerContext implements SchemaContextListener {
 
         final InstanceIdentifierBuilder builder = YangInstanceIdentifier.builder();
         final Module latestModule = globalSchema.findModuleByName(startModule, null);
+
+        if (latestModule == null) {
+            throw new RestconfDocumentedException("The module named '" + startModule + "' does not exist.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+        }
+
         final InstanceIdentifierContext<?> iiWithSchemaNode = collectPathArguments(builder, pathArgs, latestModule, null,
                 toMountPointIdentifier);
 
index 84b092e10eae92f9fc7715162c6bbdc48a975ad1..624d709c60a7abd761c7c40011ffb0619cd66e4c 100644 (file)
@@ -11,6 +11,7 @@ import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Delete;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
@@ -78,15 +79,31 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
     @Override
     public Config getConfig() {
         final Config config = new Config();
+
         final Get get = new Get();
         get.setReceivedRequests(stats.getConfigGet());
+        get.setSuccessfulResponses(stats.getSuccessGetConfig());
+        get.setFailedResponses(stats.getFailureGetConfig());
         config.setGet(get);
+
         final Post post = new Post();
         post.setReceivedRequests(stats.getConfigPost());
+        post.setSuccessfulResponses(stats.getSuccessPost());
+        post.setFailedResponses(stats.getFailurePost());
         config.setPost(post);
+
         final Put put = new Put();
         put.setReceivedRequests(stats.getConfigPut());
+        put.setSuccessfulResponses(stats.getSuccessPut());
+        put.setFailedResponses(stats.getFailurePut());
         config.setPut(put);
+
+        final Delete delete = new Delete();
+        delete.setReceivedRequests(stats.getConfigDelete());
+        delete.setSuccessfulResponses(stats.getSuccessDelete());
+        delete.setFailedResponses(stats.getFailureDelete());
+        config.setDelete(delete);
+
         return config;
     }
 
@@ -96,6 +113,8 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         final Operational operational = new Operational();
         final Get get = new Get();
         get.setReceivedRequests(opGet);
+        get.setSuccessfulResponses(stats.getSuccessGetOperational());
+        get.setFailedResponses(stats.getFailureGetOperational());
         operational.setGet(get);
         return operational;
     }
@@ -105,6 +124,6 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         final BigInteger rpcInvoke = stats.getRpc();
         final Rpcs rpcs = new Rpcs();
         rpcs.setReceivedRequests(rpcInvoke);
-        return rpcs ;
+        return rpcs;
     }
-}
+}
\ No newline at end of file
index 07178f537999ce0a04c285431c1398f684453e67..f4a5fbc926479c94ed3eaa762722483424036133 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl;
 import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.opendaylight.controller.sal.rest.api.RestconfService;
 
@@ -21,6 +22,16 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     AtomicLong configPost = new AtomicLong();
     AtomicLong configPut = new AtomicLong();
     AtomicLong configDelete = new AtomicLong();
+    AtomicLong successGetConfig = new AtomicLong();
+    AtomicLong successGetOperational = new AtomicLong();
+    AtomicLong successPost = new AtomicLong();
+    AtomicLong successPut = new AtomicLong();
+    AtomicLong successDelete = new AtomicLong();
+    AtomicLong failureGetConfig = new AtomicLong();
+    AtomicLong failureGetOperational = new AtomicLong();
+    AtomicLong failurePost = new AtomicLong();
+    AtomicLong failurePut = new AtomicLong();
+    AtomicLong failureDelete = new AtomicLong();
 
     private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
 
@@ -79,36 +90,115 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     @Override
     public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) {
         configGet.incrementAndGet();
-        return delegate.readConfigurationData(identifier, uriInfo);
+        NormalizedNodeContext normalizedNodeContext = null;
+        try {
+            normalizedNodeContext = delegate.readConfigurationData(identifier, uriInfo);
+            if (normalizedNodeContext.getData() != null) {
+                successGetConfig.incrementAndGet();
+            }
+            else {
+                failureGetConfig.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureGetConfig.incrementAndGet();
+            throw e;
+        }
+        return normalizedNodeContext;
     }
 
     @Override
     public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) {
         operationalGet.incrementAndGet();
-        return delegate.readOperationalData(identifier, uriInfo);
+        NormalizedNodeContext normalizedNodeContext = null;
+        try {
+            normalizedNodeContext = delegate.readOperationalData(identifier, uriInfo);
+            if (normalizedNodeContext.getData() != null) {
+                successGetOperational.incrementAndGet();
+            }
+            else {
+                failureGetOperational.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureGetOperational.incrementAndGet();
+            throw e;
+        }
+        return normalizedNodeContext;
     }
 
     @Override
     public Response updateConfigurationData(final String identifier, final NormalizedNodeContext payload) {
         configPut.incrementAndGet();
-        return delegate.updateConfigurationData(identifier, payload);
+        Response response = null;
+        try {
+            response = delegate.updateConfigurationData(identifier, payload);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPut.incrementAndGet();
+            }
+            else {
+                failurePut.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failurePut.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) {
         configPost.incrementAndGet();
-        return delegate.createConfigurationData(identifier, payload, uriInfo);
+        Response response = null;
+        try {
+            response = delegate.createConfigurationData(identifier, payload, uriInfo);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPost.incrementAndGet();
+            }
+            else {
+                failurePost.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failurePost.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response createConfigurationData(final NormalizedNodeContext payload, final UriInfo uriInfo) {
         configPost.incrementAndGet();
-        return delegate.createConfigurationData(payload, uriInfo);
+        Response response = null;
+        try {
+            response = delegate.createConfigurationData(payload, uriInfo);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successPost.incrementAndGet();
+            }
+            else {
+                failurePost.incrementAndGet();
+            }
+        }catch (Exception e) {
+            failurePost.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
     public Response deleteConfigurationData(final String identifier) {
-        return delegate.deleteConfigurationData(identifier);
+        configDelete.incrementAndGet();
+        Response response = null;
+        try {
+            response = delegate.deleteConfigurationData(identifier);
+            if (response.getStatus() == Status.OK.getStatusCode()) {
+                successDelete.incrementAndGet();
+            }
+            else {
+                failureDelete.incrementAndGet();
+            }
+        } catch (Exception e) {
+            failureDelete.incrementAndGet();
+            throw e;
+        }
+        return response;
     }
 
     @Override
@@ -144,4 +234,44 @@ public class StatisticsRestconfServiceWrapper implements RestconfService {
     public BigInteger getRpc() {
         return BigInteger.valueOf(rpc.get());
     }
-}
+
+    public BigInteger getSuccessGetConfig() {
+        return BigInteger.valueOf(successGetConfig.get());
+    }
+
+    public BigInteger getSuccessGetOperational() {
+        return BigInteger.valueOf(successGetOperational.get());
+    }
+
+    public BigInteger getSuccessPost() {
+        return BigInteger.valueOf(successPost.get());
+    }
+
+    public BigInteger getSuccessPut() {
+        return BigInteger.valueOf(successPut.get());
+    }
+
+    public BigInteger getSuccessDelete() {
+        return BigInteger.valueOf(successDelete.get());
+    }
+
+    public BigInteger getFailureGetConfig() {
+        return BigInteger.valueOf(failureGetConfig.get());
+    }
+
+    public BigInteger getFailureGetOperational() {
+        return BigInteger.valueOf(failureGetOperational.get());
+    }
+
+    public BigInteger getFailurePost() {
+        return BigInteger.valueOf(failurePost.get());
+    }
+
+    public BigInteger getFailurePut() {
+        return BigInteger.valueOf(failurePut.get());
+    }
+
+    public BigInteger getFailureDelete() {
+        return BigInteger.valueOf(failureDelete.get());
+    }
+}
\ No newline at end of file
index 6d2add6ff14313b8824a5ade4d8cba187260bc24..6fa9c86ec1879985afa664783f65e483ffad60da 100644 (file)
@@ -31,6 +31,14 @@ module opendaylight-rest-connector {
         leaf received-requests {
            type uint64;
         }
+
+        leaf successful-responses {
+            type uint64;
+        }
+
+        leaf failed-responses {
+            type uint64;
+        }
     }
 
     augment "/config:modules/config:module/config:configuration" {
@@ -70,6 +78,10 @@ module opendaylight-rest-connector {
                 container put {
                     uses statistics;
                 }
+                
+                container delete {
+                    uses statistics;
+                }
             }
 
             container operational {
index 50958e423f4a14e5af67435f4b3d0f745a97ef87..3dbcc53822e7919fa184541c47d224c2440bc8fc 100644 (file)
@@ -61,19 +61,12 @@ final class MonitoringToMdsalWriter implements AutoCloseable, NetconfMonitoringS
         tx.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(NetconfState.class), state);
         // FIXME first attempt (right after we register to binding broker) always fails
         // Is it due to the fact that we are writing from the onSessionInitiated callback ?
-        final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
-
-        Futures.addCallback(submit, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void aVoid) {
-                LOG.debug("Netconf state updated successfully");
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.warn("Unable to update netconf state", throwable);
-            }
-        });
+        try {
+            tx.submit().checkedGet();
+            LOG.debug("Netconf state updated successfully");
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Unable to update netconf state", e);
+        }
     }
 
     @Override
index 7b60a17827dfe388b3775c83800bac803f1ddc7a..af352b1c2e8e28d1b338c7fab9377d72860c97e7 100644 (file)
@@ -30,24 +30,26 @@ class AsyncExecutionStrategy implements ExecutionStrategy {
     private final List<NetconfMessage> preparedMessages;
     private final NetconfDeviceCommunicator sessionListener;
     private final List<Integer> editBatches;
+    private final int editAmount;
 
     public AsyncExecutionStrategy(final Parameters params, final List<NetconfMessage> editConfigMsgs, final NetconfDeviceCommunicator sessionListener) {
         this.params = params;
         this.preparedMessages = editConfigMsgs;
         this.sessionListener = sessionListener;
-        this.editBatches = countEditBatchSizes(params);
+        this.editBatches = countEditBatchSizes(params, editConfigMsgs.size());
+        editAmount = editConfigMsgs.size();
     }
 
-    private static List<Integer> countEditBatchSizes(final Parameters params) {
+    private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
         final List<Integer> editBatches = Lists.newArrayList();
-        if (params.editBatchSize != params.editCount) {
-            final int fullBatches = params.editCount / params.editBatchSize;
+        if (params.editBatchSize != amount) {
+            final int fullBatches = amount / params.editBatchSize;
             for (int i = 0; i < fullBatches; i++) {
                 editBatches.add(params.editBatchSize);
             }
 
-            if (params.editCount % params.editBatchSize != 0) {
-                editBatches.add(params.editCount % params.editBatchSize);
+            if (amount % params.editBatchSize != 0) {
+                editBatches.add(amount % params.editBatchSize);
             }
         } else {
             editBatches.add(params.editBatchSize);
@@ -96,6 +98,6 @@ class AsyncExecutionStrategy implements ExecutionStrategy {
             }
         }
 
-        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+        Preconditions.checkState(responseCounter.get() == editAmount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
     }
 }
index 72031079019fca044f323a014b58844fce1e9b53..8a9a915a08d4ef4a62c5d9a5326a78f20b758873 100644 (file)
@@ -55,6 +55,9 @@ public class Parameters {
     @Arg(dest = "tcp-header")
     public String tcpHeader;
 
+    @Arg(dest = "thread-amount")
+    public int threadAmount;
+
     static ArgumentParser getParser() {
         final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
 
@@ -130,6 +133,11 @@ public class Parameters {
                 .required(false)
                 .dest("tcp-header");
 
+        parser.addArgument("--thread-amount")
+                .type(Integer.class)
+                .setDefault(1)
+                .dest("thread-amount");
+
         // TODO add get-config option instead of edit + commit
         // TODO different edit config content
 
index 6bf50d2c5a0070e4273d19d11cf843e974d569a4..2916ec52f7bed25d4d0065579db166b73c94b4c8 100644 (file)
@@ -10,38 +10,28 @@ package org.opendaylight.controller.netconf.test.tool.client.stress;
 
 import ch.qos.logback.classic.Level;
 import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
-import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.CommitInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.EditConfigInput;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -80,6 +70,7 @@ public final class StressClient {
                             "        <target>\n" +
                             "            <candidate/>\n" +
                             "        </target>\n" +
+                            "        <default-operation>none</default-operation>" +
                             "        <config/>\n" +
                             "    </edit-config>\n" +
                             "</rpc>");
@@ -89,90 +80,89 @@ public final class StressClient {
     }
 
     private static final String MSG_ID_PLACEHOLDER_REGEX = "\\{MSG_ID\\}";
-    private static final String PHYS_ADDR_PLACEHOLDER_REGEX = "\\{PHYS_ADDR\\}";
+    private static final String PHYS_ADDR_PLACEHOLDER = "{PHYS_ADDR}";
+
+    private static long macStart = 0xAABBCCDD0000L;
 
     public static void main(final String[] args) {
         final Parameters params = parseArgs(args, Parameters.getParser());
         params.validate();
 
-        // Wait 5 seconds to allow for debugging/profiling
-        try {
-            Thread.sleep(5000);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
         final ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
         root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
 
+        final int threadAmount = params.threadAmount;
+        LOG.info("thread amount: " + threadAmount);
+        final int requestsPerThread = params.editCount / params.threadAmount;
+        LOG.info("requestsPerThread: " + requestsPerThread);
+        final int leftoverRequests = params.editCount % params.threadAmount;
+        LOG.info("leftoverRequests: " + leftoverRequests);
+
+
         LOG.info("Preparing messages");
         // Prepare all msgs up front
-        final List<NetconfMessage> preparedMessages = Lists.newArrayListWithCapacity(params.editCount);
+        final List<List<NetconfMessage>> allPreparedMessages = new ArrayList<>(threadAmount);
+        for (int i = 0; i < threadAmount; i++) {
+            if (i != threadAmount - 1) {
+                allPreparedMessages.add(new ArrayList<NetconfMessage>(requestsPerThread));
+            } else {
+                allPreparedMessages.add(new ArrayList<NetconfMessage>(requestsPerThread + leftoverRequests));
+            }
+        }
+
 
         final String editContentString;
         try {
             editContentString = Files.toString(params.editContent, Charsets.UTF_8);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new IllegalArgumentException("Cannot read content of " + params.editContent);
         }
 
-        for (int i = 0; i < params.editCount; i++) {
-            final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
-            msg.getDocumentElement().setAttribute("message-id", Integer.toString(i));
-            final NetconfMessage netconfMessage = new NetconfMessage(msg);
-
-            final Element editContentElement;
-            try {
-                // Insert message id where needed
-                String specificEditContent =
-                        editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(i));
-
-                // Insert physical address where needed
-                specificEditContent =
-                        specificEditContent.replaceAll(PHYS_ADDR_PLACEHOLDER_REGEX, getMac(i));
-
-                editContentElement = XmlUtil.readXmlToElement(specificEditContent);
-                final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
-                        getElementsByTagName("config").item(0);
-                config.appendChild(msg.importNode(editContentElement, true));
-            } catch (final IOException | SAXException e) {
-                throw new IllegalArgumentException("Edit content file is unreadable", e);
+        for (int i = 0; i < threadAmount; i++) {
+            final List<NetconfMessage> preparedMessages = allPreparedMessages.get(i);
+            int padding = 0;
+            if (i == threadAmount - 1) {
+                padding = leftoverRequests;
+            }
+            for (int j = 0; j < requestsPerThread + padding; j++) {
+                LOG.debug("id: " + (i * requestsPerThread + j));
+                preparedMessages.add(prepareMessage(i * requestsPerThread + j, editContentString));
             }
-
-            preparedMessages.add(netconfMessage);
-
         }
 
-
         final NioEventLoopGroup nioGroup = new NioEventLoopGroup();
         final Timer timer = new HashedWheelTimer();
 
         final NetconfClientDispatcherImpl netconfClientDispatcher = configureClientDispatcher(params, nioGroup, timer);
 
-        final NetconfDeviceCommunicator sessionListener = getSessionListener(params.getInetAddress());
+        final List<StressClientCallable> callables = new ArrayList<>(threadAmount);
+        for (final List<NetconfMessage> messages : allPreparedMessages) {
+            callables.add(new StressClientCallable(params, netconfClientDispatcher, messages));
+        }
 
-        final NetconfClientConfiguration cfg = getNetconfClientConfiguration(params, sessionListener);
+        final ExecutorService executorService = Executors.newFixedThreadPool(threadAmount);
 
-        LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
-        final NetconfClientSession netconfClientSession;
+        LOG.info("Starting stress test");
+        final Stopwatch started = Stopwatch.createStarted();
         try {
-            netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+            final List<Future<Boolean>> futures = executorService.invokeAll(callables);
+            for (final Future<Boolean> future : futures) {
+                try {
+                    future.get(4L, TimeUnit.MINUTES);
+                } catch (ExecutionException | TimeoutException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            executorService.shutdownNow();
         } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        } catch (final ExecutionException e) {
-            throw new RuntimeException("Unable to connect", e);
+            throw new RuntimeException("Unable to execute requests", e);
         }
-
-        LOG.info("Starting stress test");
-        final Stopwatch started = Stopwatch.createStarted();
-        getExecutionStrategy(params, preparedMessages, sessionListener).invoke();
         started.stop();
 
         LOG.info("FINISHED. Execution time: {}", started);
         LOG.info("Requests per second: {}", (params.editCount * 1000.0 / started.elapsed(TimeUnit.MILLISECONDS)));
 
         // Cleanup
-        netconfClientSession.close();
         timer.stop();
         try {
             nioGroup.shutdownGracefully().get(20L, TimeUnit.SECONDS);
@@ -181,29 +171,33 @@ public final class StressClient {
         }
     }
 
-    private static String getMac(final int i) {
-        final String hex = Integer.toHexString(i);
-        final Iterable<String> macGroups = Splitter.fixedLength(2).split(hex);
+    static NetconfMessage prepareMessage(final int id, final String editContentString) {
+        final Document msg = XmlUtil.createDocumentCopy(editBlueprint);
+        msg.getDocumentElement().setAttribute("message-id", Integer.toString(id));
+        final NetconfMessage netconfMessage = new NetconfMessage(msg);
 
-        final int additional = 6 - Iterables.size(macGroups);
-        final ArrayList<String> additionalGroups = Lists.newArrayListWithCapacity(additional);
-        for (int j = 0; j < additional; j++) {
-            additionalGroups.add("00");
-        }
-        return Joiner.on(':').join(Iterables.concat(Iterables.transform(macGroups, new Function<String, String>() {
-            @Override
-            public String apply(final String input) {
-                return input.length() == 1 ? input + "0" : input;
+        final Element editContentElement;
+        try {
+            // Insert message id where needed
+            String specificEditContent = editContentString.replaceAll(MSG_ID_PLACEHOLDER_REGEX, Integer.toString(id));
+
+            final StringBuilder stringBuilder = new StringBuilder(specificEditContent);
+            int idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER);
+            while (idx!= -1) {
+                stringBuilder.replace(idx, idx + PHYS_ADDR_PLACEHOLDER.length(), getMac(macStart++));
+                idx = stringBuilder.indexOf(PHYS_ADDR_PLACEHOLDER);
             }
-        }), additionalGroups));
-    }
-
-    private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
-        if(params.async) {
-            return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
-        } else {
-            return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+            specificEditContent = stringBuilder.toString();
+
+            editContentElement = XmlUtil.readXmlToElement(specificEditContent);
+            final Node config = ((Element) msg.getDocumentElement().getElementsByTagName("edit-config").item(0)).
+                    getElementsByTagName("config").item(0);
+            config.appendChild(msg.importNode(editContentElement, true));
+        } catch (final IOException | SAXException e) {
+            throw new IllegalArgumentException("Edit content file is unreadable", e);
         }
+
+        return netconfMessage;
     }
 
     private static NetconfClientDispatcherImpl configureClientDispatcher(final Parameters params, final NioEventLoopGroup nioGroup, final Timer timer) {
@@ -224,29 +218,18 @@ public final class StressClient {
         return netconfClientDispatcher;
     }
 
-    private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
-        final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
-        netconfClientConfigurationBuilder.withSessionListener(sessionListener);
-        netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
-        if(params.tcpHeader != null) {
-            final String header = params.tcpHeader.replaceAll("\"", "").trim() + "\n";
-            netconfClientConfigurationBuilder.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader(null, null, null, null, null) {
-                @Override
-                public String toFormattedString() {
-                    LOG.debug("Sending TCP header {}", header);
-                    return header;
-                }
-            });
+    public static String getMac(long mac) {
+        StringBuilder m = new StringBuilder(Long.toString(mac, 16));
+
+        for (int i = m.length(); i < 12; i++) {
+            m.insert(0, "0");
+        }
+
+        for (int j = m.length() - 2; j >= 2; j-=2) {
+            m.insert(j, ":");
         }
-        netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
-        netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
-        netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
-        return netconfClientConfigurationBuilder.build();
-    }
 
-    static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
-        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new LoggingRemoteDevice();
-        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+        return m.toString();
     }
 
     private static Parameters parseArgs(final String[] args, final ArgumentParser parser) {
@@ -263,7 +246,7 @@ public final class StressClient {
     }
 
 
-    private static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
+    static class LoggingRemoteDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> {
         @Override
         public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceCommunicator netconfDeviceCommunicator) {
             LOG.info("Session established");
diff --git a/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClientCallable.java b/opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/client/stress/StressClientCallable.java
new file mode 100644 (file)
index 0000000..a4c5c57
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.test.tool.client.stress;
+
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.sal.connect.api.RemoteDevice;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StressClientCallable implements Callable<Boolean>{
+
+    private static final Logger LOG = LoggerFactory.getLogger(StressClientCallable.class);
+
+    private Parameters params;
+    private final NetconfDeviceCommunicator sessionListener;
+    private final NetconfClientDispatcherImpl netconfClientDispatcher;
+    private final NetconfClientConfiguration cfg;
+    private final NetconfClientSession netconfClientSession;
+    private final ExecutionStrategy executionStrategy;
+
+    public StressClientCallable(final Parameters params,
+                                final NetconfClientDispatcherImpl netconfClientDispatcher,
+                                final List<NetconfMessage> preparedMessages) {
+        this.params = params;
+        this.sessionListener = getSessionListener(params.getInetAddress());
+        this.netconfClientDispatcher = netconfClientDispatcher;
+        cfg = getNetconfClientConfiguration(this.params, this.sessionListener);
+
+        LOG.info("Connecting to netconf server {}:{}", params.ip, params.port);
+        try {
+            netconfClientSession = netconfClientDispatcher.createClient(cfg).get();
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException("Unable to connect", e);
+        }
+        executionStrategy = getExecutionStrategy(params, preparedMessages, sessionListener);
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+        executionStrategy.invoke();
+        netconfClientSession.close();
+        return true;
+    }
+
+    private static ExecutionStrategy getExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
+        if(params.async) {
+            return new AsyncExecutionStrategy(params, preparedMessages, sessionListener);
+        } else {
+            return new SyncExecutionStrategy(params, preparedMessages, sessionListener);
+        }
+    }
+
+    private static NetconfDeviceCommunicator getSessionListener(final InetSocketAddress inetAddress) {
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> loggingRemoteDevice = new StressClient.LoggingRemoteDevice();
+        return new NetconfDeviceCommunicator(new RemoteDeviceId("secure-test", inetAddress), loggingRemoteDevice);
+    }
+
+    private static NetconfClientConfiguration getNetconfClientConfiguration(final Parameters params, final NetconfDeviceCommunicator sessionListener) {
+        final NetconfClientConfigurationBuilder netconfClientConfigurationBuilder = NetconfClientConfigurationBuilder.create();
+        netconfClientConfigurationBuilder.withSessionListener(sessionListener);
+        netconfClientConfigurationBuilder.withAddress(params.getInetAddress());
+        if(params.tcpHeader != null) {
+            final String header = params.tcpHeader.replaceAll("\"", "").trim() + "\n";
+            netconfClientConfigurationBuilder.withAdditionalHeader(new NetconfHelloMessageAdditionalHeader(null, null, null, null, null) {
+                @Override
+                public String toFormattedString() {
+                    LOG.debug("Sending TCP header {}", header);
+                    return header;
+                }
+            });
+        }
+        netconfClientConfigurationBuilder.withProtocol(params.ssh ? NetconfClientConfiguration.NetconfClientProtocol.SSH : NetconfClientConfiguration.NetconfClientProtocol.TCP);
+        netconfClientConfigurationBuilder.withConnectionTimeoutMillis(20000L);
+        netconfClientConfigurationBuilder.withReconnectStrategy(new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+        return netconfClientConfigurationBuilder.build();
+    }
+}
index 34142a7f2a49d89a47d3529b9fe976c5e76743fb..40f39022e6b41e9424f46421cc38c2e5837ceff2 100644 (file)
@@ -31,24 +31,26 @@ class SyncExecutionStrategy implements ExecutionStrategy {
     private final List<NetconfMessage> preparedMessages;
     private final NetconfDeviceCommunicator sessionListener;
     private final List<Integer> editBatches;
+    private final int editAmount;
 
     public SyncExecutionStrategy(final Parameters params, final List<NetconfMessage> preparedMessages, final NetconfDeviceCommunicator sessionListener) {
         this.params = params;
         this.preparedMessages = preparedMessages;
         this.sessionListener = sessionListener;
-        editBatches = countEditBatchSizes(params);
+        this.editBatches = countEditBatchSizes(params, preparedMessages.size());
+        editAmount = preparedMessages.size();
     }
 
-    private static List<Integer> countEditBatchSizes(final Parameters params) {
+    private static List<Integer> countEditBatchSizes(final Parameters params, final int amount) {
         final List<Integer> editBatches = Lists.newArrayList();
-        if (params.editBatchSize != params.editCount) {
-            final int fullBatches = params.editCount / params.editBatchSize;
+        if (params.editBatchSize != amount) {
+            final int fullBatches = amount / params.editBatchSize;
             for (int i = 0; i < fullBatches; i++) {
                 editBatches.add(params.editBatchSize);
             }
 
-            if (params.editCount % params.editBatchSize != 0) {
-                editBatches.add(params.editCount % params.editBatchSize);
+            if (amount % params.editBatchSize != 0) {
+                editBatches.add(amount % params.editBatchSize);
             }
         } else {
             editBatches.add(params.editBatchSize);
@@ -82,7 +84,7 @@ class SyncExecutionStrategy implements ExecutionStrategy {
                     sessionListener.sendRequest(StressClient.COMMIT_MSG, StressClient.COMMIT_QNAME));
         }
 
-        Preconditions.checkState(responseCounter.get() == params.editCount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
+        Preconditions.checkState(responseCounter.get() == editAmount + editBatches.size(), "Not all responses were received, only %s from %s", responseCounter.get(), params.editCount + editBatches.size());
     }
 
     private void waitForResponse(AtomicInteger responseCounter, final ListenableFuture<RpcResult<NetconfMessage>> netconfMessageFuture) {