Fix thread safety issues in netconf client 17/5217/3
authorRobert Varga <rovarga@cisco.com>
Sun, 9 Feb 2014 19:32:33 +0000 (20:32 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 11 Feb 2014 14:43:43 +0000 (15:43 +0100)
This patch introduces a proper asynchronous interface and reworks
internals such that thread safety is maintained.

Change-Id: I6eb1c56518b0b3cc6f64c1df8bc0c857298f79b9
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceListener.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSessionListener.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITTest.java

index 7e88ea17d01edeafef2a48c1311359ef50ef7666..c9fb1fc0b895ffabcd609eed14424e7411856413 100644 (file)
@@ -125,7 +125,7 @@ AutoCloseable {
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.")
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        val listener = new NetconfDeviceListener(this, eventExecutor);
+        val listener = new NetconfDeviceListener(this);
         val task = startClientTask(dispatcher, listener)
         if (mountInstance != null) {
             commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
index 69fe4aa1904e57ac83512400eaf6a3688a8da3ea..13cd5dbcf03a185ce99a5631684696bf74ef3ab5 100644 (file)
@@ -7,61 +7,19 @@
  */
 package org.opendaylight.controller.sal.connect.netconf;
 
-import com.google.common.base.Objects;
-
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Promise;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.eclipse.xtext.xbase.lib.Functions.Function0;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.client.AbstractNetconfClientNotifySessionListener;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
-import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
-import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
-import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.w3c.dom.Document;
-
-@SuppressWarnings("all")
-class NetconfDeviceListener extends NetconfClientSessionListener {
-    private final NetconfDevice device;
-    private final EventExecutor eventExecutor;
-
-    public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
-        this.device = device;
-        this.eventExecutor = eventExecutor;
-    }
 
-    private Promise<NetconfMessage> messagePromise;
-    private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
+import com.google.common.base.Preconditions;
 
-    private final ReentrantLock promiseLock = new ReentrantLock();
+class NetconfDeviceListener extends AbstractNetconfClientNotifySessionListener {
+    private final NetconfDevice device;
 
-    public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
-        if (isNotification(message)) {
-            this.onNotification(session, message);
-        } else {
-            try {
-                this.promiseLock.lock();
-                boolean _notEquals = (!Objects.equal(this.messagePromise, null));
-                if (_notEquals) {
-                    this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
-                    this.messagePromise.setSuccess(message);
-                    this.messagePromise = null;
-                }
-            } finally {
-                this.promiseLock.unlock();
-            }
-        }
+    public NetconfDeviceListener(final NetconfDevice device) {
+        this.device = Preconditions.checkNotNull(device);
     }
 
     /**
@@ -76,6 +34,7 @@ class NetconfDeviceListener extends NetconfClientSessionListener {
      *            NetconfClientSessionListener#onMessage(NetconfClientSession,
      *            NetconfMessage)}
      */
+    @Override
     public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
         this.device.logger.debug("Received NETCONF notification.", message);
         CompositeNode domNotification = null;
@@ -92,65 +51,4 @@ class NetconfDeviceListener extends NetconfClientSessionListener {
             }
         }
     }
-
-    private static CompositeNode getNotificationBody(final CompositeNode node) {
-        List<Node<? extends Object>> _children = node.getChildren();
-        for (final Node<? extends Object> child : _children) {
-            if ((child instanceof CompositeNode)) {
-                return ((CompositeNode) child);
-            }
-        }
-        return null;
-    }
-
-    public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
-        final Promise<NetconfMessage> promise = this.promiseReply();
-        this.device.logger.debug("Waiting for reply {}", promise);
-        int _plus = (attempts * attemptMsDelay);
-        final boolean messageAvailable = promise.await(_plus);
-        if (messageAvailable) {
-            try {
-                try {
-                    return promise.get();
-                } catch (Throwable _e) {
-                    throw Exceptions.sneakyThrow(_e);
-                }
-            } catch (final Throwable _t) {
-                if (_t instanceof ExecutionException) {
-                    final ExecutionException e = (ExecutionException) _t;
-                    IllegalStateException _illegalStateException = new IllegalStateException(e);
-                    throw _illegalStateException;
-                } else {
-                    throw Exceptions.sneakyThrow(_t);
-                }
-            }
-        }
-        String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
-        String _plus_2 = (_plus_1 + " attempts.");
-        IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
-        throw _illegalStateException_1;
-    }
-
-    public synchronized Promise<NetconfMessage> promiseReply() {
-        this.device.logger.debug("Promising reply.");
-        this.promiseLock.lock();
-        try {
-            boolean _equals = Objects.equal(this.messagePromise, null);
-            if (_equals) {
-                Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
-                this.messagePromise = _newPromise;
-                return this.messagePromise;
-            }
-            return this.messagePromise;
-        } finally {
-            this.promiseLock.unlock();
-        }
-    }
-
-    public boolean isNotification(final NetconfMessage message) {
-        Document _document = message.getDocument();
-        final XmlElement xmle = XmlElement.fromDomDocument(_document);
-        String _name = xmle.getName();
-        return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
-    }
 }
