From: Ed Warnicke Date: Mon, 11 Aug 2014 02:43:06 +0000 (-0500) Subject: Minor changes to config-persister to allow for karaf X-Git-Tag: release/helium~307^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3be4dd6d6bc05aab3e41fb8d94f7b4e00cb9c29e Minor changes to config-persister to allow for karaf These minor changes expose a ConfigPusher service for use by a config-persister-feature-adapter (to follow) that can push configs attached to features. Change-Id: I382d2ab578d19ea9c24afa521a9a5a047360fc50 Signed-off-by: Ed Warnicke --- diff --git a/opendaylight/config/config-persister-api/src/main/java/org/opendaylight/controller/config/persist/api/ConfigPusher.java b/opendaylight/config/config-persister-api/src/main/java/org/opendaylight/controller/config/persist/api/ConfigPusher.java new file mode 100644 index 0000000000..2dade8a82b --- /dev/null +++ b/opendaylight/config/config-persister-api/src/main/java/org/opendaylight/controller/config/persist/api/ConfigPusher.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.config.persist.api; + +import java.util.List; +/* + * The config pusher service pushes configs into the config subsystem + */ +public interface ConfigPusher { + + /* + * Pushes configs into the config subsystem + */ + + public void pushConfigs(List configs) throws InterruptedException; +} diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java similarity index 85% rename from opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java rename to opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java index fff8d611b7..5f311b5232 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java @@ -10,9 +10,6 @@ package org.opendaylight.controller.netconf.persist.impl; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.base.Function; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Collections2; import java.io.IOException; import java.io.InputStream; import java.util.Collection; @@ -23,10 +20,17 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; + import javax.annotation.concurrent.Immutable; +import javax.management.MBeanServerConnection; + import org.opendaylight.controller.config.api.ConflictingVersionException; +import org.opendaylight.controller.config.persist.api.ConfigPusher; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; +import org.opendaylight.controller.config.persist.api.Persister; import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants; @@ -45,22 +49,60 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; +import com.google.common.base.Function; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Collections2; + @Immutable -public class ConfigPusher { - private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class); +public class ConfigPusherImpl implements ConfigPusher { + private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class); private final long maxWaitForCapabilitiesMillis; private final long conflictingVersionTimeoutMillis; private final NetconfOperationServiceFactory configNetconfConnector; + private static final int QUEUE_SIZE = 100; + private BlockingQueue> queue = new LinkedBlockingQueue>(QUEUE_SIZE); - public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis, + public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) { this.configNetconfConnector = configNetconfConnector; this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis; } - public synchronized LinkedHashMap pushConfigs(List configs) throws NetconfDocumentedException { + public void process(List autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException { + List configs; + while(true) { + configs = queue.take(); + try { + internalPushConfigs(configs); + ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator); + synchronized (autoCloseables) { + autoCloseables.add(jmxNotificationHandler); + } + /* + * We have completed initial configuration. At this point + * it is good idea to perform garbage collection to prune + * any garbage we have accumulated during startup. + */ + logger.debug("Running post-initialization garbage collection..."); + System.gc(); + logger.debug("Post-initialization garbage collection completed."); + logger.debug("ConfigPusher has pushed configs {}, gc completed", configs); + } + catch (NetconfDocumentedException e) { + logger.error("Error pushing configs {}",configs); + throw new IllegalStateException(e); + } + } + } + + public void pushConfigs(List configs) throws InterruptedException { + logger.debug("Requested to push configs {}", configs); + this.queue.put(configs); + } + + private LinkedHashMap internalPushConfigs(List configs) throws NetconfDocumentedException { logger.debug("Last config snapshots to be pushed to netconf: {}", configs); LinkedHashMap result = new LinkedHashMap<>(); // start pushing snapshots: @@ -278,7 +320,7 @@ public class ConfigPusher { private static NetconfMessage getCommitMessage() { String resource = "/netconfOp/commit.xml"; - try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) { + try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) { checkNotNull(stream, "Unable to load resource " + resource); return new NetconfMessage(XmlUtil.readXmlToDocument(stream)); } catch (SAXException | IOException e) { diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java index 48ae0cb91a..0a48e6c67d 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java @@ -8,13 +8,18 @@ package org.opendaylight.controller.netconf.persist.impl.osgi; -import com.google.common.annotations.VisibleForTesting; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.management.MBeanServer; + +import org.opendaylight.controller.config.persist.api.ConfigPusher; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; -import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider; import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; -import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler; -import org.opendaylight.controller.netconf.persist.impl.ConfigPusher; +import org.opendaylight.controller.netconf.persist.impl.ConfigPusherImpl; import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator; import org.opendaylight.controller.netconf.util.CloseableUtil; import org.osgi.framework.BundleActivator; @@ -23,16 +28,13 @@ import org.osgi.framework.Constants; import org.osgi.framework.Filter; import org.osgi.framework.InvalidSyntaxException; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTrackerCustomizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.MBeanServer; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; public class ConfigPersisterActivator implements BundleActivator { @@ -49,11 +51,15 @@ public class ConfigPersisterActivator implements BundleActivator { public static final String STORAGE_ADAPTER_CLASS_PROP_SUFFIX = "storageAdapterClass"; private List autoCloseables; + private volatile BundleContext context; + ServiceRegistration registration; @Override public void start(final BundleContext context) throws Exception { logger.debug("ConfigPersister starting"); + this.context = context; + autoCloseables = new ArrayList<>(); PropertiesProviderBaseImpl propertiesProvider = new PropertiesProviderBaseImpl(context); @@ -81,8 +87,14 @@ public class ConfigPersisterActivator implements BundleActivator { } @Override - public synchronized void stop(BundleContext context) throws Exception { - CloseableUtil.closeAll(autoCloseables); + public void stop(BundleContext context) throws Exception { + synchronized(autoCloseables) { + CloseableUtil.closeAll(autoCloseables); + if (registration != null) { + registration.unregister(); + } + this.context = null; + } } @@ -147,35 +159,29 @@ public class ConfigPersisterActivator implements BundleActivator { logger.trace("Got InnerCustomizer.addingService {}", reference); NetconfOperationServiceFactory service = reference.getBundle().getBundleContext().getService(reference); - final ConfigPusher configPusher = new ConfigPusher(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis); + logger.debug("Creating new job queue"); + + final ConfigPusherImpl configPusher = new ConfigPusherImpl(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis); logger.debug("Configuration Persister got {}", service); + logger.debug("Context was {}", context); + logger.debug("Registration was {}", registration); + final Thread pushingThread = new Thread(new Runnable() { @Override public void run() { try { - configPusher.pushConfigs(configs); - } catch (NetconfDocumentedException e) { - logger.error("Error pushing configs {}",configs); - throw new IllegalStateException(e); + if(configs != null && !configs.isEmpty()) { + configPusher.pushConfigs(configs); + } + registration = context.registerService(ConfigPusher.class.getName(), configPusher, null); + configPusher.process(autoCloseables, platformMBeanServer, persisterAggregator); + } catch (InterruptedException e) { + logger.info("ConfigPusher thread stopped",e); } logger.info("Configuration Persister initialization completed."); - - /* - * We have completed initial configuration. At this point - * it is good idea to perform garbage collection to prune - * any garbage we have accumulated during startup. - */ - logger.debug("Running post-initialization garbage collection..."); - System.gc(); - logger.debug("Post-initialization garbage collection completed."); - - ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator); - synchronized (ConfigPersisterActivator.this) { - autoCloseables.add(jmxNotificationHandler); - } } }, "config-pusher"); - synchronized (ConfigPersisterActivator.this) { + synchronized (autoCloseables) { autoCloseables.add(new AutoCloseable() { @Override public void close() { diff --git a/opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java b/opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java index 95fd5f6549..3e5249468d 100644 --- a/opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java +++ b/opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java @@ -7,10 +7,23 @@ */ package org.opendaylight.controller.netconf.persist.impl.osgi; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.Dictionary; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.config.persist.api.ConfigPusher; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.config.persist.api.Persister; import org.opendaylight.controller.config.persist.api.PropertiesProvider; @@ -23,18 +36,10 @@ import org.osgi.framework.BundleContext; import org.osgi.framework.Filter; import org.osgi.framework.ServiceListener; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; final class MockedBundleContext { @Mock @@ -49,6 +54,8 @@ final class MockedBundleContext { NetconfOperationServiceFactory serviceFactory; @Mock private NetconfOperationService service; + @Mock + private ServiceRegistration registration; MockedBundleContext(long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) throws Exception { MockitoAnnotations.initMocks(this); @@ -77,6 +84,11 @@ final class MockedBundleContext { doReturn(Collections.emptySet()).when(service).getCapabilities(); doNothing().when(service).close(); doReturn("serviceFactoryMock").when(serviceFactory).toString(); + + doNothing().when(registration).unregister(); + doReturn(registration).when(context).registerService( + eq(ConfigPusher.class.getName()), any(Closeable.class), + any(Dictionary.class)); } public BundleContext getBundleContext() {