BUG-1018 Implement BackwardsCompatible BI broker data notifications
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / config / yang / md / sal / connector / netconf / NetconfConnectorModule.java
index a04baaa91298ccf218c74958d76596c89545b204..feeab1a4b79b01d51ec5070474718e858fbedee7 100644 (file)
@@ -7,37 +7,53 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.connector.netconf;
 
+import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
+import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
+
 import com.google.common.net.InetAddresses;
-import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.opendaylight.controller.netconf.client.NetconfSshClientDispatcher;
-import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
 import org.opendaylight.controller.netconf.util.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.sal.connect.netconf.InventoryUtils;
 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.controller.sal.connect.netconf.NetconfDeviceListener;
+import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
 import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.FilesystemSchemaCachingProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProviders;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
-import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
-
 /**
-*
-*/
+ *
+ */
 public final class NetconfConnectorModule extends org.opendaylight.controller.config.yang.md.sal.connector.netconf.AbstractNetconfConnectorModule
 {
     private static final Logger logger = LoggerFactory.getLogger(NetconfConnectorModule.class);
@@ -68,88 +84,122 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         checkNotNull(getBetweenAttemptsTimeoutMillis(), betweenAttemptsTimeoutMillisJmxAttribute);
         checkCondition(getBetweenAttemptsTimeoutMillis() > 0, "must be > 0", betweenAttemptsTimeoutMillisJmxAttribute);
 
+        // FIXME BUG-944 remove backwards compatibility
+        if(getClientDispatcher() == null) {
+            checkCondition(getBossThreadGroup() != null, "Client dispatcher was not set, thread groups have to be set instead", bossThreadGroupJmxAttribute);
+            checkCondition(getWorkerThreadGroup() != null, "Client dispatcher was not set, thread groups have to be set instead", workerThreadGroupJmxAttribute);
+        }
+
+        // Check username + password in case of ssh
+        if(getTcpOnly() == false) {
+            checkNotNull(getUsername(), usernameJmxAttribute);
+            checkNotNull(getPassword(), passwordJmxAttribute);
+        }
+
     }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        
+        ServiceReference<DataProviderService> serviceReference = bundleContext.getServiceReference(DataProviderService.class);
+
+        DataProviderService dataProviderService =
+                bundleContext.getService(serviceReference);
+
         getDomRegistryDependency();
         NetconfDevice device = new NetconfDevice(getIdentifier().getInstanceName());
-        String addressValue = getAddress();
 
-        Long connectionAttempts;
-        if (getMaxConnectionAttempts() != null && getMaxConnectionAttempts() > 0) {
-            connectionAttempts = getMaxConnectionAttempts();
-        } else {
-            logger.trace("Setting {} on {} to infinity", maxConnectionAttemptsJmxAttribute, this);
-            connectionAttempts = null;
-        }
-        long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
-        /*
-         * Uncomment after Switch to IP Address
-        if(getAddress().getIpv4Address() != null) {
-            addressValue = getAddress().getIpv4Address().getValue();
-        } else {
-            addressValue = getAddress().getIpv6Address().getValue();
-        }
-        */
-        double sleepFactor = 1.0;
-        int minSleep = 1000;
-        Long maxSleep = null;
-        Long deadline = null;
-        ReconnectStrategy strategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, getBetweenAttemptsTimeoutMillis(),
-                minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
-        
-        device.setReconnectStrategy(strategy);
-        
-        InetAddress addr = InetAddresses.forString(addressValue);
-        InetSocketAddress socketAddress = new InetSocketAddress(addr , getPort().intValue());
+        device.setClientConfig(getClientConfig(device));
 
-        
         device.setProcessingExecutor(getGlobalProcessingExecutor());
-        
-        device.setSocketAddress(socketAddress);
+
         device.setEventExecutor(getEventExecutorDependency());
-        device.setDispatcher(createDispatcher(clientConnectionTimeoutMillis));
+        device.setDispatcher(getClientDispatcher() == null ? createDispatcher() : getClientDispatcherDependency());
         device.setSchemaSourceProvider(getGlobalNetconfSchemaProvider(bundleContext));
