this.facade = facade;
}
- public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
- while(true) {
- processSingle(autoCloseables, platformMBeanServer, persisterAggregator);
+ public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer,
+ Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
+ while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
+ ;
}
}
- void processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer, final Persister persisterAggregator) throws InterruptedException {
+ boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
+ final Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
final List<? extends ConfigSnapshotHolder> configs = queue.take();
try {
internalPushConfigs(configs);
}
LOG.debug("ConfigPusher has pushed configs {}", configs);
- } catch (DocumentedException e) {
- LOG.error("Error pushing configs {}",configs);
- throw new IllegalStateException(e);
+ } catch (Exception e) {
+ // Exceptions are logged to error downstream
+ LOG.debug("Failed to push some of configs: {}", configs, e);
+
+ if(propagateExceptions) {
+ if(e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ return false;
+ }
}
+
+ return true;
}
@Override
// start pushing snapshots
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
if (configSnapshotHolder != null) {
+ LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
boolean pushResult = false;
try {
pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
} catch (ConfigSnapshotFailureException e) {
- LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
+ LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
"for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
+ } catch (Exception e) {
+ String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
}
- LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+
+ LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
result.put(configSnapshotHolder, pushResult);
}
}