From 8ffb3e08bfa609405e883444480642b93c83242a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 9 Feb 2014 20:32:33 +0100 Subject: [PATCH 1/1] Fix thread safety issues in netconf client This patch introduces a proper asynchronous interface and reworks internals such that thread safety is maintained. Change-Id: I6eb1c56518b0b3cc6f64c1df8bc0c857298f79b9 Signed-off-by: Robert Varga --- .../sal/connect/netconf/NetconfDevice.xtend | 2 +- .../netconf/NetconfDeviceListener.java | 116 ++---------------- .../netconf/persist/impl/ConfigPusher.java | 36 +++--- .../netconf/api/NetconfSessionListener.java | 3 +- ...actNetconfClientNotifySessionListener.java | 2 +- .../netconf/client/NetconfClient.java | 51 ++++---- .../client/NetconfClientDispatcher.java | 28 ++--- .../client/NetconfClientSessionListener.java | 108 +++++++++++----- .../client/NetconfSshClientDispatcher.java | 7 +- .../impl/NetconfServerSessionListener.java | 20 ++- .../controller/netconf/it/NetconfITTest.java | 87 +++++++------ 11 files changed, 206 insertions(+), 254 deletions(-) diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend index 7e88ea17d0..c9fb1fc0b8 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend @@ -125,7 +125,7 @@ AutoCloseable { checkState(schemaSourceProvider != null, "Schema Source Provider must be set.") checkState(eventExecutor != null, "Event executor must be set."); - val listener = new NetconfDeviceListener(this, eventExecutor); + val listener = new NetconfDeviceListener(this); val task = startClientTask(dispatcher, listener) if (mountInstance != null) { commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this) diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java index 69fe4aa190..13cd5dbcf0 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java @@ -7,61 +7,19 @@ */ package org.opendaylight.controller.sal.connect.netconf; -import com.google.common.base.Objects; - -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Promise; - -import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantLock; - -import org.eclipse.xtext.xbase.lib.Exceptions; -import org.eclipse.xtext.xbase.lib.Functions.Function0; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener; import org.opendaylight.controller.netconf.client.NetconfClientSession; -import org.opendaylight.controller.netconf.client.NetconfClientSessionListener; -import org.opendaylight.controller.netconf.util.xml.XmlElement; -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; -import org.opendaylight.controller.sal.connect.netconf.NetconfDevice; -import org.opendaylight.controller.sal.connect.netconf.NetconfMapping; import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.w3c.dom.Document; - -@SuppressWarnings("all") -class NetconfDeviceListener extends NetconfClientSessionListener { - private final NetconfDevice device; - private final EventExecutor eventExecutor; - - public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) { - this.device = device; - this.eventExecutor = eventExecutor; - } - private Promise messagePromise; - private ConcurrentMap> promisedMessages; +import com.google.common.base.Preconditions; - private final ReentrantLock promiseLock = new ReentrantLock(); +class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener { + private final NetconfDevice device; - public void onMessage(final NetconfClientSession session, final NetconfMessage message) { - if (isNotification(message)) { - this.onNotification(session, message); - } else { - try { - this.promiseLock.lock(); - boolean _notEquals = (!Objects.equal(this.messagePromise, null)); - if (_notEquals) { - this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message); - this.messagePromise.setSuccess(message); - this.messagePromise = null; - } - } finally { - this.promiseLock.unlock(); - } - } + public NetconfDeviceListener(final NetconfDevice device) { + this.device = Preconditions.checkNotNull(device); } /** @@ -76,6 +34,7 @@ class NetconfDeviceListener extends NetconfClientSessionListener { * NetconfClientSessionListener#onMessage(NetconfClientSession, * NetconfMessage)} */ + @Override public void onNotification(final NetconfClientSession session, final NetconfMessage message) { this.device.logger.debug("Received NETCONF notification.", message); CompositeNode domNotification = null; @@ -92,65 +51,4 @@ class NetconfDeviceListener extends NetconfClientSessionListener { } } } - - private static CompositeNode getNotificationBody(final CompositeNode node) { - List> _children = node.getChildren(); - for (final Node child : _children) { - if ((child instanceof CompositeNode)) { - return ((CompositeNode) child); - } - } - return null; - } - - public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException { - final Promise promise = this.promiseReply(); - this.device.logger.debug("Waiting for reply {}", promise); - int _plus = (attempts * attemptMsDelay); - final boolean messageAvailable = promise.await(_plus); - if (messageAvailable) { - try { - try { - return promise.get(); - } catch (Throwable _e) { - throw Exceptions.sneakyThrow(_e); - } - } catch (final Throwable _t) { - if (_t instanceof ExecutionException) { - final ExecutionException e = (ExecutionException) _t; - IllegalStateException _illegalStateException = new IllegalStateException(e); - throw _illegalStateException; - } else { - throw Exceptions.sneakyThrow(_t); - } - } - } - String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts)); - String _plus_2 = (_plus_1 + " attempts."); - IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2); - throw _illegalStateException_1; - } - - public synchronized Promise promiseReply() { - this.device.logger.debug("Promising reply."); - this.promiseLock.lock(); - try { - boolean _equals = Objects.equal(this.messagePromise, null); - if (_equals) { - Promise _newPromise = this.eventExecutor. newPromise(); - this.messagePromise = _newPromise; - return this.messagePromise; - } - return this.messagePromise; - } finally { - this.promiseLock.unlock(); - } - } - - public boolean isNotification(final NetconfMessage message) { - Document _document = message.getDocument(); - final XmlElement xmle = XmlElement.fromDomDocument(_document); - String _name = xmle.getName(); - return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name); - } } diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index 01d872d89c..1d48e9287b 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -8,9 +8,22 @@ package org.opendaylight.controller.netconf.persist.impl; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import io.netty.channel.EventLoopGroup; + +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.concurrent.Immutable; + import org.opendaylight.controller.config.api.ConflictingVersionException; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.netconf.api.NetconfMessage; @@ -27,16 +40,8 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; -import javax.annotation.concurrent.Immutable; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; @Immutable public class ConfigPusher { @@ -59,7 +64,7 @@ public class ConfigPusher { } public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup, - long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) { + long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) { this.address = address; this.nettyThreadGroup = nettyThreadGroup; this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; @@ -224,13 +229,12 @@ public class ConfigPusher { NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); NetconfUtil.checkIsMessageOk(netconfMessage); return netconfMessage; - } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions + } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e); throw new IOException("Failed to execute netconf transaction", e); } } - // load editConfig.xml template, populate /rpc/edit-config/config with parameter private static NetconfMessage createEditConfigMessage(Element dataElement) { String editConfigResourcePath = "/netconfOp/editConfig.xml"; @@ -316,4 +320,4 @@ public class ConfigPusher { '}'; } } -} \ No newline at end of file +} diff --git a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSessionListener.java b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSessionListener.java index 54cb471604..0f7869d97a 100644 --- a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSessionListener.java +++ b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSessionListener.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.netconf.api; import org.opendaylight.protocol.framework.SessionListener; -public interface NetconfSessionListener extends - SessionListener { +public interface NetconfSessionListener extends SessionListener { } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java index 48109d1353..aee4085599 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java @@ -31,7 +31,7 @@ public abstract class AbstractNetconfClientNotifySessionListener extends Netconf * @param message {@see NetconfClientSessionListener#onMessage(NetconfClientSession, NetconfMessage)} */ @Override - public final synchronized void onMessage(NetconfClientSession session, NetconfMessage message) { + public final void onMessage(NetconfClientSession session, NetconfMessage message) { if (isNotification(message)) { onNotification(session, message); } else { diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java index b8951a4789..a9dd2c3394 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java @@ -8,17 +8,8 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; -import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.protocol.framework.NeverReconnectStrategy; -import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.TimedReconnectStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -27,6 +18,18 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.protocol.framework.NeverReconnectStrategy; +import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.protocol.framework.TimedReconnectStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; public class NetconfClient implements Closeable { @@ -98,25 +101,31 @@ public class NetconfClient implements Closeable { this.sessionId = clientSession.getSessionId(); } - public NetconfMessage sendMessage(NetconfMessage message) { + public Future sendRequest(NetconfMessage message) { + return sessionListener.sendRequest(message); + } + + /** + * @deprecated Use {@link sendRequest} instead + */ + @Deprecated + public NetconfMessage sendMessage(NetconfMessage message) throws ExecutionException, InterruptedException, TimeoutException { return sendMessage(message, 5, 1000); } - public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) { - Stopwatch stopwatch = new Stopwatch().start(); - Preconditions.checkState(clientSession.isUp(), "Session was not up yet"); + /** + * @deprecated Use {@link sendRequest} instead + */ + @Deprecated + public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) throws ExecutionException, InterruptedException, TimeoutException { //logger.debug("Sending message: {}",XmlUtil.toString(message.getDocument())); - clientSession.sendMessage(message); + final Stopwatch stopwatch = new Stopwatch().start(); + try { - return sessionListener.getLastMessage(attempts, attemptMsDelay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(this + " Cannot read message from " + address, e); - } catch (IllegalStateException e) { - throw new IllegalStateException(this + " Cannot read message from " + address, e); + return sessionListener.sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS); } finally { stopwatch.stop(); - logger.debug("Total time spent waiting for response {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + logger.debug("Total time spent waiting for response from {}: {} ms", address, stopwatch.elapsed(TimeUnit.MILLISECONDS)); } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java index 1228a84a8a..dd08bf565c 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java @@ -8,25 +8,24 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Optional; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import org.opendaylight.controller.netconf.api.NetconfMessage; + +import java.io.Closeable; +import java.net.InetSocketAddress; + import org.opendaylight.controller.netconf.api.NetconfSession; -import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; import org.opendaylight.protocol.framework.AbstractDispatcher; import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.SessionListener; import org.opendaylight.protocol.framework.SessionListenerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.net.InetSocketAddress; +import com.google.common.base.Optional; public class NetconfClientDispatcher extends AbstractDispatcher implements Closeable { @@ -69,24 +68,25 @@ public class NetconfClientDispatcher extends AbstractDispatcher promise) { - super.initialize(ch,promise); + super.initialize(ch,promise); } @Override protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { - ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { - @Override - public SessionListener getSessionListener() { - return sessionListener; - } - }, ch, promise)); + ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator( + new SessionListenerFactory() { + @Override + public NetconfClientSessionListener getSessionListener() { + return sessionListener; + } + }, ch, promise)); } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java index d3c1b22c84..1ac2e7e264 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java @@ -8,68 +8,108 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; + +import java.util.ArrayDeque; +import java.util.Queue; + +import javax.annotation.concurrent.GuardedBy; + import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; + +public class NetconfClientSessionListener implements NetconfSessionListener { + private static final class RequestEntry { + final Promise promise; + final NetconfMessage request; -public class NetconfClientSessionListener implements - SessionListener { + public RequestEntry(Promise future, NetconfMessage request) { + this.promise = Preconditions.checkNotNull(future); + this.request = Preconditions.checkNotNull(request); + } + } private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class); - private AtomicBoolean up = new AtomicBoolean(false); + + @GuardedBy("this") + private final Queue requests = new ArrayDeque<>(); + + @GuardedBy("this") + private NetconfClientSession clientSession; + + @GuardedBy("this") + private void dispatchRequest() { + while (!requests.isEmpty()) { + final RequestEntry e = requests.peek(); + if (e.promise.setUncancellable()) { + logger.debug("Sending message {}", e.request); + clientSession.sendMessage(e.request); + break; + } + + logger.debug("Message {} has been cancelled, skipping it", e.request); + requests.poll(); + } + } @Override - public void onSessionUp(NetconfClientSession clientSession) { - up.set(true); + public final synchronized void onSessionUp(NetconfClientSession clientSession) { + this.clientSession = Preconditions.checkNotNull(clientSession); + logger.debug("Client session {} went up", clientSession); + dispatchRequest(); + } + + private synchronized void tearDown(final Exception cause) { + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setFailure(cause); + } + + this.clientSession = null; } @Override - public void onSessionDown(NetconfClientSession clientSession, Exception e) { - logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage()); - up.set(false); + public final void onSessionDown(NetconfClientSession clientSession, Exception e) { + logger.debug("Client Session {} went down unexpectedly", clientSession, e); + tearDown(e); } @Override - public void onSessionTerminated(NetconfClientSession clientSession, + public final void onSessionTerminated(NetconfClientSession clientSession, NetconfTerminationReason netconfTerminationReason) { logger.debug("Client Session {} terminated, reason: {}", clientSession, netconfTerminationReason.getErrorMessage()); - up.set(false); + tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage())); } @Override public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) { - synchronized (messages) { - this.messages.add(message); + logger.debug("New message arrived: {}", message); + + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setSuccess(message); + dispatchRequest(); + } else { + logger.info("Ignoring unsolicited message {}", message); } } - private int lastReadMessage = -1; - private List messages = Lists.newArrayList(); - - public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException { - Preconditions.checkState(up.get(), "Session was not up yet"); - - for (int i = 0; i < attempts; i++) { - synchronized (messages) { - if (messages.size() - 1 > lastReadMessage) { - lastReadMessage++; - return messages.get(lastReadMessage); - } - } + final synchronized Future sendRequest(NetconfMessage message) { + final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.newPromise(), message); - if (up.get() == false) - throw new IllegalStateException("Session ended while trying to read message"); - Thread.sleep(attemptMsDelay); + requests.add(req); + if (clientSession != null) { + dispatchRequest(); } - throw new IllegalStateException("No netconf message to read"); + return req.promise; } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java index c1d5b2bdf7..25beb65179 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java @@ -17,15 +17,12 @@ import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; -import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSession; -import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler; import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler; import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker; import org.opendaylight.protocol.framework.ReconnectStrategy; -import org.opendaylight.protocol.framework.SessionListener; import org.opendaylight.protocol.framework.SessionListenerFactory; import com.google.common.base.Optional; @@ -92,9 +89,9 @@ public class NetconfSshClientDispatcher extends NetconfClientDispatcher { @Override protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { - ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { + ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { @Override - public SessionListener getSessionListener() { + public NetconfClientSessionListener getSessionListener() { return sessionListener; } }, ch, promise)); diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java index 43e55d746a..460288fe33 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java @@ -8,10 +8,11 @@ package org.opendaylight.controller.netconf.impl; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; +import static com.google.common.base.Preconditions.checkState; + import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationRouterImpl; import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService; @@ -19,25 +20,22 @@ import org.opendaylight.controller.netconf.util.messages.SendErrorExceptionUtil; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Node; -import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; -public class NetconfServerSessionListener implements - SessionListener { +public class NetconfServerSessionListener implements NetconfSessionListener { + public static final String MESSAGE_ID = "message-id"; static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionListener.class); - public static final String MESSAGE_ID = "message-id"; private final SessionMonitoringService monitoringService; + private final NetconfOperationRouterImpl operationRouter; - private NetconfOperationRouterImpl operationRouter; - - public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter, - SessionMonitoringService monitoringService) { + public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter, SessionMonitoringService monitoringService) { this.operationRouter = operationRouter; this.monitoringService = monitoringService; } diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java index 954da5f487..fce3f70e73 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java @@ -8,12 +8,33 @@ package org.opendaylight.controller.netconf.it; -import ch.ethz.ssh2.Connection; -import ch.ethz.ssh2.Session; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import static java.util.Collections.emptyList; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import io.netty.channel.ChannelFuture; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import javax.management.ObjectName; +import javax.xml.parsers.ParserConfigurationException; + import junit.framework.Assert; + import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -52,26 +73,11 @@ import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; import org.xml.sax.SAXException; -import javax.management.ObjectName; -import javax.xml.parsers.ParserConfigurationException; -import java.io.IOException; -import java.io.InputStream; -import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; +import ch.ethz.ssh2.Connection; +import ch.ethz.ssh2.Session; -import static java.util.Collections.emptyList; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class NetconfITTest extends AbstractNetconfConfigTest { @@ -85,7 +91,7 @@ public class NetconfITTest extends AbstractNetconfConfigTest { private static final String PASSWORD = "netconf"; private NetconfMessage getConfig, getConfigCandidate, editConfig, - closeSession, startExi, stopExi; + closeSession, startExi, stopExi; private DefaultCommitNotificationProducer commitNot; private NetconfServerDispatcher dispatch; @@ -304,7 +310,7 @@ public class NetconfITTest extends AbstractNetconfConfigTest { } } - */ + */ @Test public void testCloseSession() throws Exception { @@ -350,12 +356,12 @@ public class NetconfITTest extends AbstractNetconfConfigTest { assertEquals("ok", XmlElement.fromDomDocument(rpcReply).getOnlyChildElement().getName()); } - private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException { + private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException, ExecutionException, TimeoutException { return assertGetConfigWorks(netconfClient, this.getConfig); } private Document assertGetConfigWorks(final NetconfClient netconfClient, final NetconfMessage getConfigMessage) - throws InterruptedException { + throws InterruptedException, ExecutionException, TimeoutException { final NetconfMessage rpcReply = netconfClient.sendMessage(getConfigMessage); assertNotNull(rpcReply); assertEquals("data", XmlElement.fromDomDocument(rpcReply.getDocument()).getOnlyChildElement().getName()); @@ -423,19 +429,20 @@ public class NetconfITTest extends AbstractNetconfConfigTest { sess.getStdin().write(XmlUtil.toString(this.getConfig.getDocument()).getBytes()); new Thread(){ - public void run(){ - while (true){ - byte[] bytes = new byte[1024]; - int c = 0; - try { - c = sess.getStdout().read(bytes); - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - logger.info("got data:"+bytes); - if (c == 0) break; - } - } + @Override + public void run(){ + while (true){ + byte[] bytes = new byte[1024]; + int c = 0; + try { + c = sess.getStdout().read(bytes); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + logger.info("got data:"+bytes); + if (c == 0) break; + } + } }.join(); } -- 2.36.6