Do not store Optional in NetconfDeviceCommunicator
[netconf.git] / netconf / sal-netconf-connector / src / main / java / org / opendaylight / netconf / sal / connect / netconf / listener / NetconfDeviceCommunicator.java
index 83ee068eb65e77c9e76ba6f434946e89408823d5..25518c54e98aa2a2b763b9da79fa8c09bcaa800b 100644 (file)
@@ -7,27 +7,30 @@
  */
 package org.opendaylight.netconf.sal.connect.netconf.listener;
 
-import com.google.common.base.Optional;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.concurrent.Future;
+import java.io.EOFException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.opendaylight.controller.config.util.xml.XmlElement;
-import org.opendaylight.controller.config.util.xml.XmlUtil;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.netconf.api.FailedNetconfMessage;
 import org.opendaylight.netconf.api.NetconfDocumentedException;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfTerminationReason;
+import org.opendaylight.netconf.api.xml.XmlElement;
 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.netconf.api.xml.XmlUtil;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSession;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
@@ -37,20 +40,21 @@ import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetconfDeviceCommunicator
-        implements NetconfClientSessionListener, RemoteDeviceCommunicator<NetconfMessage> {
-
+public class NetconfDeviceCommunicator implements NetconfClientSessionListener, RemoteDeviceCommunicator {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
 
-    protected final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice;
-    private final Optional<UserPreferences> overrideNetconfCapabilities;
+    protected final RemoteDevice<NetconfDeviceCommunicator> remoteDevice;
+    private final @Nullable UserPreferences overrideNetconfCapabilities;
     protected final RemoteDeviceId id;
     private final Lock sessionLock = new ReentrantLock();
 
@@ -58,46 +62,43 @@ public class NetconfDeviceCommunicator
     private final int concurentRpcMsgs;
 
     private final Queue<Request> requests = new ArrayDeque<>();
-    private NetconfClientSession session;
+    private NetconfClientSession currentSession;
 
-    private Future<?> initFuture;
-    private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
+    private final SettableFuture<Empty> firstConnectionFuture = SettableFuture.create();
+    private Future<?> taskFuture;
 
     // isSessionClosing indicates a close operation on the session is issued and
     // tearDown will surely be called later to finish the close.
     // Used to allow only one thread to enter tearDown and other threads should
     // NOT enter it simultaneously and should end its close operation without
     // calling tearDown to release the locks they hold to avoid deadlock.
-    private final AtomicBoolean isSessionClosing = new AtomicBoolean(false);
+    private static final AtomicIntegerFieldUpdater<NetconfDeviceCommunicator> CLOSING_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(NetconfDeviceCommunicator.class, "closing");
+    private volatile int closing;
 
-    public Boolean isSessionClosing() {
-        return isSessionClosing.get();
+    public boolean isSessionClosing() {
+        return closing != 0;
     }
 
-    public NetconfDeviceCommunicator(
-            final RemoteDeviceId id,
-            final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
+    public NetconfDeviceCommunicator(final RemoteDeviceId id,
+            final RemoteDevice<NetconfDeviceCommunicator> remoteDevice,
             final UserPreferences netconfSessionPreferences, final int rpcMessageLimit) {
-        this(id, remoteDevice, Optional.of(netconfSessionPreferences), rpcMessageLimit);
+        this(id, remoteDevice, rpcMessageLimit, requireNonNull(netconfSessionPreferences));
     }
 
-    public NetconfDeviceCommunicator(
-            final RemoteDeviceId id,
-            final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
-            final int rpcMessageLimit) {
-        this(id, remoteDevice, Optional.<UserPreferences>absent(), rpcMessageLimit);
+    public NetconfDeviceCommunicator(final RemoteDeviceId id,
+            final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit) {
+        this(id, remoteDevice, rpcMessageLimit, null);
     }
 
-    private NetconfDeviceCommunicator(
-            final RemoteDeviceId id,
-            final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> remoteDevice,
-            final Optional<UserPreferences> overrideNetconfCapabilities, final int rpcMessageLimit) {
-        this.concurentRpcMsgs = rpcMessageLimit;
+    public NetconfDeviceCommunicator(final RemoteDeviceId id,
+            final RemoteDevice<NetconfDeviceCommunicator> remoteDevice, final int rpcMessageLimit,
+            final @Nullable UserPreferences overrideNetconfCapabilities) {
+        concurentRpcMsgs = rpcMessageLimit;
         this.id = id;
         this.remoteDevice = remoteDevice;
         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
-        this.firstConnectionFuture = SettableFuture.create();
-        this.semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
+        semaphore = rpcMessageLimit > 0 ? new Semaphore(rpcMessageLimit) : null;
     }
 
     @Override
@@ -105,21 +106,21 @@ public class NetconfDeviceCommunicator
         sessionLock.lock();
         try {
             LOG.debug("{}: Session established", id);
-            this.session = session;
+            currentSession = session;
 
             NetconfSessionPreferences netconfSessionPreferences =
                                              NetconfSessionPreferences.fromNetconfSession(session);
             LOG.trace("{}: Session advertised capabilities: {}", id,
                     netconfSessionPreferences);
 
-            if (overrideNetconfCapabilities.isPresent()) {
-                final NetconfSessionPreferences sessionPreferences = overrideNetconfCapabilities
-                        .get().getSessionPreferences();
-                netconfSessionPreferences = overrideNetconfCapabilities.get().moduleBasedCapsOverrided()
+            final var localOverride = overrideNetconfCapabilities;
+            if (localOverride != null) {
+                final var sessionPreferences = localOverride.getSessionPreferences();
+                netconfSessionPreferences = localOverride.moduleBasedCapsOverrided()
                         ? netconfSessionPreferences.replaceModuleCaps(sessionPreferences)
                         : netconfSessionPreferences.addModuleCaps(sessionPreferences);
 
-                netconfSessionPreferences = overrideNetconfCapabilities.get().nonModuleBasedCapsOverrided()
+                netconfSessionPreferences = localOverride.nonModuleBasedCapsOverrided()
                         ? netconfSessionPreferences.replaceNonModuleCaps(sessionPreferences)
                         : netconfSessionPreferences.addNonModuleCaps(sessionPreferences);
                 LOG.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id,
@@ -127,12 +128,14 @@ public class NetconfDeviceCommunicator
             }
 
             remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
-            if (!firstConnectionFuture.isDone()) {
-                firstConnectionFuture.set(netconfSessionPreferences.getNetconfDeviceCapabilities());
-            }
         } finally {
             sessionLock.unlock();
         }
+
+        // FIXME: right, except ... this does not include the device schema setup, so is it really useful?
+        if (!firstConnectionFuture.set(Empty.value())) {
+            LOG.trace("{}: First connection already completed", id);
+        }
     }
 
     /**
@@ -140,23 +143,31 @@ public class NetconfDeviceCommunicator
      *
      * @param dispatcher {@code NetconfCLientDispatcher}
      * @param config     {@code NetconfClientConfiguration}
-     * @return future that returns succes on first succesfull connection and failure when the underlying
-     *     reconnecting strategy runs out of reconnection attempts
+     * @return a ListenableFuture that returns success on first successful connection and failure when the underlying
+     *         reconnecting strategy runs out of reconnection attempts
      */
-    public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
-            final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+    public ListenableFuture<Empty> initializeRemoteConnection(final NetconfClientDispatcher dispatcher,
+            final NetconfClientConfiguration config) {
+
+        final Future<?> connectFuture;
         if (config instanceof NetconfReconnectingClientConfiguration) {
-            initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+            // FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to
+            //        forward error state. Analyze the call graph to understand what is going on here. We really want
+            //        to move reconnection away from the socket layer, so that it can properly interface with sessions
+            //        and generally has some event-driven state (as all good network glue does). There is a second story
+            //        which is we want to avoid duplicate code, so it depends on other users as well.
+            final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+            taskFuture = future;
+            connectFuture = future.firstSessionFuture();
         } else {
-            initFuture = dispatcher.createClient(config);
+            taskFuture = connectFuture = dispatcher.createClient(config);
         }
 
-
-        initFuture.addListener(future -> {
+        connectFuture.addListener(future -> {
             if (!future.isSuccess() && !future.isCancelled()) {
                 LOG.debug("{}: Connection failed", id, future.cause());
-                NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
-                if (firstConnectionFuture.isDone()) {
+                remoteDevice.onRemoteSessionFailed(future.cause());
+                if (!firstConnectionFuture.isDone()) {
                     firstConnectionFuture.setException(future.cause());
                 }
             }
@@ -166,8 +177,8 @@ public class NetconfDeviceCommunicator
 
     public void disconnect() {
         // If session is already in closing, no need to close it again
-        if (session != null && isSessionClosing.compareAndSet(false, true)) {
-            session.close();
+        if (currentSession != null && startClosing() && currentSession.isUp()) {
+            currentSession.close();
         }
     }
 
@@ -176,11 +187,11 @@ public class NetconfDeviceCommunicator
             LOG.warn("It's curious that no one to close the session but tearDown is called!");
         }
         LOG.debug("Tearing down {}", reason);
-        final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = Lists.newArrayList();
+        final List<UncancellableFuture<RpcResult<NetconfMessage>>> futuresToCancel = new ArrayList<>();
         sessionLock.lock();
         try {
-            if (session != null) {
-                session = null;
+            if (currentSession != null) {
+                currentSession = null;
                 /*
                  * Walk all requests, check if they have been executing
                  * or cancelled and remove them from the queue.
@@ -209,29 +220,32 @@ public class NetconfDeviceCommunicator
             if (Strings.isNullOrEmpty(reason)) {
                 future.set(createSessionDownRpcResult());
             } else {
-                future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT, reason));
+                future.set(createErrorRpcResult(ErrorType.TRANSPORT, reason));
             }
         }
 
-        isSessionClosing.set(false);
+        closing = 0;
     }
 
     private RpcResult<NetconfMessage> createSessionDownRpcResult() {
-        return createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
+        return createErrorRpcResult(ErrorType.TRANSPORT,
                 String.format("The netconf session to %1$s is disconnected", id.getName()));
     }
 
-    private static RpcResult<NetconfMessage> createErrorRpcResult(final RpcError.ErrorType errorType,
-            final String message) {
+    private static RpcResult<NetconfMessage> createErrorRpcResult(final ErrorType errorType, final String message) {
         return RpcResultBuilder.<NetconfMessage>failed()
-            .withError(errorType, NetconfDocumentedException.ErrorTag.OPERATION_FAILED.getTagValue(), message).build();
+            .withError(errorType, ErrorTag.OPERATION_FAILED, message).build();
     }
 
     @Override
     public void onSessionDown(final NetconfClientSession session, final Exception exception) {
         // If session is already in closing, no need to call tearDown again.
-        if (isSessionClosing.compareAndSet(false, true)) {
-            LOG.warn("{}: Session went down", id, exception);
+        if (startClosing()) {
+            if (exception instanceof EOFException) {
+                LOG.info("{}: Session went down: {}", id, exception.getMessage());
+            } else {
+                LOG.warn("{}: Session went down", id, exception);
+            }
             tearDown(null);
         }
     }
@@ -246,8 +260,8 @@ public class NetconfDeviceCommunicator
     @Override
     public void close() {
         // Cancel reconnect if in progress
-        if (initFuture != null) {
-            initFuture.cancel(false);
+        if (taskFuture != null) {
+            taskFuture.cancel(false);
         }
         // Disconnect from device
         // tear down not necessary, called indirectly by the close in disconnect()
@@ -274,7 +288,7 @@ public class NetconfDeviceCommunicator
         try {
             request = requests.peek();
             if (request != null && request.future.isUncancellable()) {
-                requests.poll();
+                request = requests.poll();
                 // we have just removed one request from the queue
                 // we can also release one permit
                 if (semaphore != null) {
@@ -289,45 +303,51 @@ public class NetconfDeviceCommunicator
             sessionLock.unlock();
         }
 
-        if (request != null) {
-
-            LOG.debug("{}: Message received {}", id, message);
+        if (request == null) {
+            // No matching request, bail out
+            return;
+        }
 
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
-            }
 
-            try {
-                NetconfMessageTransformUtil.checkValidReply(request.request, message);
-            } catch (final NetconfDocumentedException e) {
-                LOG.warn(
-                        "{}: Invalid request-reply match,"
-                                + "reply message contains different message-id, request: {}, response: {}",
-                        id, msgToS(request.request), msgToS(message), e);
+        if (message instanceof FailedNetconfMessage) {
+            request.future.set(NetconfMessageTransformUtil.toRpcResult((FailedNetconfMessage) message));
+            return;
+        }
 
-                request.future.set(RpcResultBuilder.<NetconfMessage>failed()
-                        .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build());
+        LOG.debug("{}: Message received {}", id, message);
 
-                //recursively processing message to eventually find matching request
-                processMessage(message);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{}: Matched request: {} to response: {}", id, msgToS(request.request), msgToS(message));
+        }
 
-                return;
-            }
+        try {
+            NetconfMessageTransformUtil.checkValidReply(request.request, message);
+        } catch (final NetconfDocumentedException e) {
+            LOG.warn("{}: Invalid request-reply match, reply message contains different message-id, "
+                + "request: {}, response: {}", id, msgToS(request.request), msgToS(message), e);
 
-            try {
-                NetconfMessageTransformUtil.checkSuccessReply(message);
-            } catch (final NetconfDocumentedException e) {
-                LOG.warn(
-                        "{}: Error reply from remote device, request: {}, response: {}",
-                        id, msgToS(request.request), msgToS(message), e);
+            request.future.set(RpcResultBuilder.<NetconfMessage>failed()
+                .withRpcError(NetconfMessageTransformUtil.toRpcError(e))
+                .build());
 
-                request.future.set(RpcResultBuilder.<NetconfMessage>failed()
-                        .withRpcError(NetconfMessageTransformUtil.toRpcError(e)).build());
-                return;
-            }
+            //recursively processing message to eventually find matching request
+            processMessage(message);
+            return;
+        }
 
-            request.future.set(RpcResultBuilder.success(message).build());
+        try {
+            NetconfMessageTransformUtil.checkSuccessReply(message);
+        } catch (final NetconfDocumentedException e) {
+            LOG.warn("{}: Error reply from remote device, request: {}, response: {}",
+                id, msgToS(request.request), msgToS(message), e);
+
+            request.future.set(RpcResultBuilder.<NetconfMessage>failed()
+                .withRpcError(NetconfMessageTransformUtil.toRpcError(e))
+                .build());
+            return;
         }
+
+        request.future.set(RpcResultBuilder.success(message).build());
     }
 
     private static String msgToS(final NetconfMessage msg) {
@@ -337,18 +357,15 @@ public class NetconfDeviceCommunicator
     @Override
     public ListenableFuture<RpcResult<NetconfMessage>> sendRequest(final NetconfMessage message, final QName rpc) {
         sessionLock.lock();
-
-        if (semaphore != null && !semaphore.tryAcquire()) {
-            LOG.warn("Limit of concurrent rpc messages was reached (limit :"
-                    + concurentRpcMsgs + "). Rpc reply message is needed. Discarding request of Netconf device with id"
-                    + id.getName());
-            sessionLock.unlock();
-            return Futures.immediateFailedFuture(new NetconfDocumentedException(
-                    "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
-                            + ") waiting for emptying the queue of Netconf device with id" + id.getName()));
-        }
-
         try {
+            if (semaphore != null && !semaphore.tryAcquire()) {
+                LOG.warn("Limit of concurrent rpc messages was reached (limit: {}). Rpc reply message is needed. "
+                    + "Discarding request of Netconf device with id: {}", concurentRpcMsgs, id.getName());
+                return FluentFutures.immediateFailedFluentFuture(new NetconfDocumentedException(
+                        "Limit of rpc messages was reached (Limit :" + concurentRpcMsgs
+                        + ") waiting for emptying the queue of Netconf device with id: " + id.getName()));
+            }
+
             return sendRequestWithLock(message, rpc);
         } finally {
             sessionLock.unlock();
@@ -361,16 +378,16 @@ public class NetconfDeviceCommunicator
             LOG.trace("{}: Sending message {}", id, msgToS(message));
         }
 
-        if (session == null) {
+        if (currentSession == null) {
             LOG.warn("{}: Session is disconnected, failing RPC request {}",
                     id, message);
-            return Futures.immediateFuture(createSessionDownRpcResult());
+            return FluentFutures.immediateFluentFuture(createSessionDownRpcResult());
         }
 
         final Request req = new Request(new UncancellableFuture<>(true), message);
         requests.add(req);
 
-        session.sendMessage(req.request).addListener(future -> {
+        currentSession.sendMessage(req.request).addListener(future -> {
             if (!future.isSuccess()) {
                 // We expect that a session down will occur at this point
                 LOG.debug("{}: Failed to send request {}", id,
@@ -378,8 +395,7 @@ public class NetconfDeviceCommunicator
                         future.cause());
 
                 if (future.cause() != null) {
-                    req.future.set(createErrorRpcResult(RpcError.ErrorType.TRANSPORT,
-                            future.cause().getLocalizedMessage()));
+                    req.future.set(createErrorRpcResult(ErrorType.TRANSPORT, future.cause().getLocalizedMessage()));
                 } else {
                     req.future.set(createSessionDownRpcResult()); // assume session is down
                 }
@@ -401,6 +417,10 @@ public class NetconfDeviceCommunicator
     }
 
     private static boolean isNotification(final NetconfMessage message) {
+        if (message.getDocument() == null) {
+            // We have no message, which mean we have a FailedNetconfMessage
+            return false;
+        }
         final XmlElement xmle = XmlElement.fromDomDocument(message.getDocument());
         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName()) ;
     }
@@ -415,4 +435,8 @@ public class NetconfDeviceCommunicator
             this.request = request;
         }
     }
+
+    private boolean startClosing() {
+        return CLOSING_UPDATER.compareAndSet(this, 0, 1);
+    }
 }