Rework createReconnectingClient() 73/98573/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Sep 2021 11:10:24 +0000 (13:10 +0200)
committerIvan Hrasko <ivan.hrasko@pantheon.tech>
Tue, 16 Nov 2021 17:35:50 +0000 (18:35 +0100)
We seem to have competing interests going on, where we want some
information from the first connection attempt propagated outwards.

JIRA: NETCONF-784
Change-Id: I232d523b60c36de85fea909c6405b1d7cac39c57
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/callhome-provider/src/main/java/org/opendaylight/netconf/callhome/mount/CallHomeMountDispatcher.java
netconf/callhome-provider/src/main/java/org/opendaylight/netconf/callhome/mount/CallHomeMountSessionContext.java
netconf/callhome-provider/src/main/java/org/opendaylight/netconf/callhome/mount/SingleReconnectFuture.java [new file with mode: 0644]
netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/NetconfClientDispatcher.java
netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/NetconfClientDispatcherImpl.java
netconf/netconf-client/src/test/java/org/opendaylight/netconf/client/NetconfClientDispatcherImplTest.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfDispatcher.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectFuture.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectPromise.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/MountPointEndToEndTest.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java

index 407fa8e60ee87ab37e7a5d25e067d28eb3f16506..e23917a3b33c850f22eb843ca7222899eaeac1a3 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientSession;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
@@ -92,18 +93,16 @@ public class CallHomeMountDispatcher implements NetconfClientDispatcher, CallHom
     }
 
     @Override
-    public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
-        return activateChannel(clientConfiguration);
+    public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
+        return new SingleReconnectFuture(eventExecutor, activateChannel(clientConfiguration));
     }
 
-    private <V> Future<V> activateChannel(final NetconfClientConfiguration conf) {
+    private Future<NetconfClientSession> activateChannel(final NetconfClientConfiguration conf) {
         final InetSocketAddress remoteAddr = conf.getAddress();
         final CallHomeMountSessionContext context = getSessionManager().getByAddress(remoteAddr);
         LOG.info("Activating NETCONF channel for ip {} device context {}", remoteAddr, context);
-        if (context == null) {
-            return new FailedFuture<>(eventExecutor, new NullPointerException());
-        }
-        return context.activateNetconfChannel(conf.getSessionListener());
+        return context == null ? new FailedFuture<>(eventExecutor, new NullPointerException())
+            : context.activateNetconfChannel(conf.getSessionListener());
     }
 
     void createTopology() {
index 4765bf93b8f35a14810cae71e468fd83f8471433..1c557fd13dcc6e0a0f279ff1676fe27fa1f8cd55 100644 (file)
@@ -77,9 +77,8 @@ class CallHomeMountSessionContext {
                 .build();
     }
 
-    @SuppressWarnings("unchecked")
-    <V> Promise<V> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
-        return (Promise<V>) activator.activate(wrap(sessionListener));
+    Promise<NetconfClientSession> activateNetconfChannel(final NetconfClientSessionListener sessionListener) {
+        return activator.activate(wrap(sessionListener));
     }
 
     @Override
diff --git a/netconf/callhome-provider/src/main/java/org/opendaylight/netconf/callhome/mount/SingleReconnectFuture.java b/netconf/callhome-provider/src/main/java/org/opendaylight/netconf/callhome/mount/SingleReconnectFuture.java
new file mode 100644 (file)
index 0000000..1a26a4f
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.callhome.mount;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import org.opendaylight.netconf.client.NetconfClientSession;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+final class SingleReconnectFuture extends DefaultPromise<Empty> implements ReconnectFuture {
+    private final Future<NetconfClientSession> sessionFuture;
+
+    SingleReconnectFuture(final EventExecutor eventExecutor, final Future<NetconfClientSession> sessionFuture) {
+        super(eventExecutor);
+        this.sessionFuture = requireNonNull(sessionFuture);
+        sessionFuture.addListener(future -> {
+            if (!isDone()) {
+                if (future.isCancelled()) {
+                    cancel(false);
+                } else if (future.isSuccess()) {
+                    setSuccess(Empty.getInstance());
+                } else {
+                    setFailure(future.cause());
+                }
+            }
+        });
+    }
+
+    @Override
+    public boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            if (!sessionFuture.isDone()) {
+                sessionFuture.cancel(mayInterruptIfRunning);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public Future<?> firstSessionFuture() {
+        return sessionFuture;
+    }
+}
index 66fdff674dff563dbac1dad9464e5f81ad45fa1c..b1d2a12468fdc9f6423de6b6bffc778510a0ef18 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.netconf.client;
 import io.netty.util.concurrent.Future;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
 
 public interface NetconfClientDispatcher {
 
@@ -22,5 +23,5 @@ public interface NetconfClientDispatcher {
      */
     Future<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration);
 
-    Future<Void> createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
+    ReconnectFuture createReconnectingClient(NetconfReconnectingClientConfiguration clientConfiguration);
 }
index 747a5f75fd72750dbb36353511d7deab60dd6d90..ace8cf611ba7fedad8bddf644928de006cda9712 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Set;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@ public class NetconfClientDispatcherImpl
     }
 
     @Override
-    public Future<Void> createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
+    public ReconnectFuture createReconnectingClient(final NetconfReconnectingClientConfiguration clientConfiguration) {
         switch (clientConfiguration.getProtocol()) {
             case TCP:
                 return createReconnectingTcpClient(clientConfiguration);
@@ -73,7 +74,7 @@ public class NetconfClientDispatcherImpl
                         currentConfiguration.getSessionListener()).initialize(ch, promise));
     }
 
-    private Future<Void> createReconnectingTcpClient(
+    private ReconnectFuture createReconnectingTcpClient(
             final NetconfReconnectingClientConfiguration currentConfiguration) {
         LOG.debug("Creating reconnecting TCP client with configuration: {}", currentConfiguration);
         final TcpClientChannelInitializer init =
@@ -93,7 +94,7 @@ public class NetconfClientDispatcherImpl
                         currentConfiguration.getSshClient()).initialize(ch, sessionPromise));
     }
 
