Merge "NECONF-524 : Setting the netconf keepalive logic to be more proactive."
authorJakub Morvay <jakub.morvay@gmail.com>
Mon, 6 Aug 2018 14:50:23 +0000 (14:50 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 6 Aug 2018 14:50:23 +0000 (14:50 +0000)
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java
netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/AbstractNetconfTopology.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/KeepaliveSalFacade.java

index 52446922908603798865a4668e201cb31b7f5745..68366c115cf3c82e51ca5958c2345266d27500c8 100644 (file)
@@ -215,14 +215,19 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
             LOG.info("{}: Concurrent rpc limit is smaller than 1, no limit will be enforced.", remoteDeviceId);
         }
 
-        return new NetconfConnectorDTO(
-                userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
-                        new UserPreferences(userCapabilities.get(),
-                                Objects.isNull(node.getYangModuleCapabilities())
-                                        ? false : node.getYangModuleCapabilities().isOverride(),
-                                Objects.isNull(node.getNonModuleCapabilities())
-                                        ? false : node.getNonModuleCapabilities().isOverride()), rpcMessageLimit)
-                        : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+        NetconfDeviceCommunicator netconfDeviceCommunicator =
+             userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+             new UserPreferences(userCapabilities.get(),
+                 Objects.isNull(node.getYangModuleCapabilities())
+                         ? false : node.getYangModuleCapabilities().isOverride(),
+                 Objects.isNull(node.getNonModuleCapabilities())
+                         ? false : node.getNonModuleCapabilities().isOverride()), rpcMessageLimit)
+            : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+
+        if (salFacade instanceof KeepaliveSalFacade) {
+            ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+        }
+        return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
     }
 
     private static Optional<NetconfSessionPreferences> getUserCapabilities(final NetconfNode node) {
index c9ec7bf87facb077e98af782900f0283749e0d6d..342d3e9cdaaa2a5661463dee7071aadc025e203a 100644 (file)
@@ -373,9 +373,15 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
         }
 
-        return new NetconfConnectorDTO(userCapabilities.isPresent()
-                ? new NetconfDeviceCommunicator(remoteDeviceId, device, userCapabilities.get(), rpcMessageLimit)
-                : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+        NetconfDeviceCommunicator netconfDeviceCommunicator =
+             userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+                     userCapabilities.get(), rpcMessageLimit)
+            : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+
+        if (salFacade instanceof KeepaliveSalFacade) {
+            ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+        }
+        return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
index 7968fcda978255007baaf65ce23cecea3826f541..b962ae20757c034bbef70201a3fd6be0b2bf59c1 100644 (file)
@@ -167,7 +167,7 @@ public class NetconfDeviceCommunicator
 
     public void disconnect() {
         // If session is already in closing, no need to close it again
-        if (currentSession != null && isSessionClosing.compareAndSet(false, true)) {
+        if (currentSession != null && isSessionClosing.compareAndSet(false, true) && currentSession.isUp()) {
             currentSession.close();
         }
     }
index 82cf4b23379dfced7e83256447c09c72071d86d6..73b1296ef8bcd1ae122d68d5909f6568ab67ca7c 100644 (file)
@@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
@@ -66,6 +67,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
     private volatile NetconfDeviceCommunicator listener;
     private volatile ScheduledFuture<?> currentKeepalive;
     private volatile DOMRpcService currentDeviceRpc;
+    private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false);
 
     public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
                               final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
@@ -104,7 +106,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
         if (currentKeepalive != null) {
             currentKeepalive.cancel(false);
         }
-        scheduleKeepalive();
+        scheduleKeepalives();
     }
 
     /**
@@ -133,13 +135,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
         salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
 
         LOG.debug("{}: Netconf session initiated, starting keepalives", id);
-        scheduleKeepalive();
+        scheduleKeepalives();
     }
 
-    private void scheduleKeepalive() {
+    private void scheduleKeepalives() {
+        lastKeepAliveSucceeded.set(true);
         Preconditions.checkState(currentDeviceRpc != null);
-        LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
-        currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
+        LOG.trace("{}: Scheduling keepalives every  {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
+        currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
+          keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
     }
 
     @Override
@@ -179,18 +183,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
      */
     private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
 
-        private final ScheduledFuture<?> previousKeepalive;
-
-        Keepalive(final ScheduledFuture<?> previousKeepalive) {
-            this.previousKeepalive = previousKeepalive;
-        }
-
         @Override
         public void run() {
             LOG.trace("{}: Invoking keepalive RPC", id);
 
             try {
-                if (previousKeepalive != null && !previousKeepalive.isDone()) {
+                boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+                if (!lastJobSucceeded) {
                     onFailure(new IllegalStateException("Previous keepalive timed out"));
                 } else {
                     Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this,
@@ -212,11 +211,10 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler<NetconfSess
             // No matter what response we got, rpc-reply or rpc-error,
             // we got it from device so the netconf session is OK
             if (result != null && result.getResult() != null) {
-                LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
-                scheduleKeepalive();
-            } else if (result != null && !result.getErrors().isEmpty()) {
+                lastKeepAliveSucceeded.set(true);
+            }  else if (result != null && result.getErrors() != null) {
                 LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
-                scheduleKeepalive();
+                lastKeepAliveSucceeded.set(true);
             } else {
                 LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
                 reconnect();