import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Collections2;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
import javax.annotation.concurrent.Immutable;
+import javax.management.MBeanServerConnection;
+
import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
+
@Immutable
-public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
+public class ConfigPusherImpl implements ConfigPusher {
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
private final long maxWaitForCapabilitiesMillis;
private final long conflictingVersionTimeoutMillis;
private final NetconfOperationServiceFactory configNetconfConnector;
+ private static final int QUEUE_SIZE = 100;
+ private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
- public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+ public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
long conflictingVersionTimeoutMillis) {
this.configNetconfConnector = configNetconfConnector;
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
}
- public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
+ public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
+ List<? extends ConfigSnapshotHolder> configs;
+ while(true) {
+ configs = queue.take();
+ try {
+ internalPushConfigs(configs);
+ ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
+ synchronized (autoCloseables) {
+ autoCloseables.add(jmxNotificationHandler);
+ }
+ /*
+ * We have completed initial configuration. At this point
+ * it is good idea to perform garbage collection to prune
+ * any garbage we have accumulated during startup.
+ */
+ logger.debug("Running post-initialization garbage collection...");
+ System.gc();
+ logger.debug("Post-initialization garbage collection completed.");
+ logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
+ }
+ catch (NetconfDocumentedException e) {
+ logger.error("Error pushing configs {}",configs);
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
+ logger.debug("Requested to push configs {}", configs);
+ this.queue.put(configs);
+ }
+
+ private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
private static NetconfMessage getCommitMessage() {
String resource = "/netconfOp/commit.xml";
- try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
+ try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
checkNotNull(stream, "Unable to load resource " + resource);
return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
} catch (SAXException | IOException e) {