Move RemoteDeviceId
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
index cfd9dd81131d438f55665b2c473ee652bc270d00..85859608d49d2b2cfc82e689bc5fb89b3200d811 100644 (file)
@@ -5,42 +5,56 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.topology.singleton.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
+import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.EventExecutor;
+import java.time.Duration;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
-import javax.annotation.Nonnull;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import org.opendaylight.aaa.encrypt.AAAEncryptionService;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.controller.config.threadpool.ThreadPool;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
+import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
+import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
+import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -48,6 +62,7 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,51 +72,75 @@ public class NetconfTopologyManager
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
 
-    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
+    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
-            clusterRegistrations = new HashMap<>();
-
-    private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+            clusterRegistrations = new ConcurrentHashMap<>();
 
+    private final BaseNetconfSchemas baseSchemas;
     private final DataBroker dataBroker;
-    private final RpcProviderRegistry rpcProviderRegistry;
+    private final DOMRpcProviderService rpcProviderRegistry;
+    private final DOMActionProviderService actionProviderRegistry;
     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
-    private final BindingAwareBroker bindingAwareBroker;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
-    private final Broker domBroker;
+    private final ScheduledExecutorService keepaliveExecutor;
+    private final ListeningExecutorService processingExecutor;
     private final ActorSystem actorSystem;
     private final EventExecutor eventExecutor;
     private final NetconfClientDispatcher clientDispatcher;
     private final String topologyId;
+    private final Duration writeTxIdleTimeout;
+    private final DOMMountPointService mountPointService;
+    private final AAAEncryptionService encryptionService;
+    private final RpcProviderService rpcProviderService;
+    private final DeviceActionFactory deviceActionFactory;
+    private final SchemaResourceManager resourceManager;
 
-    public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
-                           final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
-                           final BindingAwareBroker bindingAwareBroker,
-                           final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
-                           final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
-                           final NetconfClientDispatcher clientDispatcher, final String topologyId) {
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
-        this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
-        this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker);
-        this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
-        this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
-        this.domBroker = Preconditions.checkNotNull(domBroker);
-        this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
-        this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
-        this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
-        this.topologyId = Preconditions.checkNotNull(topologyId);
+    private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+    private Registration rpcReg;
+    private String privateKeyPath;
+    private String privateKeyPassphrase;
+
+    public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
+                                  final DOMRpcProviderService rpcProviderRegistry,
+                                  final DOMActionProviderService actionProviderService,
+                                  final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
+                                  final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
+                                  final ActorSystemProvider actorSystemProvider,
+                                  final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
+                                  final String topologyId, final Config config,
+                                  final DOMMountPointService mountPointService,
+                                  final AAAEncryptionService encryptionService,
+                                  final RpcProviderService rpcProviderService,
+                                  final DeviceActionFactory deviceActionFactory,
+                                  final SchemaResourceManager resourceManager) {
+        this.baseSchemas = requireNonNull(baseSchemas);
+        this.dataBroker = requireNonNull(dataBroker);
+        this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
+        actionProviderRegistry = requireNonNull(actionProviderService);
+        this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
+        this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+        this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+        actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
+        this.eventExecutor = requireNonNull(eventExecutor);
+        this.clientDispatcher = requireNonNull(clientDispatcher);
+        this.topologyId = requireNonNull(topologyId);
+        writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
+        this.mountPointService = mountPointService;
+        this.encryptionService = requireNonNull(encryptionService);
+        this.rpcProviderService = requireNonNull(rpcProviderService);
+        this.deviceActionFactory = requireNonNull(deviceActionFactory);
+        this.resourceManager = requireNonNull(resourceManager);
     }
 
     // Blueprint init method
     public void init() {
-        dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
+        dataChangeListenerRegistration = registerDataTreeChangeListener();
+        rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
+            new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
     }
 
     @Override
-    public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
-        for (DataTreeModification<Node> change : changes) {
+    public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
+        for (final DataTreeModification<Node> change : changes) {
             final DataObjectModification<Node> rootNode = change.getRootNode();
             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
@@ -129,115 +168,155 @@ public class NetconfTopologyManager
         }
     }
 
-    private void refreshNetconfDeviceContext(InstanceIdentifier<Node> instanceIdentifier, Node node) {
+    private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
         context.refresh(createSetup(instanceIdentifier, node));
     }
 
+    // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
+    // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
+    // retry registration several times and log the error.
+    // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
-        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-        Preconditions.checkNotNull(netconfNode);
-        Preconditions.checkNotNull(netconfNode.getHost());
-        Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
+        final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
+
+        final Timeout actorResponseWaitTime = Timeout.create(
+                Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
 
         final ServiceGroupIdentifier serviceGroupIdent =
                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
 
-        final NetconfTopologyContext newNetconfTopologyContext =
-                new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent);
+        final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
+            createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
 