-    private Future<Void> createReconnectingSshClient(
+    private ReconnectFuture createReconnectingSshClient(
             final NetconfReconnectingClientConfiguration currentConfiguration) {
         LOG.debug("Creating reconnecting SSH client with configuration: {}", currentConfiguration);
         final SshClientChannelInitializer init = new SshClientChannelInitializer(currentConfiguration.getAuthHandler(),
@@ -113,7 +114,7 @@ public class NetconfClientDispatcherImpl
                     .initialize(ch, sessionPromise));
     }
 
-    private Future<Void> createReconnectingTlsClient(
+    private ReconnectFuture createReconnectingTlsClient(
             final NetconfReconnectingClientConfiguration currentConfiguration) {
         LOG.debug("Creating reconnecting TLS client with configuration: {}", currentConfiguration);
         final TlsClientChannelInitializer init = new TlsClientChannelInitializer(
index 172871a5660385a9dcd062bd324bdf7d9690cb4b..62a2b33eb9c81299c6f289af2ca7d1bb89beebbc 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
 import org.opendaylight.netconf.nettyutil.ReconnectStrategy;
 import org.opendaylight.netconf.nettyutil.ReconnectStrategyFactory;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
@@ -89,8 +90,8 @@ public class NetconfClientDispatcherImplTest {
         Future<NetconfClientSession> sshSession = dispatcher.createClient(cfg);
         Future<NetconfClientSession> tcpSession = dispatcher.createClient(cfg2);
 
-        Future<Void> sshReconn = dispatcher.createReconnectingClient(cfg);
-        final Future<Void> tcpReconn = dispatcher.createReconnectingClient(cfg2);
+        ReconnectFuture sshReconn = dispatcher.createReconnectingClient(cfg);
+        final ReconnectFuture tcpReconn = dispatcher.createReconnectingClient(cfg2);
 
         assertNotNull(sshSession);
         assertNotNull(tcpSession);
@@ -109,7 +110,7 @@ public class NetconfClientDispatcherImplTest {
                 .withSslHandlerFactory(sslHandlerFactory).build();
 
         Future<NetconfClientSession> tlsSession = dispatcher.createClient(cfg3);
-        Future<Void> tlsReconn = dispatcher.createReconnectingClient(cfg3);
+        ReconnectFuture tlsReconn = dispatcher.createReconnectingClient(cfg3);
 
         assertNotNull(tlsSession);
         assertNotNull(tlsReconn);
index c4e3b479789b0f3b13e991dda1a3ebf0d1e5027e..61c42f0239d42a8b22fe4e412c78502e7e1681c0 100644 (file)
@@ -225,7 +225,7 @@ public abstract class AbstractNetconfDispatcher<S extends NetconfSession, L exte
      *             instead.
      */
     @Deprecated
-    protected Future<Void> createReconnectingClient(final InetSocketAddress address,
+    protected ReconnectFuture createReconnectingClient(final InetSocketAddress address,
             final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
             final PipelineInitializer<S> initializer) {
         return createReconnectingClient(address, connectStrategyFactory, initializer);
@@ -239,7 +239,7 @@ public abstract class AbstractNetconfDispatcher<S extends NetconfSession, L exte
      * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
      *         success is never reported, only failure when it runs out of reconnection attempts.
      */
-    protected Future<Void> createReconnectingClient(final InetSocketAddress address,
+    protected ReconnectFuture createReconnectingClient(final InetSocketAddress address,
             final ReconnectStrategyFactory connectStrategyFactory, final PipelineInitializer<S> initializer) {
         final Bootstrap b = new Bootstrap();
 
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectFuture.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/ReconnectFuture.java
new file mode 100644 (file)
index 0000000..62fc049
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil;
+
+import com.google.common.annotations.Beta;
+import io.netty.util.concurrent.Future;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+/**
+ * A future representing the task of reconnecting of a certain channel. This future never completes successfully, it
+ * either fails when the underlying strategy gives up, or when it is cancelled. It additionally exposes an additional
+ * future, which completes when the session is established for the first time.
+ */
+@Beta
+public interface ReconnectFuture extends Future<Empty> {
+    /**
+     * Return a Future which completes when the first session is established.
+     *
+     * @return First session establishment future
+     */
+    @NonNull Future<?> firstSessionFuture();
+}
index a3de2d436aecfe9d8d83af628bf72183c9a7bb22..60f5d205604d16b70b0fa7bd240d7a4b5a7b56fe 100644 (file)
@@ -16,36 +16,89 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.netconf.api.NetconfSession;
 import org.opendaylight.netconf.api.NetconfSessionListener;
+import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Deprecated
 final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
-        extends DefaultPromise<Void> {
+        extends DefaultPromise<Empty> implements ReconnectFuture {
     private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
 
     private final AbstractNetconfDispatcher<S, L> dispatcher;
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
     private final Bootstrap bootstrap;
-    private final AbstractNetconfDispatcher.PipelineInitializer<S> initializer;
+    private final PipelineInitializer<S> initializer;
+    private final Promise<Empty> firstSessionFuture;
+    /**
+     * Channel handler that responds to channelInactive event and reconnects the session unless the promise is
+     * cancelled.
+     */
+    private final ChannelInboundHandlerAdapter inboundHandler = new ChannelInboundHandlerAdapter() {
+        @Override
+        public void channelInactive(final ChannelHandlerContext ctx) {
+            // This is the ultimate channel inactive handler, not forwarding
+            if (isCancelled()) {
+                return;
+            }
+
+            synchronized (ReconnectPromise.this) {
+                final Future<?> attempt = pending;
+                if (!attempt.isDone() || !attempt.isSuccess()) {
+                    // Connection refused, negotiation failed, or similar
+                    LOG.debug("Connection to {} was dropped during negotiation, reattempting", address);
+                }
+
+                LOG.debug("Reconnecting after connection to {} was dropped", address);
+                lockedConnect();
+            }
+        }
+    };
+
+    @GuardedBy("this")
     private Future<?> pending;
 
     ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher<S, L> dispatcher,
             final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
             final Bootstrap bootstrap, final AbstractNetconfDispatcher.PipelineInitializer<S> initializer) {
         super(executor);
-        this.bootstrap = bootstrap;
+        this.firstSessionFuture = new DefaultPromise<>(executor);
+        this.bootstrap = requireNonNull(bootstrap);
         this.initializer = requireNonNull(initializer);
         this.dispatcher = requireNonNull(dispatcher);
         this.address = requireNonNull(address);
         this.strategyFactory = requireNonNull(connectStrategyFactory);
     }
 
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            firstSessionFuture.cancel(mayInterruptIfRunning);
+            pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public Future<?> firstSessionFuture() {
+        return firstSessionFuture;
+    }
+
     synchronized void connect() {
+        lockedConnect();
+    }
+
+    @Holding("this")
+    private void lockedConnect() {
         final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
 
         // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support
@@ -62,11 +115,13 @@ final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionL
             channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
         });
 
-        pending.addListener(future -> {
-            if (!future.isSuccess() && !ReconnectPromise.this.isDone()) {
-                ReconnectPromise.this.setFailure(future.cause());
-            }
-        });
+        if (!firstSessionFuture.isDone()) {
+            pending.addListener(future -> {
+                if (!future.isSuccess() && !firstSessionFuture.isDone()) {
+                    firstSessionFuture.setFailure(future.cause());
+                }
+            });
+        }
     }
 
     /**
@@ -82,17 +137,6 @@ final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionL
         return pending.isDone() && pending.isSuccess();
     }
 
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        if (super.cancel(mayInterruptIfRunning)) {
-            requireNonNull(pending);
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        }
-
-        return false;
-    }
-
     /**
      * Channel handler that responds to channelInactive event and reconnects the session.
      * Only if the promise was not canceled.
index 83bd1d80ea67e5cdabefa24357db22b82e9744f5..2e4e88904e7a76d0981117973ad61debbec28188 100644 (file)
@@ -17,6 +17,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
@@ -37,7 +38,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import com.typesafe.config.ConfigFactory;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.SucceededFuture;
 import java.io.File;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -98,6 +98,7 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegist
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.nettyutil.ReconnectFuture;
 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
 import org.opendaylight.netconf.sal.connect.impl.DefaultSchemaResourceManager;
@@ -249,8 +250,7 @@ public class MountPointEndToEndTest extends AbstractBaseSchemasTest {
 
         yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);
 
-        doReturn(new SucceededFuture(GlobalEventExecutor.INSTANCE, null)).when(mockClientDispatcher)
-                .createReconnectingClient(any());
+        doReturn(mock(ReconnectFuture.class)).when(mockClientDispatcher).createReconnectingClient(any());
 
         LOG.info("****** Setup complete");
     }
index de840e27d09ac143fbb88af6370a821240e1c994..7ef1e4929babce07c424f8092fcf0b62965d6a47 100644 (file)
@@ -62,7 +62,7 @@ public class NetconfDeviceCommunicator
     private NetconfClientSession currentSession;
 
     private final SettableFuture<NetconfDeviceCapabilities> firstConnectionFuture;
-    private Future<?> initFuture;
+    private Future<?> taskFuture;
 
     // isSessionClosing indicates a close operation on the session is issued and
     // tearDown will surely be called later to finish the close.
@@ -148,18 +148,26 @@ public class NetconfDeviceCommunicator
      */
     public ListenableFuture<NetconfDeviceCapabilities> initializeRemoteConnection(
             final NetconfClientDispatcher dispatcher, final NetconfClientConfiguration config) {
+
+        final Future<?> connectFuture;
         if (config instanceof NetconfReconnectingClientConfiguration) {
-            initFuture = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+            // FIXME: This is weird. If I understand it correctly we want to know about the first connection so as to
+            //        forward error state. Analyze the call graph to understand what is going on here. We really want
+            //        to move reconnection away from the socket layer, so that it can properly interface with sessions
+            //        and generally has some event-driven state (as all good network glue does). There is a second story
+            //        which is we want to avoid duplicate code, so it depends on other users as well.
+            final var future = dispatcher.createReconnectingClient((NetconfReconnectingClientConfiguration) config);
+            taskFuture = future;
+            connectFuture = future.firstSessionFuture();
         } else {
-            initFuture = dispatcher.createClient(config);
+            taskFuture = connectFuture = dispatcher.createClient(config);
         }
 
-
-        initFuture.addListener(future -> {
+        connectFuture.addListener(future -> {
             if (!future.isSuccess() && !future.isCancelled()) {
                 LOG.debug("{}: Connection failed", id, future.cause());
-                NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
-                if (firstConnectionFuture.isDone()) {
+                remoteDevice.onRemoteSessionFailed(future.cause());
+                if (!firstConnectionFuture.isDone()) {
                     firstConnectionFuture.setException(future.cause());
                 }
             }
@@ -249,8 +257,8 @@ public class NetconfDeviceCommunicator
     @Override
     public void close() {
         // Cancel reconnect if in progress
-        if (initFuture != null) {
-            initFuture.cancel(false);
+        if (taskFuture != null) {
+            taskFuture.cancel(false);
         }
         // Disconnect from device
         // tear down not necessary, called indirectly by the close in disconnect()