Merge "Migrate restconf to MD-SAL APIs"
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / AbstractNetconfTopology.java
index c9ec7bf87facb077e98af782900f0283749e0d6d..66eea5692b4838f2fec4d81e167135de3f3087f1 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.handler.ssl.SslHandler;
@@ -51,6 +52,7 @@ import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurati
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPasswordHandler;
+import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
 import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
@@ -200,15 +202,16 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         }
     }
 
-    protected final String topologyId;
     private final NetconfClientDispatcher clientDispatcher;
     private final EventExecutor eventExecutor;
+    private final DeviceActionFactory deviceActionFactory;
+    private final NetconfKeystoreAdapter keystoreAdapter;
     protected final ScheduledThreadPool keepaliveExecutor;
-    protected final ThreadPool processingExecutor;
+    protected final ListeningExecutorService processingExecutor;
     protected final SharedSchemaRepository sharedSchemaRepository;
     protected final DataBroker dataBroker;
     protected final DOMMountPointService mountPointService;
-    private final NetconfKeystoreAdapter keystoreAdapter;
+    protected final String topologyId;
     protected SchemaSourceRegistry schemaRegistry = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaRepository schemaRepository = DEFAULT_SCHEMA_REPOSITORY;
     protected SchemaContextFactory schemaContextFactory = DEFAULT_SCHEMA_CONTEXT_FACTORY;
@@ -222,12 +225,14 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
                                       final ThreadPool processingExecutor,
                                       final SchemaRepositoryProvider schemaRepositoryProvider,
                                       final DataBroker dataBroker, final DOMMountPointService mountPointService,
-                                      final AAAEncryptionService encryptionService) {
+                                      final AAAEncryptionService encryptionService,
+                                      final DeviceActionFactory deviceActionFactory) {
         this.topologyId = topologyId;
         this.clientDispatcher = clientDispatcher;
         this.eventExecutor = eventExecutor;
         this.keepaliveExecutor = keepaliveExecutor;
-        this.processingExecutor = processingExecutor;
+        this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+        this.deviceActionFactory = deviceActionFactory;
         this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
         this.dataBroker = dataBroker;
         this.mountPointService = mountPointService;
@@ -286,12 +291,12 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(final NetconfDeviceCapabilities result) {
-                LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
+                LOG.debug("Connector for {} started succesfully", nodeId.getValue());
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.error("Connector for : " + nodeId.getValue() + " failed");
+                LOG.error("Connector for {} failed", nodeId.getValue(), throwable);
                 // remove this node from active connectors?
             }
         }, MoreExecutors.directExecutor());
@@ -299,8 +304,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         return future;
     }
 
-    protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
-                                                           final NetconfNode node) {
+    protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node) {
         //setup default values since default value is not supported in mdsal
         final long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null
                 ? DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
@@ -320,7 +324,7 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
 
         if (keepaliveDelay > 0) {
             LOG.warn("Adding keepalive facade, for device {}", nodeId);
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(),
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, this.keepaliveExecutor.getExecutor(),
                     keepaliveDelay, defaultRequestTimeoutMillis);
         }
 
@@ -356,13 +360,16 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         if (node.isSchemaless()) {
             device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
         } else {
-            device = new NetconfDeviceBuilder()
+            NetconfDeviceBuilder netconfDeviceBuilder = new NetconfDeviceBuilder()
                     .setReconnectOnSchemasChange(reconnectOnChangedSchema)
                     .setSchemaResourcesDTO(schemaResourcesDTO)
-                    .setGlobalProcessingExecutor(processingExecutor.getExecutor())
+                    .setGlobalProcessingExecutor(this.processingExecutor)
                     .setId(remoteDeviceId)
-                    .setSalFacade(salFacade)
-                    .build();
+                    .setSalFacade(salFacade);
+            if (this.deviceActionFactory != null) {
+                netconfDeviceBuilder.setDeviceActionFactory(this.deviceActionFactory);
+            }
+            device = netconfDeviceBuilder.build();
         }
 
         final Optional<UserPreferences> userCapabilities = getUserCapabilities(node);
@@ -373,9 +380,15 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
         }
 
-        return new NetconfConnectorDTO(userCapabilities.isPresent()
-                ? new NetconfDeviceCommunicator(remoteDeviceId, device, userCapabilities.get(), rpcMessageLimit)
-                : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+        NetconfDeviceCommunicator netconfDeviceCommunicator =
+             userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+                     userCapabilities.get(), rpcMessageLimit)
+            : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+
+        if (salFacade instanceof KeepaliveSalFacade) {
+            ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+        }
+        return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
     }
 
     protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
@@ -503,6 +516,10 @@ public abstract class AbstractNetconfTopology implements NetconfTopology {
         } else {
             throw new IllegalStateException("Unsupported protocol type: " + node.getProtocol().getName().getClass());
         }
+        if (node.getOdlHelloMessageCapabilities() != null) {
+            reconnectingClientConfigurationBuilder
+                    .withOdlHelloCapabilities(node.getOdlHelloMessageCapabilities().getCapability());
+        }
 
         return reconnectingClientConfigurationBuilder
                 .withAddress(socketAddress)