X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2Fosgi%2FConfigPersisterActivator.java;h=1246c78fbe21539589a23d641e58f4a1846da5c8;hb=c0c97bdca0c42607e8034bc91f51edf96d9e72a9;hp=857912021b0e21d07f60d740bbc6f6372ef0f6b6;hpb=335aee8c6154a25c76503223e636398d32135bc9;p=controller.git diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java index 857912021b..1246c78fbe 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java @@ -8,13 +8,15 @@ package org.opendaylight.controller.netconf.persist.impl.osgi; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler; import org.opendaylight.controller.netconf.persist.impl.ConfigPusher; +import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfiguration; +import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfigurationBuilder; import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator; -import org.opendaylight.controller.netconf.persist.impl.Util; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; @@ -24,17 +26,16 @@ import org.slf4j.LoggerFactory; import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; +import java.util.concurrent.ThreadFactory; import java.util.regex.Pattern; -import java.util.concurrent.TimeUnit; public class ConfigPersisterActivator implements BundleActivator { private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterActivator.class); - private final static MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer(); - private static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex"; + public static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex"; - private static final String PUSH_TIMEOUT = "pushTimeout"; + public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS = "maxWaitForCapabilitiesMillis"; public static final String NETCONF_CONFIG_PERSISTER = "netconf.config.persister"; @@ -42,75 +43,114 @@ public class ConfigPersisterActivator implements BundleActivator { public static final String DEFAULT_IGNORED_REGEX = "^urn:ietf:params:xml:ns:netconf:base:1.0"; + private final MBeanServer platformMBeanServer; + private final Optional initialConfigForPusher; private volatile ConfigPersisterNotificationHandler jmxNotificationHandler; - private volatile NetconfClient netconfClient; private Thread initializationThread; - private EventLoopGroup nettyThreadgroup; + private ThreadFactory initializationThreadFactory; + private EventLoopGroup nettyThreadGroup; private PersisterAggregator persisterAggregator; + public ConfigPersisterActivator() { + this(new ThreadFactory() { + @Override + public Thread newThread(Runnable initializationRunnable) { + return new Thread(initializationRunnable, "ConfigPersister-registrator"); + } + }, ManagementFactory.getPlatformMBeanServer(), null); + } + + @VisibleForTesting + protected ConfigPersisterActivator(ThreadFactory threadFactory, MBeanServer mBeanServer, + ConfigPusherConfiguration initialConfigForPusher) { + this.initializationThreadFactory = threadFactory; + this.platformMBeanServer = mBeanServer; + this.initialConfigForPusher = Optional.fromNullable(initialConfigForPusher); + } + @Override public void start(final BundleContext context) throws Exception { logger.debug("ConfigPersister starting"); PropertiesProviderBaseImpl propertiesProvider = new PropertiesProviderBaseImpl(context); - String regexProperty = propertiesProvider.getProperty(IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX); - String regex; - if (regexProperty != null) { - regex = regexProperty; - } else { - regex = DEFAULT_IGNORED_REGEX; - } - - String timeoutProperty = propertiesProvider.getProperty(PUSH_TIMEOUT); - long timeout = timeoutProperty == null ? ConfigPusher.DEFAULT_TIMEOUT : TimeUnit.SECONDS.toNanos(Integer.valueOf(timeoutProperty)); - - final Pattern ignoredMissingCapabilityRegex = Pattern.compile(regex); - nettyThreadgroup = new NioEventLoopGroup(); + final Pattern ignoredMissingCapabilityRegex = getIgnoredCapabilitiesProperty(propertiesProvider); persisterAggregator = PersisterAggregator.createFromProperties(propertiesProvider); - final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context, "Netconf is not configured, persister is not operational", true); - final ConfigPusher configPusher = new ConfigPusher(address, nettyThreadgroup); + final ConfigPusher configPusher = new ConfigPusher(getConfigurationForPusher(context, propertiesProvider)); // offload initialization to another thread in order to stop blocking activator Runnable initializationRunnable = new Runnable() { @Override public void run() { try { - netconfClient = configPusher.init(persisterAggregator.loadLastConfigs()); - jmxNotificationHandler = new ConfigPersisterNotificationHandler( - platformMBeanServer, netconfClient, persisterAggregator, + configPusher.pushConfigs(persisterAggregator.loadLastConfigs()); + jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, ignoredMissingCapabilityRegex); - jmxNotificationHandler.init(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Interrupted while waiting for netconf connection"); // uncaught exception handler will deal with this failure throw new RuntimeException("Interrupted while waiting for netconf connection", e); } + logger.info("Configuration Persister initialization completed."); } }; - initializationThread = new Thread(initializationRunnable, "ConfigPersister-registrator"); + + initializationThread = initializationThreadFactory.newThread(initializationRunnable); initializationThread.start(); } + private Pattern getIgnoredCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) { + String regexProperty = propertiesProvider.getProperty(IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX); + String regex; + if (regexProperty != null) { + regex = regexProperty; + } else { + regex = DEFAULT_IGNORED_REGEX; + } + return Pattern.compile(regex); + } + + private Optional getMaxWaitForCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) { + String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS); + return Optional.fromNullable(timeoutProperty == null ? null : Long.valueOf(timeoutProperty)); + } + + private ConfigPusherConfiguration getConfigurationForPusher(BundleContext context, + PropertiesProviderBaseImpl propertiesProvider) { + + // If configuration was injected via constructor, use it + if(initialConfigForPusher.isPresent()) + return initialConfigForPusher.get(); + + Optional maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesProperty(propertiesProvider); + final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context, + "Netconf is not configured, persister is not operational", true); + + nettyThreadGroup = new NioEventLoopGroup(); + + ConfigPusherConfigurationBuilder configPusherConfigurationBuilder = ConfigPusherConfigurationBuilder.aConfigPusherConfiguration(); + + if(maxWaitForCapabilitiesMillis.isPresent()) + configPusherConfigurationBuilder.withNetconfCapabilitiesWaitTimeoutMs(maxWaitForCapabilitiesMillis.get()); + + return configPusherConfigurationBuilder + .withEventLoopGroup(nettyThreadGroup) + .withNetconfAddress(address) + .build(); + } + @Override public void stop(BundleContext context) throws Exception { initializationThread.interrupt(); if (jmxNotificationHandler != null) { jmxNotificationHandler.close(); } - if (netconfClient != null) { - netconfClient = jmxNotificationHandler.getNetconfClient(); - try { - Util.closeClientAndDispatcher(netconfClient); - } catch (Exception e) { - logger.warn("Unable to close connection to netconf {}", netconfClient, e); - } - } - nettyThreadgroup.shutdownGracefully(); + if(nettyThreadGroup!=null) + nettyThreadGroup.shutdownGracefully(); persisterAggregator.close(); } }