package org.opendaylight.controller.config.persist.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Collections2;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.management.MBeanServerConnection;
import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.facade.xml.ConfigExecution;
import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
-import org.opendaylight.controller.config.facade.xml.osgi.YangStoreService;
-import org.opendaylight.controller.config.facade.xml.util.Util;
import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.util.capability.Capability;
import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.config.util.xml.XmlUtil;
-import org.opendaylight.yangtools.yang.model.api.Module;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
private final long maxWaitForCapabilitiesMillis;
private final long conflictingVersionTimeoutMillis;
- private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private ConfigSubsystemFacadeFactory facade;
+ private final ConfigSubsystemFacadeFactory facade;
private ConfigPersisterNotificationHandler jmxNotificationHandler;
- public ConfigPusherImpl(ConfigSubsystemFacadeFactory facade, long maxWaitForCapabilitiesMillis,
- long conflictingVersionTimeoutMillis) {
+ public ConfigPusherImpl(final ConfigSubsystemFacadeFactory facade, final long maxWaitForCapabilitiesMillis,
+ final long conflictingVersionTimeoutMillis) {
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
this.facade = facade;
}
- public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
- while(true) {
- processSingle(autoCloseables, platformMBeanServer, persisterAggregator);
+ public void process(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
+ final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
+ while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
}
}
- void processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer, final Persister persisterAggregator) throws InterruptedException {
+ boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
+ final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
final List<? extends ConfigSnapshotHolder> configs = queue.take();
try {
internalPushConfigs(configs);
}
LOG.debug("ConfigPusher has pushed configs {}", configs);
- } catch (DocumentedException e) {
- LOG.error("Error pushing configs {}",configs);
- throw new IllegalStateException(e);
+ } catch (final Exception e) {
+ // Exceptions are logged to error downstream
+ LOG.debug("Failed to push some of configs: {}", configs, e);
+
+ if(propagateExceptions) {
+ if(e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ return false;
+ }
}
+
+ return true;
}
- public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
+ @Override
+ public void pushConfigs(final List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
LOG.debug("Requested to push configs {}", configs);
this.queue.put(configs);
}
- private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs)
+ private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(final List<? extends ConfigSnapshotHolder> configs)
throws DocumentedException {
LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
// start pushing snapshots
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
if (configSnapshotHolder != null) {
+ LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
boolean pushResult = false;
try {
pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
- } catch (ConfigSnapshotFailureException e) {
- LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
+ } catch (final ConfigSnapshotFailureException e) {
+ LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
"for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
+ } catch (final Exception e) {
+ String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
}
- LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+
+ LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
result.put(configSnapshotHolder, pushResult);
}
}
- LOG.debug("All configuration snapshots have been pushed successfully.");
+ LOG.info("All configuration snapshots have been pushed successfully.");
return result;
}
- private synchronized boolean pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
+ private synchronized boolean pushConfigWithConflictingVersionRetries(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
ConflictingVersionException lastException;
Stopwatch stopwatch = Stopwatch.createUnstarted();
do {
stopwatch.start();
}
return pushConfig(configSnapshotHolder);
- } catch (ConflictingVersionException e) {
+ } catch (final ConflictingVersionException e) {
lastException = e;
LOG.info("Conflicting version detected, will retry after timeout");
sleep();
lastException);
}
- private void waitForCapabilities(Set<String> expectedCapabilities, String idForReporting) {
+ private void waitForCapabilities(final Set<String> expectedCapabilities, final String idForReporting) {
Stopwatch stopwatch = Stopwatch.createStarted();
ConfigPusherException lastException;
do {
throw new NotEnoughCapabilitiesException(
"Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
}
- } catch (ConfigPusherException e) {
+ } catch (final ConfigPusherException e) {
LOG.debug("Not enough capabilities: {}", e.toString());
lastException = e;
sleep();
+ ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
}
- private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, Set<Capability> currentCapabilities) {
+ private static Set<String> computeNotFoundCapabilities(final Set<String> expectedCapabilities, final Set<Capability> currentCapabilities) {
Collection<String> actual = transformCapabilities(currentCapabilities);
Set<String> allNotFound = new HashSet<>(expectedCapabilities);
allNotFound.removeAll(actual);
}
static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
- return new HashSet<>(Collections2.transform(currentCapabilities, new Function<Capability, String>() {
- @Override
- public String apply(@Nonnull final Capability input) {
- return input.getCapabilityUri();
- }
- }));
+ return new HashSet<>(Collections2.transform(currentCapabilities, Capability::getCapabilityUri));
}
static class ConfigPusherException extends Exception {
static class NotEnoughCapabilitiesException extends ConfigPusherException {
private static final long serialVersionUID = 1L;
- private Set<String> missingCaps;
+ private final Set<String> missingCaps;
- NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
+ NotEnoughCapabilitiesException(final String message, final Set<String> missingCaps) {
super(message);
this.missingCaps = missingCaps;
}
}
}
- private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, YangStoreService yangStoreService) {
-
- Collection<String> actual = Collections2.transform(yangStoreService.getModules(), new Function<Module, String>() {
- @Nullable
- @Override
- public String apply(Module input) {
- final String withoutRevision = input.getNamespace().toString() + "?module=" + input.getName();
- return !input.getRevision().equals(NO_REVISION) ? withoutRevision + "&revision=" + Util.writeDate(input.getRevision()) : withoutRevision;
- }
- });
-
- Set<String> allNotFound = new HashSet<>(expectedCapabilities);
- allNotFound.removeAll(actual);
- return allNotFound;
- }
-
private void sleep() {
try {
Thread.sleep(100);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
- private synchronized boolean pushConfig(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
+ private synchronized boolean pushConfig(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
Element xmlToBePersisted;
try {
xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
try {
ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
- currentFacade.executeConfigExecution(configExecution);
- } catch (ValidationException | DocumentedException e) {
+ executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
+ } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
}
return true;
}
- private ConfigExecution createConfigExecution(Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
+ private void executeWithMissingModuleFactoryRetries(final ConfigSubsystemFacade facade, final ConfigExecution configExecution)
+ throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ ModuleFactoryNotFoundException lastException = null;
+ do {
+ try {
+ facade.executeConfigExecution(configExecution);
+ return;
+ } catch (final ModuleFactoryNotFoundException e) {
+ LOG.debug("{} - will retry after timeout", e.toString());
+ lastException = e;
+ sleep();
+ }
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+
+ throw lastException;
+ }
+
+ private ConfigExecution createConfigExecution(final Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
final Config configMapping = currentFacade.getConfigMapping();
return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);
}