Fix netconf-connector-config groupId
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfTopologyImpl.java
index 027836993fc6a898f4b19d8e15c0e1a4c98dca9b..0a147409038fa6e0597ef090eadab2d7466594c3 100644 (file)
 
 package org.opendaylight.netconf.topology.impl;
 
-import com.google.common.base.Preconditions;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.EventExecutor;
-import java.math.BigDecimal;
-import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.AbstractNetconfTopology;
 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
-import org.opendaylight.netconf.topology.TopologyMountPointFacade;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
-import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetconfTopologyImpl implements NetconfTopology, BindingAwareProvider, Provider, AutoCloseable {
+public class NetconfTopologyImpl extends AbstractNetconfTopology implements DataTreeChangeListener<Node>, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
 
-    private final String topologyId;
-    private final NetconfClientDispatcher clientDispatcher;
-    private final BindingAwareBroker bindingAwareBroker;
-    private final Broker domBroker;
-    private final EventExecutor eventExecutor;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
-    private final SchemaRepositoryProvider sharedSchemaRepository;
-
-    private SchemaSourceRegistry schemaSourceRegistry = null;
-    private SchemaContextFactory schemaContextFactory = null;
-
-    private DOMMountPointService mountPointService = null;
-    private DataBroker dataBroker = null;
-    private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
+        private ListenerRegistration<NetconfTopologyImpl> datastoreListenerRegistration = null;
 
     public NetconfTopologyImpl(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
-                               final ThreadPool processingExecutor, final SchemaRepositoryProvider sharedSchemaRepository) {
-        this.topologyId = topologyId;
-        this.clientDispatcher = clientDispatcher;
-        this.bindingAwareBroker = bindingAwareBroker;
-        this.domBroker = domBroker;
-        this.eventExecutor = eventExecutor;
-        this.keepaliveExecutor = keepaliveExecutor;
-        this.processingExecutor = processingExecutor;
-        this.sharedSchemaRepository = sharedSchemaRepository;
-
+                               final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+        super(topologyId, clientDispatcher,
+                bindingAwareBroker, domBroker, eventExecutor,
+                keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
         registerToSal(this, this);
     }
 
-    private void registerToSal(BindingAwareProvider baProvider, Provider provider) {
-        domBroker.registerProvider(provider);
-        bindingAwareBroker.registerProvider(baProvider);
-    }
-
     @Override
     public void close() throws Exception {
         // close all existing connectors, delete whole topology in datastore?
         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
-            connectorDTO.getCommunicator().disconnect();
+            connectorDTO.getCommunicator().close();
         }
         activeConnectors.clear();
+
+        if (datastoreListenerRegistration != null) {
+            datastoreListenerRegistration.close();
+            datastoreListenerRegistration = null;
+        }
     }
 
     @Override
-    public String getTopologyId() {
-        return topologyId;
+    protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+        return new NetconfDeviceSalFacade(id, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
     }
 
     @Override
-    public DataBroker getDataBroker() {
-        return Preconditions.checkNotNull(dataBroker, "DataBroker not initialized yet");
+    public void registerMountPoint(ActorContext context, NodeId nodeId) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
-    public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
-        return setupConnection(nodeId, configNode);
+    public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
-    public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
-        if (!activeConnectors.containsKey(nodeId)) {
-            return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
-        }
-
-        // retrieve connection, and disconnect it
-        activeConnectors.remove(nodeId).getCommunicator().disconnect();
-        return Futures.immediateFuture(null);
+    public void unregisterMountPoint(NodeId nodeId) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
-    public void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
-        activeConnectors.get(node).getMountPointFacade().registerConnectionStatusListener(listener);
+    public ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+        throw new UnsupportedOperationException("Registering a listener on a regular netconf device is not supported(supported only in clustered netconf topology)");
     }
 
-    private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
-                                                                        final Node configNode) {
-        final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
-
-        final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
-        final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
-        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
-        final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+    @Override
+    public void onSessionInitiated(ProviderContext session) {
+        dataBroker = session.getSALService(DataBroker.class);
 
-        Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
+        final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
+        initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
+        initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
+        Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
             @Override
-            public void onSuccess(NetconfDeviceCapabilities result) {
-                LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
-                activeConnectors.put(nodeId, deviceCommunicatorDTO);
+            public void onSuccess(Void result) {
+                LOG.debug("topology initialization successful");
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOG.error("Connector for : " + nodeId.getValue() + " failed");
-                // remove this node from active connectors?
+                LOG.error("Unable to initialize netconf-topology, {}", t);
             }
         });
 
-        return future;
-    }
-
-    private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
-                                                         final NetconfNode node) {
-        IpAddress ipAddress = node.getHost().getIpAddress();
-        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
-                ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
-                node.getPort().getValue());
-        RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
-
-        // we might need to create a new SalFacade to maintain backwards compatibility with special case loopback connection
-        TopologyMountPointFacade mountPointFacade =
-                new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, node.getDefaultRequestTimeoutMillis());
-        RemoteDeviceHandler<NetconfSessionPreferences> salFacade = mountPointFacade;
-        if (node.getKeepaliveDelay() > 0) {
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, mountPointFacade, keepaliveExecutor.getExecutor(), node.getKeepaliveDelay());
-        }
-
-        NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
-                new NetconfDevice.SchemaResourcesDTO(schemaSourceRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
-
-        NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), node.isReconnectOnChangedSchema());
-
-        return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), mountPointFacade);
-    }
-
-    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
-        final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
-        final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis();
-
-        final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
-                node.getMaxConnectionAttempts(), node.getBetweenAttemptsTimeoutMillis(), node.getSleepFactor());
-        final ReconnectStrategy strategy = sf.createReconnectStrategy();
-
-        final AuthenticationHandler authHandler;
-        final Credentials credentials = node.getCredentials();
-        if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
-            authHandler = new LoginPassword(
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
-        } else {
-            throw new IllegalStateException("Only login/password authentification is supported");
-        }
-
-        return NetconfReconnectingClientConfigurationBuilder.create()
-                .withAddress(socketAddress)
-                .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
-                .withReconnectStrategy(strategy)
-                .withAuthHandler(authHandler)
-                .withProtocol(node.isTcpOnly() ?
-                        NetconfClientConfiguration.NetconfClientProtocol.TCP :
-                        NetconfClientConfiguration.NetconfClientProtocol.SSH)
-                .withConnectStrategyFactory(sf)
-                .withSessionListener(listener)
-                .build();
-    }
+        LOG.debug("Registering datastore listener");
+        datastoreListenerRegistration =
+                dataBroker.registerDataTreeChangeListener(
+                        new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
 
-    @Override
-    public void onSessionInitiated(ProviderSession session) {
-        mountPointService = session.getService(DOMMountPointService.class);
-    }
 
-    @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
     }
 
     @Override
