Fix netconf-connector-config groupId
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfTopologyImpl.java
index 027836993fc6a898f4b19d8e15c0e1a4c98dca9b..0a147409038fa6e0597ef090eadab2d7466594c3 100644 (file)
 
 package org.opendaylight.netconf.topology.impl;
 
 
 package org.opendaylight.netconf.topology.impl;
 
-import com.google.common.base.Preconditions;
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.EventExecutor;
-import java.math.BigDecimal;
-import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
-import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
-import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemas;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.AbstractNetconfTopology;
 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
 import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
-import org.opendaylight.netconf.topology.TopologyMountPointFacade;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
-import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetconfTopologyImpl implements NetconfTopology, BindingAwareProvider, Provider, AutoCloseable {
+public class NetconfTopologyImpl extends AbstractNetconfTopology implements DataTreeChangeListener<Node>, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
 
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
 
-    private final String topologyId;
-    private final NetconfClientDispatcher clientDispatcher;
-    private final BindingAwareBroker bindingAwareBroker;
-    private final Broker domBroker;
-    private final EventExecutor eventExecutor;
-    private final ScheduledThreadPool keepaliveExecutor;
-    private final ThreadPool processingExecutor;
-    private final SchemaRepositoryProvider sharedSchemaRepository;
-
-    private SchemaSourceRegistry schemaSourceRegistry = null;
-    private SchemaContextFactory schemaContextFactory = null;
-
-    private DOMMountPointService mountPointService = null;
-    private DataBroker dataBroker = null;
-    private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
+        private ListenerRegistration<NetconfTopologyImpl> datastoreListenerRegistration = null;
 
     public NetconfTopologyImpl(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
 
     public NetconfTopologyImpl(final String topologyId, final NetconfClientDispatcher clientDispatcher,
                                final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
                                final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
-                               final ThreadPool processingExecutor, final SchemaRepositoryProvider sharedSchemaRepository) {
-        this.topologyId = topologyId;
-        this.clientDispatcher = clientDispatcher;
-        this.bindingAwareBroker = bindingAwareBroker;
-        this.domBroker = domBroker;
-        this.eventExecutor = eventExecutor;
-        this.keepaliveExecutor = keepaliveExecutor;
-        this.processingExecutor = processingExecutor;
-        this.sharedSchemaRepository = sharedSchemaRepository;
-
+                               final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
+        super(topologyId, clientDispatcher,
+                bindingAwareBroker, domBroker, eventExecutor,
+                keepaliveExecutor, processingExecutor, schemaRepositoryProvider);
         registerToSal(this, this);
     }
 
         registerToSal(this, this);
     }
 
-    private void registerToSal(BindingAwareProvider baProvider, Provider provider) {
-        domBroker.registerProvider(provider);
-        bindingAwareBroker.registerProvider(baProvider);
-    }
-
     @Override
     public void close() throws Exception {
         // close all existing connectors, delete whole topology in datastore?
         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
     @Override
     public void close() throws Exception {
         // close all existing connectors, delete whole topology in datastore?
         for (NetconfConnectorDTO connectorDTO : activeConnectors.values()) {
-            connectorDTO.getCommunicator().disconnect();
+            connectorDTO.getCommunicator().close();
         }
         activeConnectors.clear();
         }
         activeConnectors.clear();
+
+        if (datastoreListenerRegistration != null) {
+            datastoreListenerRegistration.close();
+            datastoreListenerRegistration = null;
+        }
     }
 
     @Override
     }
 
     @Override
-    public String getTopologyId() {
-        return topologyId;
+    protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id, Broker domBroker, BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
+        return new NetconfDeviceSalFacade(id, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
     }
 
     @Override
     }
 
     @Override
-    public DataBroker getDataBroker() {
-        return Preconditions.checkNotNull(dataBroker, "DataBroker not initialized yet");
+    public void registerMountPoint(ActorContext context, NodeId nodeId) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
     }
 
     @Override
-    public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
-        return setupConnection(nodeId, configNode);
+    public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
     }
 
     @Override
-    public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
-        if (!activeConnectors.containsKey(nodeId)) {
-            return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
-        }
-
-        // retrieve connection, and disconnect it
-        activeConnectors.remove(nodeId).getCommunicator().disconnect();
-        return Futures.immediateFuture(null);
+    public void unregisterMountPoint(NodeId nodeId) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
     @Override
     }
 
     @Override
-    public void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
-        activeConnectors.get(node).getMountPointFacade().registerConnectionStatusListener(listener);
+    public ConnectionStatusListenerRegistration registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
+        throw new UnsupportedOperationException("Registering a listener on a regular netconf device is not supported(supported only in clustered netconf topology)");
     }
 
     }
 
-    private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
-                                                                        final Node configNode) {
-        final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
-
-        final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
-        final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
-        final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
-        final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
+    @Override
+    public void onSessionInitiated(ProviderContext session) {
+        dataBroker = session.getSALService(DataBroker.class);
 
 
-        Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
+        final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
+        initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
+        initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
+        Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
             @Override
             @Override
-            public void onSuccess(NetconfDeviceCapabilities result) {
-                LOG.debug("Connector for : " + nodeId.getValue() + " started succesfully");
-                activeConnectors.put(nodeId, deviceCommunicatorDTO);
+            public void onSuccess(Void result) {
+                LOG.debug("topology initialization successful");
             }
 
             @Override
             public void onFailure(Throwable t) {
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOG.error("Connector for : " + nodeId.getValue() + " failed");
-                // remove this node from active connectors?
+                LOG.error("Unable to initialize netconf-topology, {}", t);
             }
         });
 
             }
         });
 
