<module>sal-rest-connector-config</module>
<module>sal-rest-docgen</module>
<module>sal-rest-docgen-maven</module>
+ <module>websocket-client</module>
<!--
FIXME: Needs to be reimplemented to not use YANGTools restconf impl
<module>sal-restconf-broker</module>
<artifactId>sal-restconf-broker</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>websocket-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- restconf features -->
<dependency>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright © 2019 FRINX s.r.o. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.netconf</groupId>
+ <artifactId>restconf-parent</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <relativePath>../restconf-parent</relativePath>
+ </parent>
+
+ <artifactId>websocket-client</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-client</artifactId>
+ <!-- FIXME: remove this with odlparent-5.0.1+ -->
+ <version>9.4.12.v20180830</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sourceforge.argparse4j</groupId>
+ <artifactId>argparse4j</artifactId>
+ <version>0.8.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>threadpool-config-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.opendaylight.restconf.websocket.client.StartApplication</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>${project.artifactId}-${project.version}-executable</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.websocket.client;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.annotation.Arg;
+import net.sourceforge.argparse4j.helper.HelpScreenException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder of the parsed user-input application arguments.
+ */
+final class ApplicationSettings {
+
+ /**
+ * Credentials used for basic authentication - grouping of username and password.
+ */
+ static final class Credentials {
+ final String userName;
+ final String password;
+
+ private Credentials(final String userName, final String password) {
+ this.userName = userName;
+ this.password = password;
+ }
+
+ private static Credentials extractCredentials(final String basicAuthentication) {
+ final String[] credentials = basicAuthentication.split(":");
+ Preconditions.checkArgument(credentials.length == 2, "Both username and password must be specified in the "
+ + "format [username]:[password] for basic authentication.");
+ final String userName = credentials[0].trim();
+ final String password = credentials[1].trim();
+ return new Credentials(userName, password);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationSettings.class);
+ private static final ArgumentParser PARSER = ArgumentParsers.newFor("web-socket client test-tool").build();
+
+ static {
+ PARSER.addArgument("-l")
+ .dest("loggingLevel")
+ .required(false)
+ .setDefault("INFO")
+ .metavar("LEVEL")
+ .type(String.class)
+ .help("Logging level threshold used throughout the whole web-socket client.");
+ PARSER.addArgument("-s")
+ .dest("streams")
+ .required(true)
+ .nargs("+")
+ .help("Web-socket stream paths with ws or wss schemas.")
+ .metavar("STREAM")
+ .type(String.class);
+ PARSER.addArgument("-pi")
+ .dest("pingInterval")
+ .help("Interval in milliseconds between sending of ping web-socket frames to server. "
+ + "Value of 0 disables ping process.")
+ .metavar("INTERVAL")
+ .setDefault(0)
+ .type(Integer.class);
+ PARSER.addArgument("-pm")
+ .dest("pingMessage")
+ .help("Explicitly set ping message.")
+ .metavar("MESSAGE")
+ .setDefault("ping")
+ .type(String.class);
+ PARSER.addArgument("-t")
+ .dest("threads")
+ .help("Explicitly set size of thread-pool used for holding of web-socket handlers and ping processes.")
+ .metavar("SIZE")
+ .setDefault(8)
+ .type(Integer.class);
+ PARSER.addArgument("-r")
+ .dest("regeneration")
+ .help("Allowed TLS/SSL session regeneration.")
+ .metavar("ALLOWED")
+ .setDefault(false)
+ .type(Boolean.class);
+ PARSER.addArgument("-kpath")
+ .dest("keystorePath")
+ .help("Path to the certificates key-store file.")
+ .metavar("PATH")
+ .type(File.class);
+ PARSER.addArgument("-kpass")
+ .dest("keystorePassword")
+ .help("Password used for unlocking of the certificates keystore.")
+ .metavar("SECRET")
+ .type(String.class);
+ PARSER.addArgument("-tpath")
+ .dest("truststorePath")
+ .help("Path to the certificates trust-store file.")
+ .metavar("PATH")
+ .type(File.class);
+ PARSER.addArgument("-tpass")
+ .dest("truststorePassword")
+ .help("Password used for unlocking of the certificates truststore.")
+ .metavar("SECRET")
+ .type(String.class);
+ PARSER.addArgument("-ta")
+ .dest("trustAll")
+ .help("All incoming certificates are trusted when both truststore and keystore are not specified.")
+ .metavar("TRUST")
+ .setDefault(false)
+ .type(Boolean.class);
+ PARSER.addArgument("-ip")
+ .dest("includedProtocols")
+ .nargs("+")
+ .help("Explicitly specified list of permitted versions of web-security protocols.")
+ .metavar("PROTOCOL")
+ .setDefault("TLSv1.2", "TLSv1.3")
+ .type(String.class);
+ PARSER.addArgument("-ep")
+ .dest("excludedProtocols")
+ .nargs("*")
+ .help("Explicitly specified list of denied versions of web-security protocols (denied protocols have "
+ + "the highest priority).")
+ .metavar("PROTOCOL")
+ .setDefault("TLSv1", "TLSv1.1", "SSL", "SSLv2", "SSLv2Hello", "SSLv3")
+ .type(String.class);
+ PARSER.addArgument("-ic")
+ .dest("includedCipherSuites")
+ .nargs("+")
+ .help("Explicitly specified list of permitted cipher suites.")
+ .metavar("CIPHER")
+ .setDefault("TLS_ECDHE.*", "TLS_DHE_RSA.*")
+ .type(String.class);
+ PARSER.addArgument("-ec")
+ .dest("excludedCipherSuites")
+ .nargs("*")
+ .help("Explicitly specified list of denied cipher suites (denied ciphers have the highest priority).")
+ .metavar("CIPHER")
+ .setDefault(".*MD5.*", ".*RC4.*", ".*DSS.*", ".*NULL.*", ".*DES.*")
+ .type(String.class);
+ PARSER.addArgument("-b")
+ .dest("basicAuthentication")
+ .help("[username:password] used with basic authentication that can be required on upgrade-request.")
+ .metavar("[USERNAME]:[PASSWORD]")
+ .type(String.class);
+ }
+
+ @Arg(dest = "loggingLevel")
+ private String loggingLevel;
+ @Arg(dest = "streams")
+ private List<String> streams;
+ @Arg(dest = "pingInterval")
+ private int pingInterval;
+ @Arg(dest = "pingMessage")
+ private String pingMessage;
+ @Arg(dest = "threads")
+ private int threadPoolSize;
+ @Arg(dest = "regeneration")
+ private boolean regenerationAllowed;
+ @Arg(dest = "keystorePath")
+ private File keystorePath;
+ @Arg(dest = "keystorePassword")
+ private String keystorePassword;
+ @Arg(dest = "truststorePath")
+ private File truststorePath;
+ @Arg(dest = "truststorePassword")
+ private String truststorePassword;
+ @Arg(dest = "trustAll")
+ private boolean trustAll;
+ @Arg(dest = "includedProtocols")
+ private List<String> includedProtocols;
+ @Arg(dest = "excludedProtocols")
+ private List<String> excludedProtocols;
+ @Arg(dest = "includedCipherSuites")
+ private List<String> includedCipherSuites;
+ @Arg(dest = "excludedCipherSuites")
+ private List<String> excludedCipherSuites;
+ @Arg(dest = "basicAuthentication")
+ private String basicAuthentication;
+
+ private Credentials credentials;
+
+ private ApplicationSettings() {
+ }
+
+ /**
+ * Creation of application settings object using input command-line arguments (factory method).
+ *
+ * @param arguments Raw program arguments.
+ * @return Parsed arguments wrapped in {@link Optional} or {@link Optional#empty()} if only help is going
+ * to be invoked.
+ */
+ static Optional<ApplicationSettings> parseApplicationSettings(final String[] arguments) {
+ final ApplicationSettings applicationSettings = new ApplicationSettings();
+ try {
+ PARSER.parseArgs(arguments, applicationSettings);
+ applicationSettings.verifyParsedArguments();
+ if (applicationSettings.basicAuthentication == null) {
+ applicationSettings.credentials = null;
+ } else {
+ applicationSettings.credentials = Credentials.extractCredentials(
+ applicationSettings.basicAuthentication);
+ }
+ } catch (final ArgumentParserException | IllegalArgumentException e) {
+ if (e instanceof HelpScreenException) {
+ return Optional.empty();
+ } else {
+ final StringWriter helpWriter = new StringWriter();
+ final PrintWriter helpPrintWriter = new PrintWriter(helpWriter);
+ PARSER.printHelp(helpPrintWriter);
+ LOG.error("Cannot parse input arguments {}.", arguments, e);
+ LOG.info("Help: {}", helpWriter.toString());
+ throw new IllegalArgumentException("Cannot parse input arguments", e);
+ }
+ }
+ LOG.info("Application settings {} have been parsed successfully.", (Object) arguments);
+ return Optional.of(applicationSettings);
+ }
+
+ private void verifyParsedArguments() {
+ Preconditions.checkArgument(pingInterval >= 0, "Ping interval must be set to value higher than 0 (enabled) or "
+ + "to 0 (disabled).");
+ Preconditions.checkArgument(threadPoolSize > 0, "Thread pool must have capacity of at least 1 thread.");
+ Preconditions.checkArgument((keystorePath == null && keystorePassword == null) || (keystorePath != null
+ && keystorePassword != null), "Both keystore path and keystore password must be configured at once.");
+ Preconditions.checkArgument((truststorePath == null && truststorePassword == null) || (truststorePath != null
+ && truststorePassword != null), "Both truststore path and truststore password must be configured");
+ }
+
+ String getLoggingLevel() {
+ return loggingLevel;
+ }
+
+ List<String> getStreams() {
+ return new ArrayList<>(streams);
+ }
+
+ int getPingInterval() {
+ return pingInterval;
+ }
+
+ String getPingMessage() {
+ return pingMessage;
+ }
+
+ File getKeystorePath() {
+ return keystorePath;
+ }
+
+ String getKeystorePassword() {
+ return keystorePassword;
+ }
+
+ File getTruststorePath() {
+ return truststorePath;
+ }
+
+ String getTruststorePassword() {
+ return truststorePassword;
+ }
+
+ List<String> getIncludedProtocols() {
+ return new ArrayList<>(includedProtocols);
+ }
+
+ List<String> getExcludedProtocols() {
+ return new ArrayList<>(excludedProtocols);
+ }
+
+ List<String> getIncludedCipherSuites() {
+ return new ArrayList<>(includedCipherSuites);
+ }
+
+ List<String> getExcludedCipherSuites() {
+ return new ArrayList<>(excludedCipherSuites);
+ }
+
+ boolean isTrustAll() {
+ return trustAll;
+ }
+
+ boolean isRegenerationAllowed() {
+ return regenerationAllowed;
+ }
+
+ int getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ Credentials getCredentials() {
+ return credentials;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.websocket.client;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.Collectors;
+import org.apache.log4j.BasicConfigurator;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.AuthenticationStore;
+import org.eclipse.jetty.client.util.BasicAuthentication.BasicResult;
+import org.eclipse.jetty.util.resource.Resource;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.opendaylight.controller.config.threadpool.util.NamingThreadPoolFactory;
+import org.opendaylight.controller.config.threadpool.util.ScheduledThreadPoolWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Application starting point which is responsible for reading of input application arguments and creation of right
+ * client handlers according to the used scheme.
+ */
+public final class StartApplication {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StartApplication.class);
+
+ private static final String WS_SCHEME = "ws";
+ private static final String WSS_SCHEME = "wss";
+ private static final String THREAD_POOL_NAME = "websockets";
+
+ private StartApplication() {
+ }
+
+ public static void main(String[] args) {
+ BasicConfigurator.configure();
+ final Optional<ApplicationSettings> applicationSettings = ApplicationSettings.parseApplicationSettings(args);
+ if (applicationSettings.isPresent()) {
+ setLoggingLevel(applicationSettings.get().getLoggingLevel());
+ final SslContextFactory sslContextFactory = getSslContextFactory(applicationSettings.get());
+ final ThreadFactory threadFactory = new NamingThreadPoolFactory(THREAD_POOL_NAME);
+ final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolWrapper(
+ applicationSettings.get().getThreadPoolSize(), threadFactory).getExecutor();
+ final List<WebSocketClientHandler> clientHandlers = applicationSettings.get().getStreams().stream()
+ .map(streamName -> getWebSocketClientHandler(applicationSettings.get(), sslContextFactory,
+ scheduledExecutorService, streamName))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ printHandledStreamsOverview(applicationSettings.get().getStreams(), clientHandlers);
+ startAndLockOnClientHandlers(scheduledExecutorService, clientHandlers);
+ }
+ System.exit(0);
+ }
+
+ private static void setLoggingLevel(final String loggingLevel) {
+ org.apache.log4j.Logger logger4j = org.apache.log4j.Logger.getRootLogger();
+ logger4j.setLevel(org.apache.log4j.Level.toLevel(loggingLevel.toUpperCase(Locale.getDefault())));
+ }
+
+ /**
+ * Starts threads of all successfully created web-socket client handlers and waits until they finishes their work
+ * (regular closing of the web-socket session or unexpected exception).
+ *
+ * @param scheduledExecutorService Executor for the web-socket client threads.
+ * @param clientHandlers List of created web-socket client handlers.
+ */
+ private static void startAndLockOnClientHandlers(final ScheduledExecutorService scheduledExecutorService,
+ final List<WebSocketClientHandler> clientHandlers) {
+ if (clientHandlers.isEmpty()) {
+ LOG.info("There aren't any usable web-socket client handlers - shutting down of the application.");
+ } else {
+ final List<? extends Future<?>> startedThreads = clientHandlers.stream()
+ .map(scheduledExecutorService::submit)
+ .collect(Collectors.toList());
+ LOG.info("Threads for all successfully created web-socket clients have been started.");
+ for (final Future<?> future : startedThreads) {
+ try {
+ future.get();
+ } catch (final ExecutionException | InterruptedException e) {
+ LOG.warn("One of the web-socket handlers ends with unexpected exception.", e);
+ }
+ }
+ LOG.info("All web-socket client threads have been closed - shutting down of the application.");
+ }
+ }
+
+ /**
+ * Prints information about successfully and unsuccessfully handled streams.
+ *
+ * @param streams Input URIs of web-socket streams that were tended to be handled.
+ * @param clientHandlers Successfully crafted web-socket client handlers.
+ */
+ private static void printHandledStreamsOverview(final List<String> streams,
+ final List<WebSocketClientHandler> clientHandlers) {
+ final Set<String> successfullyHandledStreams = clientHandlers.stream()
+ .map(WebSocketClientHandler::getUri)
+ .collect(Collectors.toSet());
+ final Set<String> unsuccessfullyHandledStreams = Sets.difference(new HashSet<>(streams),
+ successfullyHandledStreams);
+ if (!successfullyHandledStreams.isEmpty()) {
+ LOG.info("Successfully created stream handlers ({}): {}.", successfullyHandledStreams.size(),
+ successfullyHandledStreams);
+ }
+ if (!unsuccessfullyHandledStreams.isEmpty()) {
+ LOG.warn("Unsuccessfully handled streams ({}): {}.", unsuccessfullyHandledStreams.size(),
+ unsuccessfullyHandledStreams);
+ }
+ }
+
+ /**
+ * Deriving of web-socket client handler using URI schema (currently WS and WSS are supported).
+ *
+ * @param applicationSettings Application settings.
+ * @param sslContextFactory SSL context factory that us utilized if the WSS schema is used.
+ * @param scheduledExecutorService Executor for launching of ping processes if it is necessary.
+ * @param streamName Input URI that holds stream name and starts with specified WS or WSS schema.
+ * @return Instance of {@link WebSocketClientHandler} wrapped in {@link Optional} or {@link Optional#empty()} if
+ * the web-socket client handler cannot be created.
+ */
+ private static Optional<WebSocketClientHandler> getWebSocketClientHandler(
+ final ApplicationSettings applicationSettings, final SslContextFactory sslContextFactory,
+ final ScheduledExecutorService scheduledExecutorService, final String streamName) {
+ try {
+ final URI uri = URI.create(streamName);
+ if (uri.getScheme() == null) {
+ LOG.warn("Schema seems to be undefined in input URI {}. The web-socket client cannot be created "
+ + "for this stream.", uri);
+ } else {
+ HttpClient httpClient;
+ switch (uri.getScheme().toLowerCase(Locale.getDefault())) {
+ case WS_SCHEME:
+ httpClient = new HttpClient();
+ break;
+ case WSS_SCHEME:
+ httpClient = new HttpClient(sslContextFactory);
+ break;
+ default:
+ LOG.warn("Unknown schema {} in input URI {}. The web-socket client cannot be created "
+ + "for this stream.", uri.getScheme(), uri);
+ return Optional.empty();
+ }
+ httpClient.setExecutor(scheduledExecutorService);
+ if (applicationSettings.getCredentials() != null) {
+ final ApplicationSettings.Credentials credentials = applicationSettings.getCredentials();
+ final AuthenticationStore authenticationStore = httpClient.getAuthenticationStore();
+ authenticationStore.addAuthenticationResult(new BasicResult(uri, credentials.userName,
+ credentials.password));
+ }
+ return Optional.of(new WebSocketClientHandler(uri, applicationSettings.getPingInterval(),
+ applicationSettings.getPingMessage(), scheduledExecutorService, httpClient));
+ }
+ } catch (final IllegalArgumentException e) {
+ LOG.warn("Stream {} cannot be parsed to URI. The web-socket client won't be created.", streamName);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Building of {@link org.eclipse.jetty.util.ssl.SslContextFactory} using input settings.
+ *
+ * @param applicationSettings User-defined settings.
+ * @return Instance of {@link org.eclipse.jetty.util.ssl.SslContextFactory}.
+ */
+ private static SslContextFactory getSslContextFactory(final ApplicationSettings applicationSettings) {
+ final SslContextFactory sslContextFactory = new SslContextFactory();
+ if (applicationSettings.getKeystorePath() != null) {
+ final Resource keyStoreResource = Resource.newResource(applicationSettings.getKeystorePath());
+ sslContextFactory.setKeyStoreResource(keyStoreResource);
+ sslContextFactory.setKeyStorePassword(applicationSettings.getKeystorePassword());
+ }
+ if (applicationSettings.getTruststorePath() != null) {
+ final Resource truststoreResource = Resource.newResource(applicationSettings.getTruststorePath());
+ sslContextFactory.setTrustStoreResource(truststoreResource);
+ sslContextFactory.setTrustStorePassword(applicationSettings.getTruststorePassword());
+ }
+ sslContextFactory.setTrustAll(applicationSettings.isTrustAll());
+ sslContextFactory.setExcludeProtocols(applicationSettings.getExcludedProtocols().toArray(new String[0]));
+ sslContextFactory.setExcludeCipherSuites(applicationSettings.getExcludedCipherSuites().toArray(new String[0]));
+ sslContextFactory.setIncludeCipherSuites(applicationSettings.getIncludedCipherSuites().toArray(new String[0]));
+ sslContextFactory.setIncludeProtocols(applicationSettings.getIncludedProtocols().toArray(new String[0]));
+ sslContextFactory.setRenegotiationAllowed(applicationSettings.isRegenerationAllowed());
+ return sslContextFactory;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.websocket.client;
+
+import com.google.common.base.MoreObjects;
+import java.net.URI;
+import java.util.concurrent.ScheduledExecutorService;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Web socket client handler that is responsible for starting of the web-socket session thread and waiting until
+ * the session dies.
+ */
+class WebSocketClientHandler implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientHandler.class);
+
+ private final URI uri;
+ private final int pingInterval;
+ private final String pingMessage;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final HttpClient httpClient;
+
+ /**
+ * Creation of the web-socket client handler.
+ *
+ * @param uri Full stream URI including schema.
+ * @param pingInterval Interval ath which the ping messages should be sent to remote server.
+ * @param pingMessage Text of the ping message.
+ * @param scheduledExecutorService Ping service executor.
+ * @param httpClient HTTP client with possibly configured authentication settings.
+ */
+ WebSocketClientHandler(final URI uri, final int pingInterval, final String pingMessage,
+ final ScheduledExecutorService scheduledExecutorService, final HttpClient httpClient) {
+ this.uri = uri;
+ this.pingInterval = pingInterval;
+ this.pingMessage = pingMessage;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.httpClient = httpClient;
+ }
+
+ /**
+ * Starting of the web-socket client handler by listening to the initialised web-socket session. Then, thread
+ * is blocked until web-socket session will be closed gracefully or on error.
+ */
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void run() {
+ final WebSocketClient webSocketClient = new WebSocketClient(httpClient);
+ WebSocketSessionHandler webSocketSessionHandler;
+ if (pingInterval == 0) {
+ webSocketSessionHandler = new WebSocketSessionHandler();
+ } else {
+ webSocketSessionHandler = new WebSocketPingSessionHandler(scheduledExecutorService, pingMessage,
+ pingInterval);
+ }
+ try {
+ httpClient.start();
+ LOG.info("Starting of the web-socket client {}.", this);
+ webSocketClient.start();
+ final ClientUpgradeRequest request = new ClientUpgradeRequest();
+ webSocketClient.connect(webSocketSessionHandler, uri, request);
+ LOG.info("Web-socket client {} has been started successfully.", this);
+ webSocketSessionHandler.awaitClose();
+ } catch (final Exception e) {
+ LOG.error("Cannot start web-socket client {}.", this, e);
+ } finally {
+ try {
+ webSocketClient.stop();
+ httpClient.stop();
+ LOG.info("Web-socket client {} has been stopped.", this);
+ } catch (final Exception e) {
+ LOG.error("Cannot stop web-socket client {}.", this, e);
+ }
+ }
+ }
+
+ String getUri() {
+ return uri.toString();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("uri", uri).toString();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.websocket.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Web-socket session handler that is responsible for handling of incoming web-socket session events, frames and
+ * messages, and for periodical sending of ping frames to the remote endpoint of the web-socket session.
+ *
+ * @see WebSocket more information about Jetty's web-socket implementation
+ */
+@WebSocket
+class WebSocketPingSessionHandler extends WebSocketSessionHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketPingSessionHandler.class);
+
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final String pingMessage;
+ private final int pingInterval;
+
+ private ScheduledFuture<?> pingProcess;
+
+ /**
+ * Creation of the web-socket session handler using ping settings.
+ *
+ * @param scheduledExecutorService Ping service executor.
+ * @param pingMessage Text of the ping message.
+ * @param pingInterval Interval ath which the ping messages should be sent to remote server.
+ */
+ WebSocketPingSessionHandler(final ScheduledExecutorService scheduledExecutorService, final String pingMessage,
+ final int pingInterval) {
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.pingMessage = pingMessage;
+ this.pingInterval = pingInterval;
+ }
+
+ /**
+ * Handling of the initialized web-socket session. Created web-socket session is saved and the periodical ping
+ * process is executed.
+ *
+ * @param session Just created web-socket session.
+ * @see OnWebSocketConnect more information about this event
+ */
+ @Override
+ public synchronized void onWebSocketConnected(final Session session) {
+ super.onWebSocketConnected(session);
+ if (session != null) {
+ if (pingInterval != 0) {
+ startPingProcess();
+ }
+ }
+ }
+
+ /**
+ * Handling of the closed web-socket session. Related log messages are generated, the session latch is unreleased,
+ * and the ping process is stopped.
+ *
+ * @param statusCode Status code of the closed session.
+ * @param reason Reason why the web-socket session has been closed.
+ * @see OnWebSocketClose more information about this event
+ */
+ @Override
+ public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
+ super.onWebSocketClosed(statusCode, reason);
+ if (pingInterval != 0) {
+ stopPingProcess();
+ }
+ }
+
+ private void startPingProcess() {
+ if (pingProcess == null || pingProcess.isDone() || pingProcess.isCancelled()) {
+ pingProcess = Objects.requireNonNull(scheduledExecutorService).scheduleWithFixedDelay(
+ () -> sendPingMessage(pingMessage), pingInterval, pingInterval, TimeUnit.MILLISECONDS);
+ LOG.info("{}: PING process has been started with setup delay {}.", getUri(), pingInterval);
+ } else {
+ LOG.warn("{}: PING process cannot be started because the previous process hasn't been stopped yet.",
+ getUri());
+ }
+ }
+
+ private void stopPingProcess() {
+ if (pingProcess != null && !pingProcess.isCancelled() && !pingProcess.isDone()) {
+ pingProcess.cancel(true);
+ if (webSocketSession != null) {
+ LOG.info("{}: PING process has been cancelled.", getUri());
+ } else {
+ LOG.warn("PING process of non-initialized session has been cancelled.");
+ }
+ } else {
+ if (webSocketSession != null) {
+ LOG.warn("{}: PING process han't been started - it doesn't have to cancelled.", getUri());
+ } else {
+ LOG.warn("PING process of non-initialized session han't been started - it doesn't have to cancelled.");
+ }
+ }
+ }
+
+ private synchronized void sendPingMessage(final String message) {
+ if (webSocketSession != null) {
+ if (webSocketSession.isOpen()) {
+ try {
+ webSocketSession.getRemote().sendPing(ByteBuffer.wrap(message.getBytes(Charset.defaultCharset())));
+ LOG.info("{}: Sent PING message to remote endpoint with body:\n{}", getUri(), message);
+ } catch (final IOException e) {
+ LOG.error("{}: Cannot send PING frame with message {} to remote endpoint.", getUri(), message, e);
+ }
+ } else {
+ LOG.warn("{}: PING frame with message {} cannot be sent to remote endpoint - "
+ + "web-socket session is closed.", getUri(), message);
+ }
+ } else {
+ LOG.warn("PING frame with message {} cannot be sent to remote endpoint - "
+ + "web-socket session hasn't been initialised yet", message);
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.websocket.client;
+
+import java.util.concurrent.CountDownLatch;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.api.extensions.Frame;
+import org.eclipse.jetty.websocket.common.frames.PingFrame;
+import org.eclipse.jetty.websocket.common.frames.PongFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Web-socket session handler that is responsible for handling of incoming web-socket session events, frames
+ * and messages.
+ *
+ * @see WebSocket more information about Jetty's web-socket implementation
+ */
+@WebSocket
+public class WebSocketSessionHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
+
+ private final CountDownLatch sessionCloseLatch = new CountDownLatch(1);
+ Session webSocketSession;
+
+ /**
+ * Handling of the initialized web-socket session. Created web-socket session is saved.
+ *
+ * @param session Just created web-socket session.
+ * @see OnWebSocketConnect more information about this event
+ */
+ @OnWebSocketConnect
+ public synchronized void onWebSocketConnected(final Session session) {
+ if (session != null) {
+ webSocketSession = session;
+ LOG.info("Web-socket session has been initialized: {}", session);
+ } else {
+ LOG.warn("Created web-socket session is null.");
+ }
+ }
+
+ /**
+ * Handling of the closed web-socket session. Related log messages are generated and the session latch
+ * is unreleased.
+ *
+ * @param statusCode Status code of the closed session.
+ * @param reason Reason why the web-socket session has been closed.
+ * @see OnWebSocketClose more information about this event
+ */
+ @OnWebSocketClose
+ public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
+ if (webSocketSession != null) {
+ LOG.info("{}: Web-socket session has been closed with status code {} and reason: {}.", getUri(),
+ statusCode, reason);
+ sessionCloseLatch.countDown();
+ } else {
+ LOG.warn("Trying to close web-socket session which initialization phase hasn't been registered yet "
+ + "with status code {} and reason: {}.", statusCode, reason);
+ }
+ }
+
+ /**
+ * Handling of the error that occurred on the web-socket. Error is logged but the web-socket is not explicitly
+ * closed on error because of the testing environment in which this tool should be used.
+ *
+ * @param error Error details.
+ * @see OnWebSocketError more information about this event
+ */
+ @OnWebSocketError
+ public synchronized void onWebSocketError(final Throwable error) {
+ if (webSocketSession != null) {
+ if (error != null) {
+ LOG.error("{}: An error occurred on web-socket session.", getUri(), error);
+ }
+ } else {
+ LOG.error("An error occurred on web-socket session which initialisation phase hasn't been "
+ + "registered yet.", error);
+ }
+ sessionCloseLatch.countDown();
+ }
+
+ /**
+ * Handling of incoming web-socket text message. If message is not null or empty the contents of the web-socket
+ * message is logged.
+ *
+ * @param message Web-socket text message.
+ * @see OnWebSocketMessage more information about this event
+ */
+ @OnWebSocketMessage
+ public synchronized void onWebSocketMessage(final String message) {
+ if (webSocketSession != null) {
+ if (webSocketSession.isOpen()) {
+ if (message != null) {
+ if (!message.isEmpty()) {
+ LOG.info("{}: Received web-socket message:\n{}.", getUri(), message);
+ } else {
+ LOG.info("{}: Received empty message.", getUri());
+ }
+ } else {
+ LOG.warn("{}: Received null message.", getUri());
+ }
+ } else {
+ LOG.warn("{}: Received web-socket message on closed web-socket session:\n{}", getUri(), message);
+ }
+ } else {
+ LOG.warn("Received web-socket message on web-socket session which initialisation phase hasn't been "
+ + "registered yet:\n{}", message);
+ }
+ }
+
+ /**
+ * Handling of incoming web-socket control frame. Only web-socket PING and PONG frames are processed and their
+ * content is logged. Web-socket PONG frames are automatically generated as the response to received PING frame
+ * by JETTY framework.
+ *
+ * @param frame Web-socket control frame.
+ * @see OnWebSocketFrame more information about this event
+ */
+ @OnWebSocketFrame
+ public synchronized void onWebSocketFrame(final Frame frame) {
+ if (webSocketSession != null) {
+ if (webSocketSession.isOpen()) {
+ if (frame != null) {
+ if (frame instanceof PingFrame) {
+ LOG.info("{}: Received PING frame with message (PONG respond is automatically generated):\n{}",
+ getUri(), ((PingFrame) frame).getPayloadAsUTF8());
+ } else if (frame instanceof PongFrame) {
+ LOG.info("{}: Received PONG frame with message:\n{}", getUri(),
+ ((PongFrame) frame).getPayloadAsUTF8());
+ }
+ } else {
+ LOG.warn("{}: Received null frame.", getUri());
+ }
+ } else {
+ LOG.warn("{}: Received web-socket frame on closed web-socket session:\n{}", getUri(), frame);
+ }
+ } else {
+ LOG.warn("Received web-socket frame on web-socket session which initialisation phase hasn't been "
+ + "registered yet:\n{}", frame);
+ }
+ }
+
+ String getUri() {
+ return webSocketSession.getUpgradeRequest().getRequestURI().toString();
+ }
+
+ /**
+ * Blocking of the current thread until the web-socket session is closed.
+ */
+ void awaitClose() {
+ try {
+ sessionCloseLatch.await();
+ } catch (final InterruptedException e) {
+ LOG.error("Web-socket session was closed by external interruption.", e);
+ }
+ }
+}
\ No newline at end of file