-    public void onSessionInitiated(ProviderContext session) {
-        dataBroker = session.getSALService(DataBroker.class);
-    }
-
-    private static final class NetconfConnectorDTO {
-
-        private final NetconfDeviceCommunicator communicator;
-        private final TopologyMountPointFacade mountPointFacade;
-
-        private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final TopologyMountPointFacade mountPointFacade) {
-            this.communicator = communicator;
-            this.mountPointFacade = mountPointFacade;
-        }
-
-        public NetconfDeviceCommunicator getCommunicator() {
-            return communicator;
-        }
-
-        public TopologyMountPointFacade getMountPointFacade() {
-            return mountPointFacade;
-        }
-    }
-
-    private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
-        private final Long connectionAttempts;
-        private final EventExecutor executor;
-        private final double sleepFactor;
-        private final int minSleep;
-
-        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
-            if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
-                connectionAttempts = maxConnectionAttempts;
-            } else {
-                connectionAttempts = null;
+    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> collection) {
+        for (DataTreeModification<Node> change : collection) {
+            final DataObjectModification<Node> rootNode = change.getRootNode();
+            switch (rootNode.getModificationType()) {
+                case SUBTREE_MODIFIED:
+                    LOG.debug("Config for node {} updated", getNodeId(rootNode.getIdentifier()));
+                    disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case WRITE:
+                    LOG.debug("Config for node {} created", getNodeId(rootNode.getIdentifier()));
+                    if (activeConnectors.containsKey(getNodeId(rootNode.getIdentifier()))) {
+                        LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..", getNodeId(rootNode.getIdentifier()));
+                        disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    }
+                    connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case DELETE:
+                    LOG.debug("Config for node {} deleted", getNodeId(rootNode.getIdentifier()));
+                    disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    break;
             }
-
-            this.sleepFactor = sleepFactor.doubleValue();
-            this.executor = executor;
-            this.minSleep = minSleep;
-        }
-
-        @Override
-        public ReconnectStrategy createReconnectStrategy() {
-            final Long maxSleep = null;
-            final Long deadline = null;
-
-            return new TimedReconnectStrategy(executor, minSleep,
-                    minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
         }
     }
 
-    private InetSocketAddress getSocketAddress(final Host host, int port) {
-        if(host.getDomainName() != null) {
-            return new InetSocketAddress(host.getDomainName().getValue(), port);
-        } else {
-            final IpAddress ipAddress = host.getIpAddress();
-            final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
-            return new InetSocketAddress(ip, port);
-        }
+    private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
+        final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+        final InstanceIdentifier<NetworkTopology> networkTopologyId = InstanceIdentifier.builder(NetworkTopology.class).build();
+        wtx.merge(datastoreType, networkTopologyId, networkTopology);
+        final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
+        wtx.merge(datastoreType, networkTopologyId.child(Topology.class, new TopologyKey(new TopologyId(topologyId))), topology);
     }
+
 }