-        return future;
-    }
-
-    private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
-                                                         final NetconfNode node) {
-        IpAddress ipAddress = node.getHost().getIpAddress();
-        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
-                ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
-                node.getPort().getValue());
-        RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
-
-        // we might need to create a new SalFacade to maintain backwards compatibility with special case loopback connection
-        TopologyMountPointFacade mountPointFacade =
-                new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, node.getDefaultRequestTimeoutMillis());
-        RemoteDeviceHandler<NetconfSessionPreferences> salFacade = mountPointFacade;
-        if (node.getKeepaliveDelay() > 0) {
-            salFacade = new KeepaliveSalFacade(remoteDeviceId, mountPointFacade, keepaliveExecutor.getExecutor(), node.getKeepaliveDelay());
-        }
-
-        NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
-                new NetconfDevice.SchemaResourcesDTO(schemaSourceRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
-
-        NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
-                processingExecutor.getExecutor(), node.isReconnectOnChangedSchema());
-
-        return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), mountPointFacade);
-    }
-
-    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
-        final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
-        final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis();
-
-        final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
-                node.getMaxConnectionAttempts(), node.getBetweenAttemptsTimeoutMillis(), node.getSleepFactor());
-        final ReconnectStrategy strategy = sf.createReconnectStrategy();
-
-        final AuthenticationHandler authHandler;
-        final Credentials credentials = node.getCredentials();
-        if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
-            authHandler = new LoginPassword(
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
-                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
-        } else {
-            throw new IllegalStateException("Only login/password authentification is supported");
-        }
-
-        return NetconfReconnectingClientConfigurationBuilder.create()
-                .withAddress(socketAddress)
-                .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
-                .withReconnectStrategy(strategy)
-                .withAuthHandler(authHandler)
-                .withProtocol(node.isTcpOnly() ?
-                        NetconfClientConfiguration.NetconfClientProtocol.TCP :
-                        NetconfClientConfiguration.NetconfClientProtocol.SSH)
-                .withConnectStrategyFactory(sf)
-                .withSessionListener(listener)
-                .build();
-    }
+        LOG.debug("Registering datastore listener");
+        datastoreListenerRegistration =
+                dataBroker.registerDataTreeChangeListener(
+                        new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
 
 
-    @Override
-    public void onSessionInitiated(ProviderSession session) {
-        mountPointService = session.getService(DOMMountPointService.class);
-    }
 
 
-    @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        return Collections.emptySet();
     }
 
     @Override
     }
 
     @Override
-    public void onSessionInitiated(ProviderContext session) {
-        dataBroker = session.getSALService(DataBroker.class);
-    }
-
-    private static final class NetconfConnectorDTO {
-
-        private final NetconfDeviceCommunicator communicator;
-        private final TopologyMountPointFacade mountPointFacade;
-
-        private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final TopologyMountPointFacade mountPointFacade) {
-            this.communicator = communicator;
-            this.mountPointFacade = mountPointFacade;
-        }
-
-        public NetconfDeviceCommunicator getCommunicator() {
-            return communicator;
-        }
-
-        public TopologyMountPointFacade getMountPointFacade() {
-            return mountPointFacade;
-        }
-    }
-
-    private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
-        private final Long connectionAttempts;
-        private final EventExecutor executor;
-        private final double sleepFactor;
-        private final int minSleep;
-
-        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
-            if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
-                connectionAttempts = maxConnectionAttempts;
-            } else {
-                connectionAttempts = null;
+    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> collection) {
+        for (DataTreeModification<Node> change : collection) {
+            final DataObjectModification<Node> rootNode = change.getRootNode();
+            switch (rootNode.getModificationType()) {
+                case SUBTREE_MODIFIED:
+                    LOG.debug("Config for node {} updated", getNodeId(rootNode.getIdentifier()));
+                    disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case WRITE:
+                    LOG.debug("Config for node {} created", getNodeId(rootNode.getIdentifier()));
+                    if (activeConnectors.containsKey(getNodeId(rootNode.getIdentifier()))) {
+                        LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..", getNodeId(rootNode.getIdentifier()));
+                        disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    }
+                    connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+                    break;
+                case DELETE:
+                    LOG.debug("Config for node {} deleted", getNodeId(rootNode.getIdentifier()));
+                    disconnectNode(getNodeId(rootNode.getIdentifier()));
+                    break;
             }
             }
-
-            this.sleepFactor = sleepFactor.doubleValue();
-            this.executor = executor;
-            this.minSleep = minSleep;
-        }
-
-        @Override
-        public ReconnectStrategy createReconnectStrategy() {
-            final Long maxSleep = null;
-            final Long deadline = null;
-
-            return new TimedReconnectStrategy(executor, minSleep,
-                    minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
         }
     }
 
         }
     }
 
-    private InetSocketAddress getSocketAddress(final Host host, int port) {
-        if(host.getDomainName() != null) {
-            return new InetSocketAddress(host.getDomainName().getValue(), port);
-        } else {
-            final IpAddress ipAddress = host.getIpAddress();
-            final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue();
-            return new InetSocketAddress(ip, port);
-        }
+    private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
+        final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+        final InstanceIdentifier<NetworkTopology> networkTopologyId = InstanceIdentifier.builder(NetworkTopology.class).build();
+        wtx.merge(datastoreType, networkTopologyId, networkTopology);
+        final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
+        wtx.merge(datastoreType, networkTopologyId.child(Topology.class, new TopologyKey(new TopologyId(topologyId))), topology);
     }
     }
+
 }
 }