--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.persist.api;
+
+import java.util.List;
+/*
+ * The config pusher service pushes configs into the config subsystem
+ */
+public interface ConfigPusher {
+
+ /*
+ * Pushes configs into the config subsystem
+ */
+
+ public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException;
+}
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Collections2;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
import javax.annotation.concurrent.Immutable;
+import javax.management.MBeanServerConnection;
+
import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
+
@Immutable
-public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
+public class ConfigPusherImpl implements ConfigPusher {
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
private final long maxWaitForCapabilitiesMillis;
private final long conflictingVersionTimeoutMillis;
private final NetconfOperationServiceFactory configNetconfConnector;
+ private static final int QUEUE_SIZE = 100;
+ private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
- public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+ public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
long conflictingVersionTimeoutMillis) {
this.configNetconfConnector = configNetconfConnector;
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
}
- public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
+ public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
+ List<? extends ConfigSnapshotHolder> configs;
+ while(true) {
+ configs = queue.take();
+ try {
+ internalPushConfigs(configs);
+ ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
+ synchronized (autoCloseables) {
+ autoCloseables.add(jmxNotificationHandler);
+ }
+ /*
+ * We have completed initial configuration. At this point
+ * it is good idea to perform garbage collection to prune
+ * any garbage we have accumulated during startup.
+ */
+ logger.debug("Running post-initialization garbage collection...");
+ System.gc();
+ logger.debug("Post-initialization garbage collection completed.");
+ logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
+ }
+ catch (NetconfDocumentedException e) {
+ logger.error("Error pushing configs {}",configs);
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
+ logger.debug("Requested to push configs {}", configs);
+ this.queue.put(configs);
+ }
+
+ private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
private static NetconfMessage getCommitMessage() {
String resource = "/netconfOp/commit.xml";
- try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
+ try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
checkNotNull(stream, "Unable to load resource " + resource);
return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
} catch (SAXException | IOException e) {
package org.opendaylight.controller.netconf.persist.impl.osgi;
-import com.google.common.annotations.VisibleForTesting;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanServer;
+
+import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
-import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPusher;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherImpl;
import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator;
import org.opendaylight.controller.netconf.util.CloseableUtil;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.MBeanServer;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
public class ConfigPersisterActivator implements BundleActivator {
public static final String STORAGE_ADAPTER_CLASS_PROP_SUFFIX = "storageAdapterClass";
private List<AutoCloseable> autoCloseables;
+ private volatile BundleContext context;
+ ServiceRegistration<?> registration;
@Override
public void start(final BundleContext context) throws Exception {
logger.debug("ConfigPersister starting");
+ this.context = context;
+
autoCloseables = new ArrayList<>();
PropertiesProviderBaseImpl propertiesProvider = new PropertiesProviderBaseImpl(context);
}
@Override
- public synchronized void stop(BundleContext context) throws Exception {
- CloseableUtil.closeAll(autoCloseables);
+ public void stop(BundleContext context) throws Exception {
+ synchronized(autoCloseables) {
+ CloseableUtil.closeAll(autoCloseables);
+ if (registration != null) {
+ registration.unregister();
+ }
+ this.context = null;
+ }
}
logger.trace("Got InnerCustomizer.addingService {}", reference);
NetconfOperationServiceFactory service = reference.getBundle().getBundleContext().getService(reference);
- final ConfigPusher configPusher = new ConfigPusher(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
+ logger.debug("Creating new job queue");
+
+ final ConfigPusherImpl configPusher = new ConfigPusherImpl(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
logger.debug("Configuration Persister got {}", service);
+ logger.debug("Context was {}", context);
+ logger.debug("Registration was {}", registration);
+
final Thread pushingThread = new Thread(new Runnable() {
@Override
public void run() {
try {
- configPusher.pushConfigs(configs);
- } catch (NetconfDocumentedException e) {
- logger.error("Error pushing configs {}",configs);
- throw new IllegalStateException(e);
+ if(configs != null && !configs.isEmpty()) {
+ configPusher.pushConfigs(configs);
+ }
+ registration = context.registerService(ConfigPusher.class.getName(), configPusher, null);
+ configPusher.process(autoCloseables, platformMBeanServer, persisterAggregator);
+ } catch (InterruptedException e) {
+ logger.info("ConfigPusher thread stopped",e);
}
logger.info("Configuration Persister initialization completed.");
-
- /*
- * We have completed initial configuration. At this point
- * it is good idea to perform garbage collection to prune
- * any garbage we have accumulated during startup.
- */
- logger.debug("Running post-initialization garbage collection...");
- System.gc();
- logger.debug("Post-initialization garbage collection completed.");
-
- ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
- synchronized (ConfigPersisterActivator.this) {
- autoCloseables.add(jmxNotificationHandler);
- }
}
}, "config-pusher");
- synchronized (ConfigPersisterActivator.this) {
+ synchronized (autoCloseables) {
autoCloseables.add(new AutoCloseable() {
@Override
public void close() {
*/
package org.opendaylight.controller.netconf.persist.impl.osgi;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.api.PropertiesProvider;
import org.osgi.framework.Filter;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
final class MockedBundleContext {
@Mock
NetconfOperationServiceFactory serviceFactory;
@Mock
private NetconfOperationService service;
+ @Mock
+ private ServiceRegistration<?> registration;
MockedBundleContext(long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) throws Exception {
MockitoAnnotations.initMocks(this);
doReturn(Collections.emptySet()).when(service).getCapabilities();
doNothing().when(service).close();
doReturn("serviceFactoryMock").when(serviceFactory).toString();
+
+ doNothing().when(registration).unregister();
+ doReturn(registration).when(context).registerService(
+ eq(ConfigPusher.class.getName()), any(Closeable.class),
+ any(Dictionary.class));
}
public BundleContext getBundleContext() {