package org.opendaylight.controller.netconf.persist.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Collections2;
private final long conflictingVersionTimeoutMillis;
private final NetconfOperationServiceFactory configNetconfConnector;
private static final int QUEUE_SIZE = 100;
- private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
+ private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
long conflictingVersionTimeoutMillis) {
List<? extends ConfigSnapshotHolder> configs;
while(true) {
configs = queue.take();
+
try {
internalPushConfigs(configs);
ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
}
LOG.debug("ConfigPusher has pushed configs {}", configs);
- } catch (NetconfDocumentedException e) {
- LOG.error("Error pushing configs {}",configs);
- throw new IllegalStateException(e);
+ } catch (Exception e) {
+ LOG.debug("Failed to push some of configs: {}", configs, e);
+ break;
}
}
}
+ @Override
public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
LOG.debug("Requested to push configs {}", configs);
this.queue.put(configs);
// start pushing snapshots:
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
if(configSnapshotHolder != null) {
+ LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
EditAndCommitResponse editAndCommitResponseWithRetries = null;
try {
editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
} catch (ConfigSnapshotFailureException e) {
- LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
+ LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
"for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
- throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
+ onFailedConfigPush("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
+ } catch (Exception e) {
+ LOG.error("Failed to apply configuration snapshot: {}", configSnapshotHolder, e);
+ onFailedConfigPush("Failed to apply configuration snapshot " + configSnapshotHolder, e);
}
- LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+
+ LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
}
}
return result;
}
+ @VisibleForTesting
+ protected void onFailedConfigPush(String message, Exception cause) {
+ throw new IllegalStateException(message, cause);
+ }
+
/**
* First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
* expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
private static class NotEnoughCapabilitiesException extends ConfigPusherException {
private static final long serialVersionUID = 1L;
- private Set<String> missingCaps;
+ private final Set<String> missingCaps;
private NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
super(message);
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherImpl;
import org.opendaylight.controller.netconf.persist.impl.osgi.MockedBundleContext.DummyAdapterWithInitialSnapshot;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
private void setUpContext(String requiredCapability) throws Exception {
DummyAdapterWithInitialSnapshot.expectedCapability = requiredCapability;
ctx = new MockedBundleContext(1000, 1000);
- configPersisterActivator = new ConfigPersisterActivator();
+ configPersisterActivator = new ConfigPersisterActivator() {
+ @Override
+ protected ConfigPusherImpl newConfigPusher(NetconfOperationServiceFactory service,
+ long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) {
+ return new ConfigPusherImpl(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis) {
+ @Override
+ protected void onFailedConfigPush(String message, Exception cause) {
+ handler.uncaughtException(Thread.currentThread(), cause);
+ super.onFailedConfigPush(message, cause);
+ }
+
+ };
+ }
+ };
}
private void setUpContextAndStartPersister(String requiredCapability, final NetconfOperationService conflictingService) throws Exception {
@Before
public void setUp() {
handler = new TestingExceptionHandler();
- Thread.setDefaultUncaughtExceptionHandler(handler);
}
@After
public void tearDown() throws Exception {
- Thread.setDefaultUncaughtExceptionHandler(null);
configPersisterActivator.stop(ctx.getBundleContext());
}
public void testPersisterSuccessfulPush() throws Exception {
setUpContextAndStartPersister("cap1", getWorkingService(getOKDocument()));
Thread.sleep(2000);
- assertCannotRegisterAsJMXListener_pushWasSuccessful();
+ handler.assertNoException();
}
// this means pushing of config was successful
doReturn(workingService).when(ctx.serviceFactory).createService(anyString());
configPersisterActivator.start(ctx.getBundleContext());
-
- Thread.sleep(1000);
- assertCannotRegisterAsJMXListener_pushWasSuccessful();
+ Thread.sleep(2000);
+ handler.assertNoException();
}
}