-        
+        device.setDataProviderService(dataProviderService);
         getDomRegistryDependency().registerProvider(device, bundleContext);
         device.start();
         return device;
     }
 
     private ExecutorService getGlobalProcessingExecutor() {
-        if(GLOBAL_PROCESSING_EXECUTOR == null) {
-            
-            GLOBAL_PROCESSING_EXECUTOR = Executors.newCachedThreadPool();
-            
-        }
-        return GLOBAL_PROCESSING_EXECUTOR;
+        return GLOBAL_PROCESSING_EXECUTOR == null ? Executors.newCachedThreadPool() : GLOBAL_PROCESSING_EXECUTOR;
     }
 
     private synchronized AbstractCachingSchemaSourceProvider<String, InputStream> getGlobalNetconfSchemaProvider(BundleContext bundleContext) {
         if(GLOBAL_NETCONF_SOURCE_PROVIDER == null) {
             String storageFile = "cache/schema";
-//            File directory = bundleContext.getDataFile(storageFile);
-            File directory = new File("cache/schema");
+            //            File directory = bundleContext.getDataFile(storageFile);
+            File directory = new File(storageFile);
             SchemaSourceProvider<String> defaultProvider = SchemaSourceProviders.noopProvider();
             GLOBAL_NETCONF_SOURCE_PROVIDER = FilesystemSchemaCachingProvider.createFromStringSourceProvider(defaultProvider, directory);
         }
         return GLOBAL_NETCONF_SOURCE_PROVIDER;
     }
 
-    private NetconfClientDispatcher createDispatcher(long clientConnectionTimeoutMillis) {
-        EventLoopGroup bossGroup = getBossThreadGroupDependency();
-        EventLoopGroup workerGroup = getWorkerThreadGroupDependency();
-        if(getTcpOnly()) {
-            return new NetconfClientDispatcher( bossGroup, workerGroup, clientConnectionTimeoutMillis);
-        } else {
-            AuthenticationHandler authHandler = new LoginPassword(getUsername(),getPassword());
-            return new NetconfSshClientDispatcher(authHandler , bossGroup, workerGroup, clientConnectionTimeoutMillis);
-        }
+    // FIXME BUG-944 remove backwards compatibility
+    /**
+     * @deprecated Use getClientDispatcherDependency method instead to retrieve injected dispatcher.
+     * This one creates new instance of NetconfClientDispatcher and will be removed in near future.
+     */
+    @Deprecated
+    private NetconfClientDispatcher createDispatcher() {
+        return new NetconfClientDispatcherImpl(getBossThreadGroupDependency(), getWorkerThreadGroupDependency(), new HashedWheelTimer());
     }
 
     public void setBundleContext(BundleContext bundleContext) {
         this.bundleContext = bundleContext;
     }
+
+    public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDevice device) {
+        InetSocketAddress socketAddress = getSocketAddress();
+        ReconnectStrategy strategy = getReconnectStrategy();
+        long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
+
+        return NetconfReconnectingClientConfigurationBuilder.create()
+        .withAddress(socketAddress)
+        .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
+        .withReconnectStrategy(strategy)
+        .withSessionListener(new NetconfDeviceListener(device))
+        .withAuthHandler(new LoginPassword(getUsername(),getPassword()))
+        .withProtocol(getTcpOnly() ?
+                NetconfClientConfiguration.NetconfClientProtocol.TCP :
+                NetconfClientConfiguration.NetconfClientProtocol.SSH)
+        .withConnectStrategyFactory(new ReconnectStrategyFactory() {
+            @Override
+            public ReconnectStrategy createReconnectStrategy() {
+                return getReconnectStrategy();
+            }
+        })
+        .build();
+    }
+
+    private ReconnectStrategy getReconnectStrategy() {
+        Long connectionAttempts;
+        if (getMaxConnectionAttempts() != null && getMaxConnectionAttempts() > 0) {
+            connectionAttempts = getMaxConnectionAttempts();
+        } else {
+            logger.trace("Setting {} on {} to infinity", maxConnectionAttemptsJmxAttribute, this);
+            connectionAttempts = null;
+        }
+        double sleepFactor = 1.5;
+        int minSleep = 1000;
+        Long maxSleep = null;
+        Long deadline = null;
+
+        return new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, getBetweenAttemptsTimeoutMillis(),
+                minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
+    }
+
+    private InetSocketAddress getSocketAddress() {
+        /*
+         * Uncomment after Switch to IP Address
+        if(getAddress().getIpv4Address() != null) {
+            addressValue = getAddress().getIpv4Address().getValue();
+        } else {
+            addressValue = getAddress().getIpv6Address().getValue();
+        }
+         */
+        InetAddress inetAddress = InetAddresses.forString(getAddress());
+        return new InetSocketAddress(inetAddress, getPort().intValue());
+    }
 }