index 01d872d89cad71156b792b37dc2e78f1749f393a..1d48e9287bb82fd1dbeb29cef0dbcd7144f701df 100644 (file)
@@ -8,9 +8,22 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import io.netty.channel.EventLoopGroup;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.concurrent.Immutable;
+
 import org.opendaylight.controller.config.api.ConflictingVersionException;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
@@ -27,16 +40,8 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.xml.sax.SAXException;
 
-import javax.annotation.concurrent.Immutable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 @Immutable
 public class ConfigPusher {
@@ -59,7 +64,7 @@ public class ConfigPusher {
     }
 
     public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
-                        long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
+            long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
         this.address = address;
         this.nettyThreadGroup = nettyThreadGroup;
         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
@@ -224,13 +229,12 @@ public class ConfigPusher {
             NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
             NetconfUtil.checkIsMessageOk(netconfMessage);
             return netconfMessage;
-        } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
+        } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) {
             logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
             throw new IOException("Failed to execute netconf transaction", e);
         }
     }
 
-
     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
     private static NetconfMessage createEditConfigMessage(Element dataElement) {
         String editConfigResourcePath = "/netconfOp/editConfig.xml";
@@ -316,4 +320,4 @@ public class ConfigPusher {
                     '}';
         }
     }
-}
\ No newline at end of file
+}
index 54cb471604d6fc5114f5d3b5e1dbe1c273fa4f01..0f7869d97a80047731f27a48cf7e241df618fd1b 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.netconf.api;
 
 import org.opendaylight.protocol.framework.SessionListener;
 
