Move netconf.api.monitoring
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / ScaleUtil.java
index 555f6833b2b38eb10a96c1d2bc7e6c7225fa5361..186251a19c552e218cb75c27b66552c31d6169bf 100644 (file)
@@ -5,28 +5,27 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.test.tool;
 
 import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.util.ContextInitializer;
-import ch.qos.logback.core.joran.spi.JoranException;
-import ch.qos.logback.core.util.StatusPrinter;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
 import com.google.common.io.CharStreams;
-import com.ning.http.client.AsyncHttpClient;
-import com.ning.http.client.AsyncHttpClientConfig.Builder;
-import com.ning.http.client.Request;
-import com.ning.http.client.Response;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.Authenticator;
 import java.net.ConnectException;
+import java.net.PasswordAuthentication;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -34,25 +33,30 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.opendaylight.netconf.test.tool.config.Configuration;
+import org.opendaylight.netconf.test.tool.config.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ScaleUtil {
-    private static Logger RESULTS_LOG ;
-    private static final ScheduledExecutorService executor = new LoggingWrapperExecutor(4);
-
-    private static final int deviceStep = 1000;
-    private static final long retryDelay = 10l;
-    private static final long timeout = 20l;
+@SuppressFBWarnings({"DM_EXIT", "DM_DEFAULT_ENCODING", "SLF4J_LOGGER_SHOULD_BE_FINAL"})
+public final class ScaleUtil {
+    private static final ScheduledExecutorService EXECUTOR = new LoggingWrapperExecutor(4);
+    private static final Semaphore SEMAPHORE = new Semaphore(0);
+    private static final Stopwatch STOPWATCH = Stopwatch.createUnstarted();
+    private static final String RESTCONF_URL = "http://%s:%d/rests/data/"
+            + "network-topology:network-topology?content=nonconfig";
 
-    private static final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    private static final long TIMEOUT = 20L;
+    private static final long RETRY_DELAY = 10L;
+    private static final int DEVICE_STEP = 1000;
 
-    private static ScheduledFuture timeoutGuardFuture;
     private static ch.qos.logback.classic.Logger root;
-    private static final Semaphore semaphore = new Semaphore(0);
+    private static Logger resultsLog;
 
+    private ScaleUtil() {
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
     public static void main(final String[] args) {
         final TesttoolParameters params = TesttoolParameters.parseArgs(args, TesttoolParameters.getParser());
 
@@ -64,26 +68,19 @@ public class ScaleUtil {
 
         while (true) {
             root.warn("Starting scale test with {} devices", params.deviceCount);
-            timeoutGuardFuture = executor.schedule(new TimeoutGuard(), timeout, TimeUnit.MINUTES);
-            final NetconfDeviceSimulator netconfDeviceSimulator = new NetconfDeviceSimulator(params.threadAmount);
-            try {
-                final List<Integer> openDevices = netconfDeviceSimulator.start(params);
-                if (openDevices.size() == 0) {
-                    root.error("Failed to start any simulated devices, exiting...");
-                    System.exit(1);
-                }
-                if (params.distroFolder != null) {
-                    final Main.ConfigGenerator configGenerator = new Main.ConfigGenerator(params.distroFolder, openDevices);
-                    final List<File> generated = configGenerator.generate(
-                            params.ssh, params.generateConfigBatchSize,
-                            params.generateConfigsTimeout, params.generateConfigsAddress,
-                            params.devicesPerPort);
-                    configGenerator.updateFeatureFile(generated);
-                    configGenerator.changeLoadOrder();
-                }
-            } catch (final Exception e) {
-                root.error("Unhandled exception", e);
-                netconfDeviceSimulator.close();
+            final ScheduledFuture<?> timeoutGuardFuture = EXECUTOR.schedule(new TimeoutGuard(), TIMEOUT,
+                TimeUnit.MINUTES);
+            final Configuration configuration = new ConfigurationBuilder().from(params).build();
+            final NetconfDeviceSimulator netconfDeviceSimulator = new NetconfDeviceSimulator(configuration);
+
+            final List<Integer> openDevices = netconfDeviceSimulator.start();
+            if (openDevices.size() == 0) {
+                root.error("Failed to start any simulated devices, exiting...");
+                System.exit(1);
+            }
+
+            if (params.distroFolder == null) {
+                root.error("Distro folder is not set, exiting...");
                 System.exit(1);
             }
 
@@ -92,41 +89,50 @@ public class ScaleUtil {
                 runtime.exec(params.distroFolder.getAbsolutePath() + "/bin/start");
                 String status;
                 do {
-                    final Process exec = runtime.exec(params.distroFolder.getAbsolutePath() + "/bin/status");
+                    final Process list = runtime.exec(params.distroFolder.getAbsolutePath()
+                        + "/bin/client feature:list");
                     try {
-                        Thread.sleep(2000l);
+                        Thread.sleep(2000L);
                     } catch (InterruptedException e) {
                         root.warn("Failed to sleep", e);
                     }
-                    status = CharStreams.toString(new BufferedReader(new InputStreamReader(exec.getInputStream())));
-                    root.warn("Current status: {}", status);
-                } while (!status.startsWith("Running ..."));
-                root.warn("Doing feature install {}", params.distroFolder.getAbsolutePath() + "/bin/client -u karaf feature:install odl-restconf-noauth odl-netconf-connector-all");
-                final Process featureInstall = runtime.exec(params.distroFolder.getAbsolutePath() + "/bin/client -u karaf feature:install odl-restconf-noauth odl-netconf-connector-all");
-                root.warn(CharStreams.toString(new BufferedReader(new InputStreamReader(featureInstall.getInputStream()))));
-                root.warn(CharStreams.toString(new BufferedReader(new InputStreamReader(featureInstall.getErrorStream()))));
+                    status = CharStreams.toString(new BufferedReader(new InputStreamReader(list.getErrorStream())));
+                    root.warn(status);
+                } while (status.startsWith("Failed to get the session"));
+                root.warn("Doing feature install {}", params.distroFolder.getAbsolutePath()
+                    + "/bin/client feature:install odl-restconf-nb odl-netconf-topology");
+                final Process featureInstall = runtime.exec(params.distroFolder.getAbsolutePath()
+                    + "/bin/client feature:install odl-restconf-nb odl-netconf-topology");
+                root.warn(
+                    CharStreams.toString(new BufferedReader(new InputStreamReader(featureInstall.getInputStream()))));
+                root.warn(
+                    CharStreams.toString(new BufferedReader(new InputStreamReader(featureInstall.getErrorStream()))));
 
             } catch (IOException e) {
-                root.warn("Failed to start karaf", e);
+                root.error("Failed to start karaf", e);
                 System.exit(1);
             }
 
+            waitNetconfTopologyReady(params);
+            final Execution ex = new Execution(openDevices, params);
+            ex.call();
+
             root.warn("Karaf started, starting stopwatch");
-            stopwatch.start();
+            STOPWATCH.start();
 
             try {
-                executor.schedule(new ScaleVerifyCallable(netconfDeviceSimulator, params.deviceCount), retryDelay, TimeUnit.SECONDS);
+                EXECUTOR.schedule(new ScaleVerifyCallable(params), RETRY_DELAY, TimeUnit.SECONDS);
                 root.warn("First callable scheduled");
-                semaphore.acquire();
+                SEMAPHORE.acquire();
                 root.warn("semaphore released");
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new IllegalStateException("Interrupted while waiting for semaphore", e);
             }
 
             timeoutGuardFuture.cancel(false);
-            params.deviceCount += deviceStep;
+            params.deviceCount += DEVICE_STEP;
             netconfDeviceSimulator.close();
-            stopwatch.reset();
+            STOPWATCH.reset();
 
             cleanup(runtime, params);
         }
@@ -137,167 +143,195 @@ public class ScaleUtil {
 
         root = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
         root.setLevel(params.debug ? Level.DEBUG : Level.INFO);
-        RESULTS_LOG = LoggerFactory.getLogger("results");
+        resultsLog = LoggerFactory.getLogger("results");
     }
 
     private static void cleanup(final Runtime runtime, final TesttoolParameters params) {
         try {
             stopKaraf(runtime, params);
             deleteFolder(new File(params.distroFolder.getAbsoluteFile() + "/data"));
-
         } catch (IOException | InterruptedException e) {
             root.warn("Failed to stop karaf", e);
             System.exit(1);
         }
     }
 
-    private static void stopKaraf(final Runtime runtime, final TesttoolParameters params) throws IOException, InterruptedException {
+    private static void stopKaraf(final Runtime runtime, final TesttoolParameters params)
+            throws IOException, InterruptedException {
         root.info("Stopping karaf and sleeping for 10 sec..");
         String controllerPid = "";
         do {
-
             final Process pgrep = runtime.exec("pgrep -f org.apache.karaf.main.Main");
 
             controllerPid = CharStreams.toString(new BufferedReader(new InputStreamReader(pgrep.getInputStream())));
             root.warn(controllerPid);
             runtime.exec("kill -9 " + controllerPid);
 
-            Thread.sleep(10000l);
+            Thread.sleep(10000L);
         } while (!controllerPid.isEmpty());
-        deleteFolder(new File(params.distroFolder.getAbsoluteFile() + "/data"));
     }
 
-    private static void deleteFolder(File folder) {
+    private static void deleteFolder(final File folder) {
         File[] files = folder.listFiles();
-        if(files!=null) { //some JVMs return null for empty dirs
-            for(File f: files) {
-                if(f.isDirectory()) {
+        if (files != null) { //some JVMs return null for empty dirs
+            for (File f : files) {
+                if (f.isDirectory()) {
                     deleteFolder(f);
-                } else {
-                    f.delete();
+                } else if (!f.delete()) {
+                    root.warn("Failed to delete {}", f);
                 }
             }
         }
-        folder.delete();
+        if (!folder.delete()) {
+            root.warn("Failed to delete {}", folder);
+        }
     }
 
-    private static TesttoolParameters parseArgs(final String[] args, final ArgumentParser parser) {
-        final TesttoolParameters parameters = new TesttoolParameters();
-        try {
-            parser.parseArgs(args, parameters);
-            return parameters;
-        } catch (ArgumentParserException e) {
-            parser.handleError(e);
+    private static void waitNetconfTopologyReady(final TesttoolParameters params) {
+        root.info("Wait for Netconf topology to be accessible via Restconf");
+        HttpResponse<String> response = requestNetconfTopology(params);
+        while (response == null || response.statusCode() != 200 && response.statusCode() != 204) {
+            if (response == null) {
+                root.warn("Failed to get response from controller, going to sleep...");
+            } else {
+                root.warn("Received status code {}, going to sleep...", response.statusCode());
+            }
+            try {
+                Thread.sleep(1000L);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("Sleep interrupted", e);
+            }
+            response = requestNetconfTopology(params);
         }
-
-        System.exit(1);
-        return null;
+        root.info("Returned status code {}, Netconf topology is accessible", response.statusCode());
     }
 
-    private static class ScaleVerifyCallable implements Callable {
+    private static HttpResponse<String> requestNetconfTopology(final TesttoolParameters params) {
+        final HttpClient httpClient = HttpClient.newBuilder()
+                .connectTimeout(Duration.ofSeconds(Integer.MAX_VALUE))
+                .authenticator(new Authenticator() {
+                    @Override
+                    protected PasswordAuthentication getPasswordAuthentication() {
+                        return new PasswordAuthentication(params.controllerAuthUsername,
+                                params.controllerAuthPassword.toCharArray());
+                    }
+                })
+                .build();
+        final HttpRequest request = HttpRequest.newBuilder(URI.create(String.format(RESTCONF_URL, params.controllerIp,
+                        params.controllerPort)))
+                .GET()
+                .header("Content-Type", "application/json")
+                .header("Accept", "application/json")
+                .build();
+        try {
+            return httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+        } catch (IOException e) {
+            root.warn(e.getMessage());
+            return null;
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("Interrupted while waiting for response", e);
+        }
+    }
 
+    private static class ScaleVerifyCallable implements Callable<Void> {
         private static final Logger LOG = LoggerFactory.getLogger(ScaleVerifyCallable.class);
-
-        private static final String RESTCONF_URL = "http://127.0.0.1:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf/";
         private static final Pattern PATTERN = Pattern.compile("connected");
 
-        private final AsyncHttpClient asyncHttpClient = new AsyncHttpClient(new Builder()
-                .setConnectTimeout(Integer.MAX_VALUE)
-                .setRequestTimeout(Integer.MAX_VALUE)
-                .setAllowPoolingConnections(true)
-                .build());
-        private final NetconfDeviceSimulator simulator;
+        private final HttpClient httpClient;
+        private final HttpRequest request;
+
         private final int deviceCount;
-        private final Request request;
-
-        public ScaleVerifyCallable(final NetconfDeviceSimulator simulator, final int deviceCount) {
-            LOG.info("New callable created");
-            this.simulator = simulator;
-            this.deviceCount = deviceCount;
-            AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.prepareGet(RESTCONF_URL)
-                    .addHeader("content-type", "application/xml")
-                    .addHeader("Accept", "application/xml")
-                    .setRequestTimeout(Integer.MAX_VALUE);
-            request = requestBuilder.build();
+
+        ScaleVerifyCallable(final TesttoolParameters params) {
+            deviceCount = params.deviceCount;
+            httpClient = HttpClient.newBuilder()
+                    .authenticator(new Authenticator() {
+                        @Override
+                        protected PasswordAuthentication getPasswordAuthentication() {
+                            return new PasswordAuthentication(params.controllerAuthUsername,
+                                params.controllerAuthPassword.toCharArray());
+                        }
+                    })
+                    .build();
+            request = HttpRequest.newBuilder(URI.create(String.format(RESTCONF_URL, params.controllerIp,
+                            params.controllerPort)))
+                    .GET()
+                    .header("Content-Type", "application/xml")
+                    .header("Accept", "application/xml")
+                    .build();
         }
 
         @Override
-        public Object call() throws Exception {
+        public Void call() throws Exception {
+            LOG.info("Checking number of connected devices.");
             try {
-                final Response response = asyncHttpClient.executeRequest(request).get();
-
-                if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
-                    LOG.warn("Request failed, status code: {}", response.getStatusCode() + response.getStatusText());
-                    executor.schedule(new ScaleVerifyCallable(simulator, deviceCount), retryDelay, TimeUnit.SECONDS);
+                final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
+                if (response.statusCode() != 200 && response.statusCode() != 204) {
+                    LOG.warn("Request failed, status code: {}", response.statusCode());
+                    EXECUTOR.schedule(this, RETRY_DELAY, TimeUnit.SECONDS);
                 } else {
-                    final String body = response.getResponseBody();
+                    final String body = response.body();
                     final Matcher matcher = PATTERN.matcher(body);
                     int count = 0;
                     while (matcher.find()) {
                         count++;
                     }
-                    RESULTS_LOG.info("Currently connected devices : {} out of {}, time elapsed: {}", count, deviceCount + 1, stopwatch);
-                    if (count != deviceCount + 1) {
-                        executor.schedule(new ScaleVerifyCallable(simulator, deviceCount), retryDelay, TimeUnit.SECONDS);
+                    resultsLog.info("Currently connected devices : {} out of {}, time elapsed: {}",
+                        count, deviceCount, STOPWATCH);
+                    if (count != deviceCount) {
+                        EXECUTOR.schedule(this, RETRY_DELAY, TimeUnit.SECONDS);
                     } else {
-                        stopwatch.stop();
-                        RESULTS_LOG.info("All devices connected in {}", stopwatch);
-                        semaphore.release();
+                        STOPWATCH.stop();
+                        resultsLog.info("All {} of {} devices connected in {}", count, deviceCount, STOPWATCH);
+                        SEMAPHORE.release();
                     }
                 }
-            } catch (ConnectException | ExecutionException e) {
+            } catch (ConnectException e) {
                 LOG.warn("Failed to connect to Restconf, is the controller running?", e);
-                executor.schedule(new ScaleVerifyCallable(simulator, deviceCount), retryDelay, TimeUnit.SECONDS);
+                EXECUTOR.schedule(this, RETRY_DELAY, TimeUnit.SECONDS);
             }
             return null;
         }
     }
 
-    private static class TimeoutGuard implements Callable {
-
+    private static class TimeoutGuard implements Callable<Void> {
         @Override
-        public Object call() throws Exception {
-            RESULTS_LOG.warn("Timeout for scale test reached after: {} ..aborting", stopwatch);
-            root.warn("Timeout for scale test reached after: {} ..aborting", stopwatch);
+        public Void call() {
+            resultsLog.warn("Timeout for scale test reached after: {} ..aborting", STOPWATCH);
+            root.warn("Timeout for scale test reached after: {} ..aborting", STOPWATCH);
             System.exit(0);
             return null;
         }
     }
 
     public static class LoggingWrapperExecutor extends ScheduledThreadPoolExecutor {
-
-        public LoggingWrapperExecutor(int corePoolSize) {
+        public LoggingWrapperExecutor(final int corePoolSize) {
             super(corePoolSize);
         }
 
         @Override
-        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-            return super.schedule(wrapCallable(callable), delay, unit);
-        }
-
-        private Callable wrapCallable(Callable callable) {
-            return new LogOnExceptionCallable(callable);
+        public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+            return super.schedule(new LogOnExceptionCallable<>(callable), delay, unit);
         }
 
-        private class LogOnExceptionCallable implements Callable {
-            private Callable theCallable;
+        private static class LogOnExceptionCallable<T> implements Callable<T> {
+            private final Callable<T> theCallable;
 
-            public LogOnExceptionCallable(Callable theCallable) {
-                super();
+            LogOnExceptionCallable(final Callable<T> theCallable) {
                 this.theCallable = theCallable;
             }
 
             @Override
-            public Object call() throws Exception {
+            @SuppressWarnings("checkstyle:illegalCatch")
+            public T call() {
                 try {
-                    theCallable.call();
-                    return null;
+                    return theCallable.call();
                 } catch (Exception e) {
                     // log
                     root.warn("error in executing: " + theCallable + ". It will no longer be run!", e);
-
+                    Throwables.throwIfUnchecked(e);
                     // rethrow so that the executor can do it's thing
-                    throw new RuntimeException(e);
+                    throw new IllegalStateException(e);
                 }
             }
         }