package org.opendaylight.controller.config.persist.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Collections2;
import javax.annotation.concurrent.Immutable;
import javax.management.MBeanServerConnection;
import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
import org.opendaylight.controller.config.api.ValidationException;
import org.opendaylight.controller.config.facade.xml.ConfigExecution;
import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
private final long maxWaitForCapabilitiesMillis;
private final long conflictingVersionTimeoutMillis;
- private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private ConfigSubsystemFacadeFactory facade;
+ private final ConfigSubsystemFacadeFactory facade;
private ConfigPersisterNotificationHandler jmxNotificationHandler;
public ConfigPusherImpl(ConfigSubsystemFacadeFactory facade, long maxWaitForCapabilitiesMillis,
this.facade = facade;
}
- public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
- while(true) {
- processSingle(autoCloseables, platformMBeanServer, persisterAggregator);
+ public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer,
+ Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
+ while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
+ ;
}
}
- void processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer, final Persister persisterAggregator) throws InterruptedException {
+ boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
+ final Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
final List<? extends ConfigSnapshotHolder> configs = queue.take();
try {
internalPushConfigs(configs);
}
LOG.debug("ConfigPusher has pushed configs {}", configs);
- } catch (DocumentedException e) {
- LOG.error("Error pushing configs {}",configs);
- throw new IllegalStateException(e);
+ } catch (Exception e) {
+ // Exceptions are logged to error downstream
+ LOG.debug("Failed to push some of configs: {}", configs, e);
+
+ if(propagateExceptions) {
+ if(e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ return false;
+ }
}
+
+ return true;
}
+ @Override
public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
LOG.debug("Requested to push configs {}", configs);
this.queue.put(configs);
// start pushing snapshots
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
if (configSnapshotHolder != null) {
+ LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
boolean pushResult = false;
try {
pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
} catch (ConfigSnapshotFailureException e) {
- LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
+ LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
"for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
+ } catch (Exception e) {
+ String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
}
- LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+
+ LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
result.put(configSnapshotHolder, pushResult);
}
}
static class NotEnoughCapabilitiesException extends ConfigPusherException {
private static final long serialVersionUID = 1L;
- private Set<String> missingCaps;
+ private final Set<String> missingCaps;
NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
super(message);
final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
try {
ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
- currentFacade.executeConfigExecution(configExecution);
- } catch (ValidationException | DocumentedException e) {
+ executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
+ } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
}
return true;
}
+ private void executeWithMissingModuleFactoryRetries(ConfigSubsystemFacade facade, ConfigExecution configExecution)
+ throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ ModuleFactoryNotFoundException lastException = null;
+ do {
+ try {
+ facade.executeConfigExecution(configExecution);
+ return;
+ } catch (ModuleFactoryNotFoundException e) {
+ LOG.debug("{} - will retry after timeout", e.toString());
+ lastException = e;
+ sleep();
+ }
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+
+ throw lastException;
+ }
+
private ConfigExecution createConfigExecution(Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
final Config configMapping = currentFacade.getConfigMapping();
return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);