X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fconfig%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fpersist%2Fimpl%2FConfigPusherImpl.java;h=90f71ca51aa1fc338fdae21aaacdff0a665196ca;hp=fbc86a43fdab99807aeb0688402a5193c5650881;hb=edc7fa3aa93bf109acb36c6f2c69a19cf8e38af2;hpb=23fe9ca678ada6263fec5dd996f4025e4a32fcf5 diff --git a/opendaylight/config/config-persister-impl/src/main/java/org/opendaylight/controller/config/persist/impl/ConfigPusherImpl.java b/opendaylight/config/config-persister-impl/src/main/java/org/opendaylight/controller/config/persist/impl/ConfigPusherImpl.java index fbc86a43fd..90f71ca51a 100644 --- a/opendaylight/config/config-persister-impl/src/main/java/org/opendaylight/controller/config/persist/impl/ConfigPusherImpl.java +++ b/opendaylight/config/config-persister-impl/src/main/java/org/opendaylight/controller/config/persist/impl/ConfigPusherImpl.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.config.persist.impl; import static com.google.common.base.Preconditions.checkNotNull; - import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.collect.Collections2; @@ -29,6 +28,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import javax.management.MBeanServerConnection; import org.opendaylight.controller.config.api.ConflictingVersionException; +import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException; import org.opendaylight.controller.config.api.ValidationException; import org.opendaylight.controller.config.facade.xml.ConfigExecution; import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade; @@ -57,9 +57,9 @@ public class ConfigPusherImpl implements ConfigPusher { private final long maxWaitForCapabilitiesMillis; private final long conflictingVersionTimeoutMillis; - private BlockingQueue> queue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue> queue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private ConfigSubsystemFacadeFactory facade; + private final ConfigSubsystemFacadeFactory facade; private ConfigPersisterNotificationHandler jmxNotificationHandler; public ConfigPusherImpl(ConfigSubsystemFacadeFactory facade, long maxWaitForCapabilitiesMillis, @@ -69,13 +69,15 @@ public class ConfigPusherImpl implements ConfigPusher { this.facade = facade; } - public void process(List autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException { - while(true) { - processSingle(autoCloseables, platformMBeanServer, persisterAggregator); + public void process(List autoCloseables, MBeanServerConnection platformMBeanServer, + Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException { + while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) { + ; } } - void processSingle(final List autoCloseables, final MBeanServerConnection platformMBeanServer, final Persister persisterAggregator) throws InterruptedException { + boolean processSingle(final List autoCloseables, final MBeanServerConnection platformMBeanServer, + final Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException { final List configs = queue.take(); try { internalPushConfigs(configs); @@ -90,12 +92,25 @@ public class ConfigPusherImpl implements ConfigPusher { } LOG.debug("ConfigPusher has pushed configs {}", configs); - } catch (DocumentedException e) { - LOG.error("Error pushing configs {}",configs); - throw new IllegalStateException(e); + } catch (Exception e) { + // Exceptions are logged to error downstream + LOG.debug("Failed to push some of configs: {}", configs, e); + + if(propagateExceptions) { + if(e instanceof RuntimeException) { + throw (RuntimeException)e; + } else { + throw new IllegalStateException(e); + } + } else { + return false; + } } + + return true; } + @Override public void pushConfigs(List configs) throws InterruptedException { LOG.debug("Requested to push configs {}", configs); this.queue.put(configs); @@ -108,15 +123,21 @@ public class ConfigPusherImpl implements ConfigPusher { // start pushing snapshots for (ConfigSnapshotHolder configSnapshotHolder : configs) { if (configSnapshotHolder != null) { + LOG.info("Pushing configuration snapshot {}", configSnapshotHolder); boolean pushResult = false; try { pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder); } catch (ConfigSnapshotFailureException e) { - LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " + + LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " + "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e); throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e); + } catch (Exception e) { + String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); } - LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result); + + LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder); result.put(configSnapshotHolder, pushResult); } } @@ -217,7 +238,7 @@ public class ConfigPusherImpl implements ConfigPusher { static class NotEnoughCapabilitiesException extends ConfigPusherException { private static final long serialVersionUID = 1L; - private Set missingCaps; + private final Set missingCaps; NotEnoughCapabilitiesException(String message, Set missingCaps) { super(message); @@ -281,8 +302,8 @@ public class ConfigPusherImpl implements ConfigPusher { final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push"); try { ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade); - currentFacade.executeConfigExecution(configExecution); - } catch (ValidationException | DocumentedException e) { + executeWithMissingModuleFactoryRetries(currentFacade, configExecution); + } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) { LOG.trace("Validation for config: {} failed", configSnapshotHolder, e); throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e); } @@ -299,6 +320,24 @@ public class ConfigPusherImpl implements ConfigPusher { return true; } + private void executeWithMissingModuleFactoryRetries(ConfigSubsystemFacade facade, ConfigExecution configExecution) + throws DocumentedException, ValidationException, ModuleFactoryNotFoundException { + Stopwatch stopwatch = Stopwatch.createStarted(); + ModuleFactoryNotFoundException lastException = null; + do { + try { + facade.executeConfigExecution(configExecution); + return; + } catch (ModuleFactoryNotFoundException e) { + LOG.debug("{} - will retry after timeout", e.toString()); + lastException = e; + sleep(); + } + } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis); + + throw lastException; + } + private ConfigExecution createConfigExecution(Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException { final Config configMapping = currentFacade.getConfigMapping(); return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);