-public interface NetconfSessionListener extends
-        SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> {
+public interface NetconfSessionListener<S extends NetconfSession> extends SessionListener<NetconfMessage, S, NetconfTerminationReason> {
 
 }
index 48109d1353ec6eae5d301342de409e4205c312ec..aee4085599c7fc05ba25493c8b5df2f95f1b0560 100644 (file)
@@ -31,7 +31,7 @@ public abstract class AbstractNetconfClientNotifySessionListener extends Netconf
      * @param message {@see NetconfClientSessionListener#onMessage(NetconfClientSession, NetconfMessage)}
      */
     @Override
-    public final synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
+    public final void onMessage(NetconfClientSession session, NetconfMessage message) {
         if (isNotification(message)) {
             onNotification(session, message);
         } else {
index b8951a4789e1f1b4bc305cf6ed0c824a8e960ccf..a9dd2c3394b4ad15b38360edcd5be18ee701fcd8 100644 (file)
@@ -8,17 +8,8 @@
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -27,6 +18,18 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
 
 public class NetconfClient implements Closeable {
 
@@ -98,25 +101,31 @@ public class NetconfClient implements Closeable {
         this.sessionId = clientSession.getSessionId();
     }
 
-    public NetconfMessage sendMessage(NetconfMessage message) {
+    public Future<NetconfMessage> sendRequest(NetconfMessage message) {
+        return sessionListener.sendRequest(message);
+    }
+
+    /**
+     * @deprecated Use {@link sendRequest} instead
+     */
+    @Deprecated
+    public NetconfMessage sendMessage(NetconfMessage message) throws ExecutionException, InterruptedException, TimeoutException {
         return sendMessage(message, 5, 1000);
     }
 
-    public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) {
-        Stopwatch stopwatch = new Stopwatch().start();
-        Preconditions.checkState(clientSession.isUp(), "Session was not up yet");
+    /**
+     * @deprecated Use {@link sendRequest} instead
+     */
+    @Deprecated
+    public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) throws ExecutionException, InterruptedException, TimeoutException {
         //logger.debug("Sending message: {}",XmlUtil.toString(message.getDocument()));
-        clientSession.sendMessage(message);
+        final Stopwatch stopwatch = new Stopwatch().start();
+
         try {
-            return sessionListener.getLastMessage(attempts, attemptMsDelay);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(this + " Cannot read message from " + address, e);
-        } catch (IllegalStateException e) {
-            throw new IllegalStateException(this + " Cannot read message from " + address, e);
+            return sessionListener.sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS);
         } finally {
             stopwatch.stop();
-            logger.debug("Total time spent waiting for response {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            logger.debug("Total time spent waiting for response from {}: {} ms", address, stopwatch.elapsed(TimeUnit.MILLISECONDS));
         }
     }
 
index 1228a84a8a71a07fb1be4438407c5c8e8e4af54f..dd08bf565c5d10c951d1cb3d87ee24162c46cb91 100644 (file)
@@ -8,25 +8,24 @@
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Optional;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+
 import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
 import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.SessionListener;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.net.InetSocketAddress;
+import com.google.common.base.Optional;
 
 public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSession, NetconfClientSessionListener> implements Closeable {
 
@@ -69,24 +68,25 @@ public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSes
         private final NetconfClientSessionListener sessionListener;
 
         private ClientChannelInitializer(NetconfClientSessionNegotiatorFactory negotiatorFactory,
-                                            NetconfClientSessionListener sessionListener) {
+                NetconfClientSessionListener sessionListener) {
             this.negotiatorFactory = negotiatorFactory;
             this.sessionListener = sessionListener;
         }
 
         @Override
         public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
-                super.initialize(ch,promise);
+            super.initialize(ch,promise);
         }
 
         @Override
         protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
-            ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() {
-                @Override
-                public SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> getSessionListener() {
-                    return sessionListener;
-                }
-            }, ch, promise));
+            ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(
+                    new SessionListenerFactory<NetconfClientSessionListener>() {
+                        @Override
+                        public NetconfClientSessionListener getSessionListener() {
+                            return sessionListener;
+                        }
+                    }, ch, promise));
         }
 
     }
