Fix thread safety issues in netconf client
[controller.git] / opendaylight / netconf / netconf-client / src / main / java / org / opendaylight / controller / netconf / client / NetconfClientSessionListener.java
index d3c1b22c845a9c0a30888cbd933e935322cfedd8..1ac2e7e26462c84a239757f93ee6b2d926f53e46 100644 (file)
 
 package org.opendaylight.controller.netconf.client;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import javax.annotation.concurrent.GuardedBy;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
+
+public class NetconfClientSessionListener implements NetconfSessionListener<NetconfClientSession> {
+    private static final class RequestEntry {
+        final Promise<NetconfMessage> promise;
+        final NetconfMessage request;
 
-public class NetconfClientSessionListener implements
-        SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> {
+        public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
+            this.promise = Preconditions.checkNotNull(future);
+            this.request = Preconditions.checkNotNull(request);
+        }
+    }
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
-    private AtomicBoolean up = new AtomicBoolean(false);
+
+    @GuardedBy("this")
+    private final Queue<RequestEntry> requests = new ArrayDeque<>();
+
+    @GuardedBy("this")
+    private NetconfClientSession clientSession;
+
+    @GuardedBy("this")
+    private void dispatchRequest() {
+        while (!requests.isEmpty()) {
+            final RequestEntry e = requests.peek();
+            if (e.promise.setUncancellable()) {
+                logger.debug("Sending message {}", e.request);
+                clientSession.sendMessage(e.request);
+                break;
+            }
+
+            logger.debug("Message {} has been cancelled, skipping it", e.request);
+            requests.poll();
+        }
+    }
 
     @Override
-    public void onSessionUp(NetconfClientSession clientSession) {
-        up.set(true);
+    public final synchronized void onSessionUp(NetconfClientSession clientSession) {
+        this.clientSession = Preconditions.checkNotNull(clientSession);
+        logger.debug("Client session {} went up", clientSession);
+        dispatchRequest();
+    }
+
+    private synchronized void tearDown(final Exception cause) {
+        final RequestEntry e = requests.poll();
+        if (e != null) {
+            e.promise.setFailure(cause);
+        }
+
+        this.clientSession = null;
     }
 
     @Override
-    public void onSessionDown(NetconfClientSession clientSession, Exception e) {
-        logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage());
-        up.set(false);
+    public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
+        logger.debug("Client Session {} went down unexpectedly", clientSession, e);
+        tearDown(e);
     }
 
     @Override
-    public void onSessionTerminated(NetconfClientSession clientSession,
+    public final void onSessionTerminated(NetconfClientSession clientSession,
             NetconfTerminationReason netconfTerminationReason) {
         logger.debug("Client Session {} terminated, reason: {}", clientSession,
                 netconfTerminationReason.getErrorMessage());
-        up.set(false);
+        tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
     }
 
     @Override
     public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
-        synchronized (messages) {
-            this.messages.add(message);
+        logger.debug("New message arrived: {}", message);
+
+        final RequestEntry e = requests.poll();
+        if (e != null) {
+            e.promise.setSuccess(message);
+            dispatchRequest();
+        } else {
+            logger.info("Ignoring unsolicited message {}", message);
         }
     }
 
-    private int lastReadMessage = -1;
-    private List<NetconfMessage> messages = Lists.newArrayList();
-
-    public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
-        Preconditions.checkState(up.get(), "Session was not up yet");
-
-        for (int i = 0; i < attempts; i++) {
-            synchronized (messages) {
-                if (messages.size() - 1 > lastReadMessage) {
-                    lastReadMessage++;
-                    return messages.get(lastReadMessage);
-                }
-            }
+    final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
+        final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
 
-            if (up.get() == false)
-                throw new IllegalStateException("Session ended while trying to read message");
-            Thread.sleep(attemptMsDelay);
+        requests.add(req);
+        if (clientSession != null) {
+            dispatchRequest();
         }
 
-        throw new IllegalStateException("No netconf message to read");
+        return req.promise;
     }
 }