-        final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration  =
-                clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+        int tries = 3;
+        while (true) {
+            try {
+                final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
+                        clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+                clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
+                contexts.put(instanceIdentifier, newNetconfTopologyContext);
+                break;
+            } catch (final RuntimeException e) {
+                LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
 
-        clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
-        contexts.put(instanceIdentifier, newNetconfTopologyContext);
+                if (--tries <= 0) {
+                    LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
+                            newNetconfTopologyContext, e);
+                    close(newNetconfTopologyContext);
+                    break;
+                }
+            }
+        }
     }
 
     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
-        if (contexts.containsKey(instanceIdentifier)) {
-            try {
-                clusterRegistrations.get(instanceIdentifier).close();
-                contexts.get(instanceIdentifier).closeFinal();
-            } catch (Exception e) {
-                LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
-            }
-            contexts.remove(instanceIdentifier);
-            clusterRegistrations.remove(instanceIdentifier);
+        final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
+        if (netconfTopologyContext != null) {
+            close(clusterRegistrations.remove(instanceIdentifier));
+            close(netconfTopologyContext);
         }
     }
 
+    @VisibleForTesting
+    protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+            final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
+            final DeviceActionFactory deviceActionFact) {
+        return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
+            deviceActionFact);
+    }
+
     @Override
     public void close() {
+        if (rpcReg != null) {
+            rpcReg.close();
+            rpcReg = null;
+        }
         if (dataChangeListenerRegistration != null) {
             dataChangeListenerRegistration.close();
             dataChangeListenerRegistration = null;
         }
-        contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
-            try {
-                netconfTopologyContext.closeFinal();
-            } catch (Exception e) {
-                LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
-            }
-        });
-        clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
-            try {
-                clusterSingletonServiceRegistration.close();
-            } catch (Exception e) {
-                LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
-            }
-        });
+
+        contexts.values().forEach(NetconfTopologyManager::close);
+        clusterRegistrations.values().forEach(NetconfTopologyManager::close);
+
         contexts.clear();
         clusterRegistrations.clear();
     }
 
-    private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(String topologyId) {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static void close(final AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            LOG.warn("Error closing {}", closeable, e);
+        }
+    }
+
+    /**
+     * Sets the private key path from location specified in configuration file using blueprint.
+     */
+    public void setPrivateKeyPath(final String privateKeyPath) {
+        this.privateKeyPath = privateKeyPath;
+    }
+
+    /**
+     * Sets the private key passphrase from location specified in configuration file using blueprint.
+     */
+    public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
+        this.privateKeyPassphrase = privateKeyPassphrase;
+    }
+
+    private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
-        initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
-        initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
-        Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+        // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
+        //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
+        //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
+        wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
+            .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
+            .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
+        wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
             @Override
-            public void onSuccess(Void result) {
+            public void onSuccess(final CommitInfo result) {
                 LOG.debug("topology initialization successful");
             }
 
             @Override
-            public void onFailure(@Nonnull Throwable throwable) {
-                LOG.error("Unable to initialize netconf-topology, {}", throwable);
+            public void onFailure(final Throwable throwable) {
+                LOG.error("Unable to initialize netconf-topology", throwable);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         LOG.debug("Registering datastore listener");
-        return dataBroker.registerDataTreeChangeListener(
-                        new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
-                                NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
-    }
-
-    private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) {
-        final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
-        final InstanceIdentifier<NetworkTopology> networkTopologyId =
-                InstanceIdentifier.builder(NetworkTopology.class).build();
-        wtx.merge(datastoreType, networkTopologyId, networkTopology);
-        final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
-        wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
-                new TopologyKey(new TopologyId(topologyId))), topology);
+        return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+            NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
     }
 
     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
-        final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
+        final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
+        final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
+
+        return NetconfTopologySetupBuilder.create()
                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
+                .setBaseSchemas(baseSchemas)
                 .setDataBroker(dataBroker)
                 .setInstanceIdentifier(instanceIdentifier)
                 .setRpcProviderRegistry(rpcProviderRegistry)
+                .setActionProviderRegistry(actionProviderRegistry)
                 .setNode(node)
-                .setBindingAwareBroker(bindingAwareBroker)
                 .setActorSystem(actorSystem)
                 .setEventExecutor(eventExecutor)
-                .setDomBroker(domBroker)
                 .setKeepaliveExecutor(keepaliveExecutor)
                 .setProcessingExecutor(processingExecutor)
                 .setTopologyId(topologyId)
-                .setNetconfClientDispatcher(clientDispatcher);
-
-        return builder.build();
+                .setNetconfClientDispatcher(clientDispatcher)
+                .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
+                    deviceId))
+                .setIdleTimeout(writeTxIdleTimeout)
+                .setPrivateKeyPath(privateKeyPath)
+                .setPrivateKeyPassphrase(privateKeyPassphrase)
+                .setEncryptionService(encryptionService)
+                .build();
     }
 }