index d3c1b22c845a9c0a30888cbd933e935322cfedd8..1ac2e7e26462c84a239757f93ee6b2d926f53e46 100644 (file)
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import javax.annotation.concurrent.GuardedBy;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
+
+public class NetconfClientSessionListener implements NetconfSessionListener<NetconfClientSession> {
+    private static final class RequestEntry {
+        final Promise<NetconfMessage> promise;
+        final NetconfMessage request;
 
-public class NetconfClientSessionListener implements
-        SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> {
+        public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
+            this.promise = Preconditions.checkNotNull(future);
+            this.request = Preconditions.checkNotNull(request);
+        }
+    }
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
-    private AtomicBoolean up = new AtomicBoolean(false);
+
+    @GuardedBy("this")
+    private final Queue<RequestEntry> requests = new ArrayDeque<>();
+
+    @GuardedBy("this")
+    private NetconfClientSession clientSession;
+
+    @GuardedBy("this")
+    private void dispatchRequest() {
+        while (!requests.isEmpty()) {
+            final RequestEntry e = requests.peek();
+            if (e.promise.setUncancellable()) {
+                logger.debug("Sending message {}", e.request);
+                clientSession.sendMessage(e.request);
+                break;
+            }
+
+            logger.debug("Message {} has been cancelled, skipping it", e.request);
+            requests.poll();
+        }
+    }
 
     @Override
-    public void onSessionUp(NetconfClientSession clientSession) {
-        up.set(true);
+    public final synchronized void onSessionUp(NetconfClientSession clientSession) {
+        this.clientSession = Preconditions.checkNotNull(clientSession);
+        logger.debug("Client session {} went up", clientSession);
+        dispatchRequest();
+    }
+
+    private synchronized void tearDown(final Exception cause) {
+        final RequestEntry e = requests.poll();
+        if (e != null) {
+            e.promise.setFailure(cause);
+        }
+
+        this.clientSession = null;
     }
 
     @Override
-    public void onSessionDown(NetconfClientSession clientSession, Exception e) {
-        logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage());
-        up.set(false);
+    public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
+        logger.debug("Client Session {} went down unexpectedly", clientSession, e);
+        tearDown(e);
     }
 
     @Override
-    public void onSessionTerminated(NetconfClientSession clientSession,
+    public final void onSessionTerminated(NetconfClientSession clientSession,
             NetconfTerminationReason netconfTerminationReason) {
         logger.debug("Client Session {} terminated, reason: {}", clientSession,
                 netconfTerminationReason.getErrorMessage());
-        up.set(false);
+        tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
     }
 
     @Override
     public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
-        synchronized (messages) {
-            this.messages.add(message);
+        logger.debug("New message arrived: {}", message);
+
+        final RequestEntry e = requests.poll();
+        if (e != null) {
+            e.promise.setSuccess(message);
+            dispatchRequest();
+        } else {
+            logger.info("Ignoring unsolicited message {}", message);
         }
     }
 
-    private int lastReadMessage = -1;
-    private List<NetconfMessage> messages = Lists.newArrayList();
-
-    public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
-        Preconditions.checkState(up.get(), "Session was not up yet");
-
-        for (int i = 0; i < attempts; i++) {
-            synchronized (messages) {
-                if (messages.size() - 1 > lastReadMessage) {
-                    lastReadMessage++;
-                    return messages.get(lastReadMessage);
-                }
-            }
+    final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
+        final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
 
-            if (up.get() == false)
-                throw new IllegalStateException("Session ended while trying to read message");
-            Thread.sleep(attemptMsDelay);
+        requests.add(req);
+        if (clientSession != null) {
+            dispatchRequest();
         }
 
-        throw new IllegalStateException("No netconf message to read");
+        return req.promise;
     }
 }
index c1d5b2bdf79e5ce06ac6e4a4c3121f88ae84e25a..25beb65179f9d54789739dd1faa8428704bf793f 100644 (file)
@@ -17,15 +17,12 @@ import io.netty.util.concurrent.Promise;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
 import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler;
 import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.SessionListener;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
 import com.google.common.base.Optional;
@@ -92,9 +89,9 @@ public class NetconfSshClientDispatcher extends NetconfClientDispatcher {
 
         @Override
         protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
-            ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() {
+            ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory<NetconfClientSessionListener>() {
                 @Override
-                public SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> getSessionListener() {
+                public NetconfClientSessionListener getSessionListener() {
                     return sessionListener;
                 }
             }, ch, promise));
index 43e55d746a4ee59d1ec9bb35245f74e0edbca018..460288fe33256147a6a55c666cfbbd9dd8181862 100644 (file)
@@ -8,10 +8,11 @@
 
 package org.opendaylight.controller.netconf.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
+import static com.google.common.base.Preconditions.checkState;
+
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationRouterImpl;
 import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
@@ -19,25 +20,22 @@ import org.opendaylight.controller.netconf.util.messages.SendErrorExceptionUtil;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.SessionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
 
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 
-public class NetconfServerSessionListener implements
-        SessionListener<NetconfMessage, NetconfServerSession, NetconfTerminationReason> {
+public class NetconfServerSessionListener implements NetconfSessionListener<NetconfServerSession> {
+    public static final String MESSAGE_ID = "message-id";
 
     static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionListener.class);
-    public static final String MESSAGE_ID = "message-id";
     private final SessionMonitoringService monitoringService;
+    private final NetconfOperationRouterImpl operationRouter;
 
-    private NetconfOperationRouterImpl operationRouter;
-
-    public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter,
-                                        SessionMonitoringService monitoringService) {
+    public NetconfServerSessionListener(NetconfOperationRouterImpl operationRouter, SessionMonitoringService monitoringService) {
         this.operationRouter = operationRouter;
         this.monitoringService = monitoringService;
     }
index 954da5f4874a03c10f772822a163750d451f0e6b..fce3f70e73b346bc94e2fb306bdf64d7c25aff3d 100644 (file)
@@ -8,12 +8,33 @@
 
 package org.opendaylight.controller.netconf.it;
 
-import ch.ethz.ssh2.Connection;
-import ch.ethz.ssh2.Session;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import static java.util.Collections.emptyList;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import io.netty.channel.ChannelFuture;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import javax.management.ObjectName;
+import javax.xml.parsers.ParserConfigurationException;
+
 import junit.framework.Assert;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -52,26 +73,11 @@ import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;
 import org.xml.sax.SAXException;
 
-import javax.management.ObjectName;
-import javax.xml.parsers.ParserConfigurationException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import ch.ethz.ssh2.Connection;
+import ch.ethz.ssh2.Session;
 
-import static java.util.Collections.emptyList;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class NetconfITTest extends AbstractNetconfConfigTest {
 
@@ -85,7 +91,7 @@ public class NetconfITTest extends AbstractNetconfConfigTest {
     private static final String PASSWORD = "netconf";
 
     private NetconfMessage getConfig, getConfigCandidate, editConfig,
-            closeSession, startExi, stopExi;
+    closeSession, startExi, stopExi;
     private DefaultCommitNotificationProducer commitNot;
     private NetconfServerDispatcher dispatch;
 
@@ -304,7 +310,7 @@ public class NetconfITTest extends AbstractNetconfConfigTest {
 
         }
     }
-    */
+     */
 
     @Test
     public void testCloseSession() throws Exception {
@@ -350,12 +356,12 @@ public class NetconfITTest extends AbstractNetconfConfigTest {
         assertEquals("ok", XmlElement.fromDomDocument(rpcReply).getOnlyChildElement().getName());
     }
 
-    private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException {
+    private Document assertGetConfigWorks(final NetconfClient netconfClient) throws InterruptedException, ExecutionException, TimeoutException {
         return assertGetConfigWorks(netconfClient, this.getConfig);
     }
 
     private Document assertGetConfigWorks(final NetconfClient netconfClient, final NetconfMessage getConfigMessage)
-            throws InterruptedException {
+            throws InterruptedException, ExecutionException, TimeoutException {
         final NetconfMessage rpcReply = netconfClient.sendMessage(getConfigMessage);
         assertNotNull(rpcReply);
         assertEquals("data", XmlElement.fromDomDocument(rpcReply.getDocument()).getOnlyChildElement().getName());
@@ -423,19 +429,20 @@ public class NetconfITTest extends AbstractNetconfConfigTest {
         sess.getStdin().write(XmlUtil.toString(this.getConfig.getDocument()).getBytes());
 
         new Thread(){
-           public void run(){
-               while (true){
-                 byte[] bytes = new byte[1024];
-                   int c = 0;
-                   try {
-                       c = sess.getStdout().read(bytes);
-                   } catch (IOException e) {
-                       e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                   }
-                   logger.info("got data:"+bytes);
-                 if (c == 0) break;
-               }
-           }
+            @Override
+            public void run(){
+                while (true){
+                    byte[] bytes = new byte[1024];
+                    int c = 0;
+                    try {
+                        c = sess.getStdout().read(bytes);
+                    } catch (IOException e) {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                    logger.info("got data:"+bytes);
+                    if (c == 0) break;
+                }
+            }
         }.join();
     }