Pulled Configuration class from ConfigPusher that contains all the delays, timeouts and attempts numbers.
Add tests for config perister.
Change-Id: I187e721e0120931d0cca519353db840905816a20
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
try {
status = this.transactionProvider.commitTransaction();
} catch (final IllegalStateException e) {
- // FIXME: when can IllegalStateException occur?
+ // TODO Illegal state thrown when no transaction yet for user
+ // Throw other exception type, or create transaction automatically
logger.warn("Commit failed: ", e);
final Map<String, String> errorInfo = new HashMap<>();
errorInfo.put(ErrorTag.operation_failed.name(),
</dependency>
<!-- test dependencies -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>netconf-util</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>mockito-configuration</artifactId>
package org.opendaylight.controller.netconf.persist.impl;
import com.google.common.base.Preconditions;
-import io.netty.channel.EventLoopGroup;
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import javax.annotation.concurrent.Immutable;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
- private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
- private static final int NETCONF_SEND_ATTEMPTS = 20;
-
- private final InetSocketAddress address;
- private final EventLoopGroup nettyThreadGroup;
-
- // Default timeout for netconf becoming stable
- public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2);
- public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
- private final int delayMillis = 5000;
- private final long maxWaitForCapabilitiesMillis;
- private final long connectionTimeoutMillis;
-
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) {
- this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
- }
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
+
+ private final ConfigPusherConfiguration configuration;
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
- long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
- this.address = address;
- this.nettyThreadGroup = nettyThreadGroup;
- this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
- this.connectionTimeoutMillis = connectionTimeoutMillis;
+ public ConfigPusher(ConfigPusherConfiguration configuration) {
+ this.configuration = configuration;
}
public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
throws InterruptedException {
ConflictingVersionException lastException = null;
- int maxAttempts = 30;
+ int maxAttempts = configuration.netconfPushConfigAttempts;
for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
} catch (ConflictingVersionException e) {
logger.debug("Conflicting version detected, will retry after timeout");
lastException = e;
- Thread.sleep(1000);
+ Thread.sleep(configuration.netconfPushConfigDelayMs);
} catch (RuntimeException e) {
throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
} finally {
// could be utilized by integration tests
final long pollingStartNanos = System.nanoTime();
- final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis);
+ final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
int attempt = 0;
- NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown", address.getAddress().getHostAddress(),
- Integer.toString(address.getPort()), "tcp", "persister");
+ NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
+ configuration.netconfAddress.getAddress().getHostAddress(),
+ Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
Set<String> latestCapabilities = null;
while (System.nanoTime() < deadlineNanos) {
attempt++;
- NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup,
- nettyThreadGroup, additionalHeader, connectionTimeoutMillis);
+ NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
+ configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
NetconfClient netconfClient;
try {
- netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher);
+ netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
} catch (IllegalStateException e) {
- logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
+ logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
netconfClientDispatcher.close();
- Thread.sleep(delayMillis);
+ Thread.sleep(configuration.connectionAttemptDelayMs);
continue;
}
latestCapabilities = netconfClient.getCapabilities();
"Expected but not found: {}, all expected {}, current {}",
attempt, allNotFound, expectedCaps, latestCapabilities);
Util.closeClientAndDispatcher(netconfClient);
- Thread.sleep(delayMillis);
+ Thread.sleep(configuration.connectionAttemptDelayMs);
}
if (latestCapabilities == null) {
- logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis);
+ logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
throw new RuntimeException("Could not connect to netconf server");
}
Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
}
- private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
+ private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
throws ConflictingVersionException, IOException {
try {
- NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
+ NetconfMessage netconfMessage = netconfClient.sendMessage(request,
+ configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
NetconfUtil.checkIsMessageOk(netconfMessage);
return netconfMessage;
- }catch(ConflictingVersionException e) {
+ } catch(ConflictingVersionException e) {
logger.trace("conflicting version detected: {}", e.toString());
throw e;
} catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
'}';
}
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl;
+
+import io.netty.channel.EventLoopGroup;
+
+import javax.annotation.concurrent.Immutable;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration properties for ConfigPusher. Contains delays and timeouts for netconf
+ * connection establishment, netconf capabilities stabilization and configuration push.
+ */
+@Immutable
+public final class ConfigPusherConfiguration {
+
+ public static final long DEFAULT_CONNECTION_ATTEMPT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
+ public static final int DEFAULT_CONNECTION_ATTEMPT_DELAY_MS = 5000;
+
+ public static final int DEFAULT_NETCONF_SEND_MESSAGE_MAX_ATTEMPTS = 20;
+ public static final int DEFAULT_NETCONF_SEND_MESSAGE_DELAY_MS = 1000;
+
+ public static final long DEFAULT_NETCONF_CAPABILITIES_WAIT_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(2);
+
+ public static final int DEFAULT_NETCONF_PUSH_CONFIG_ATTEMPTS = 30;
+ public static final long DEFAULT_NETCONF_PUSH_CONFIG_DELAY_MS = TimeUnit.MINUTES.toMillis(1);
+
+ final InetSocketAddress netconfAddress;
+ final EventLoopGroup eventLoopGroup;
+
+ /**
+ * Total time to wait for capability stabilization
+ */
+ final long netconfCapabilitiesWaitTimeoutMs;
+
+ /**
+ * Delay between message send attempts
+ */
+ final int netconfSendMessageDelayMs;
+ /**
+ * Total number attempts to send a message
+ */
+ final int netconfSendMessageMaxAttempts;
+
+ /**
+ * Delay between connection establishment attempts
+ */
+ final int connectionAttemptDelayMs;
+ /**
+ * Total number of attempts to perform connection establishment
+ */
+ final long connectionAttemptTimeoutMs;
+
+ /**
+ * Total number of attempts to push configuration to netconf
+ */
+ final int netconfPushConfigAttempts;
+ /**
+ * Delay between configuration push attempts
+ */
+ final long netconfPushConfigDelayMs;
+
+ ConfigPusherConfiguration(InetSocketAddress netconfAddress, long netconfCapabilitiesWaitTimeoutMs,
+ int netconfSendMessageDelayMs, int netconfSendMessageMaxAttempts, int connectionAttemptDelayMs,
+ long connectionAttemptTimeoutMs, EventLoopGroup eventLoopGroup, int netconfPushConfigAttempts,
+ long netconfPushConfigDelayMs) {
+ this.netconfAddress = netconfAddress;
+ this.netconfCapabilitiesWaitTimeoutMs = netconfCapabilitiesWaitTimeoutMs;
+ this.netconfSendMessageDelayMs = netconfSendMessageDelayMs;
+ this.netconfSendMessageMaxAttempts = netconfSendMessageMaxAttempts;
+ this.connectionAttemptDelayMs = connectionAttemptDelayMs;
+ this.connectionAttemptTimeoutMs = connectionAttemptTimeoutMs;
+ this.eventLoopGroup = eventLoopGroup;
+ this.netconfPushConfigAttempts = netconfPushConfigAttempts;
+ this.netconfPushConfigDelayMs = netconfPushConfigDelayMs;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl;
+
+import io.netty.channel.EventLoopGroup;
+
+import java.net.InetSocketAddress;
+
+public class ConfigPusherConfigurationBuilder {
+ InetSocketAddress netconfAddress;
+ EventLoopGroup eventLoopGroup;
+
+ long netconfCapabilitiesWaitTimeoutMs = ConfigPusherConfiguration.DEFAULT_NETCONF_CAPABILITIES_WAIT_TIMEOUT_MS;
+ int netconfSendMessageDelayMs = ConfigPusherConfiguration.DEFAULT_NETCONF_SEND_MESSAGE_DELAY_MS;
+ int netconfSendMessageMaxAttempts = ConfigPusherConfiguration.DEFAULT_NETCONF_SEND_MESSAGE_MAX_ATTEMPTS;
+ int connectionAttemptDelayMs = ConfigPusherConfiguration.DEFAULT_CONNECTION_ATTEMPT_DELAY_MS;
+ long connectionAttemptTimeoutMs = ConfigPusherConfiguration.DEFAULT_CONNECTION_ATTEMPT_TIMEOUT_MS;
+ int netconfPushConfigAttempts = ConfigPusherConfiguration.DEFAULT_NETCONF_PUSH_CONFIG_ATTEMPTS;
+ long netconfPushConfigDelayMs = ConfigPusherConfiguration.DEFAULT_NETCONF_PUSH_CONFIG_DELAY_MS;
+
+ private ConfigPusherConfigurationBuilder() {
+ }
+
+ public static ConfigPusherConfigurationBuilder aConfigPusherConfiguration() {
+ return new ConfigPusherConfigurationBuilder();
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfAddress(InetSocketAddress netconfAddress) {
+ this.netconfAddress = netconfAddress;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfCapabilitiesWaitTimeoutMs(long netconfCapabilitiesWaitTimeoutMs) {
+ this.netconfCapabilitiesWaitTimeoutMs = netconfCapabilitiesWaitTimeoutMs;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfSendMessageDelayMs(int netconfSendMessageDelayMs) {
+ this.netconfSendMessageDelayMs = netconfSendMessageDelayMs;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfSendMessageMaxAttempts(int netconfSendMessageMaxAttempts) {
+ this.netconfSendMessageMaxAttempts = netconfSendMessageMaxAttempts;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withConnectionAttemptDelayMs(int connectionAttemptDelayMs) {
+ this.connectionAttemptDelayMs = connectionAttemptDelayMs;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withConnectionAttemptTimeoutMs(long connectionAttemptTimeoutMs) {
+ this.connectionAttemptTimeoutMs = connectionAttemptTimeoutMs;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withEventLoopGroup(EventLoopGroup eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfPushConfigAttempts(int netconfPushConfigAttempts) {
+ this.netconfPushConfigAttempts = netconfPushConfigAttempts;
+ return this;
+ }
+
+ public ConfigPusherConfigurationBuilder withNetconfPushConfigDelayMs(long netconfPushConfigDelayMs) {
+ this.netconfPushConfigDelayMs = netconfPushConfigDelayMs;
+ return this;
+ }
+
+ public ConfigPusherConfiguration build() {
+ ConfigPusherConfiguration configPusherConfiguration = new ConfigPusherConfiguration(netconfAddress,
+ netconfCapabilitiesWaitTimeoutMs, netconfSendMessageDelayMs, netconfSendMessageMaxAttempts,
+ connectionAttemptDelayMs, connectionAttemptTimeoutMs, eventLoopGroup, netconfPushConfigAttempts,
+ netconfPushConfigDelayMs);
+ return configPusherConfiguration;
+ }
+}
public static PersisterAggregator createFromProperties(PropertiesProviderBaseImpl propertiesProvider) {
List<PersisterWithConfiguration> persisterWithConfigurations = new ArrayList<>();
String prefixes = propertiesProvider.getProperty("active");
- if (prefixes.isEmpty() == false) {
+ if (prefixes!=null && prefixes.isEmpty() == false) {
String [] keys = prefixes.split(",");
for (String index: keys) {
persisterWithConfigurations.add(PersisterAggregator.loadConfiguration(index, propertiesProvider));
package org.opendaylight.controller.netconf.persist.impl.osgi;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler;
import org.opendaylight.controller.netconf.persist.impl.ConfigPusher;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfiguration;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfigurationBuilder;
import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator;
import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.osgi.framework.BundleActivator;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
+import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
public class ConfigPersisterActivator implements BundleActivator {
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterActivator.class);
- private final static MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
- private static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex";
+ public static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex";
- private static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS = "maxWaitForCapabilitiesMillis";
+ public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS = "maxWaitForCapabilitiesMillis";
public static final String NETCONF_CONFIG_PERSISTER = "netconf.config.persister";
public static final String DEFAULT_IGNORED_REGEX = "^urn:ietf:params:xml:ns:netconf:base:1.0";
+ private final MBeanServer platformMBeanServer;
+ private final Optional<ConfigPusherConfiguration> initialConfigForPusher;
private volatile ConfigPersisterNotificationHandler jmxNotificationHandler;
private Thread initializationThread;
+ private ThreadFactory initializationThreadFactory;
private EventLoopGroup nettyThreadGroup;
private PersisterAggregator persisterAggregator;
+ public ConfigPersisterActivator() {
+ this(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable initializationRunnable) {
+ return new Thread(initializationRunnable, "ConfigPersister-registrator");
+ }
+ }, ManagementFactory.getPlatformMBeanServer(), null);
+ }
+
+ @VisibleForTesting
+ protected ConfigPersisterActivator(ThreadFactory threadFactory, MBeanServer mBeanServer,
+ ConfigPusherConfiguration initialConfigForPusher) {
+ this.initializationThreadFactory = threadFactory;
+ this.platformMBeanServer = mBeanServer;
+ this.initialConfigForPusher = Optional.fromNullable(initialConfigForPusher);
+ }
+
@Override
public void start(final BundleContext context) throws Exception {
logger.debug("ConfigPersister starting");
PropertiesProviderBaseImpl propertiesProvider = new PropertiesProviderBaseImpl(context);
- String regexProperty = propertiesProvider.getProperty(IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX);
- String regex;
- if (regexProperty != null) {
- regex = regexProperty;
- } else {
- regex = DEFAULT_IGNORED_REGEX;
- }
-
- String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS);
- long maxWaitForCapabilitiesMillis;
- if (timeoutProperty == null) {
- maxWaitForCapabilitiesMillis = ConfigPusher.DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS;
- } else {
- maxWaitForCapabilitiesMillis = Integer.valueOf(timeoutProperty);
- }
-
- final Pattern ignoredMissingCapabilityRegex = Pattern.compile(regex);
- nettyThreadGroup = new NioEventLoopGroup();
+ final Pattern ignoredMissingCapabilityRegex = getIgnoredCapabilitiesProperty(propertiesProvider);
persisterAggregator = PersisterAggregator.createFromProperties(propertiesProvider);
- final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context,
- "Netconf is not configured, persister is not operational", true);
- final ConfigPusher configPusher = new ConfigPusher(address, nettyThreadGroup, maxWaitForCapabilitiesMillis,
- ConfigPusher.DEFAULT_CONNECTION_TIMEOUT_MILLIS);
+ final ConfigPusher configPusher = new ConfigPusher(getConfigurationForPusher(context, propertiesProvider));
// offload initialization to another thread in order to stop blocking activator
Runnable initializationRunnable = new Runnable() {
logger.info("Configuration Persister initialization completed.");
}
};
- initializationThread = new Thread(initializationRunnable, "ConfigPersister-registrator");
+
+ initializationThread = initializationThreadFactory.newThread(initializationRunnable);
initializationThread.start();
}
+ private Pattern getIgnoredCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) {
+ String regexProperty = propertiesProvider.getProperty(IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX);
+ String regex;
+ if (regexProperty != null) {
+ regex = regexProperty;
+ } else {
+ regex = DEFAULT_IGNORED_REGEX;
+ }
+ return Pattern.compile(regex);
+ }
+
+ private Optional<Long> getMaxWaitForCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) {
+ String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS);
+ return Optional.fromNullable(timeoutProperty == null ? null : Long.valueOf(timeoutProperty));
+ }
+
+ private ConfigPusherConfiguration getConfigurationForPusher(BundleContext context,
+ PropertiesProviderBaseImpl propertiesProvider) {
+
+ // If configuration was injected via constructor, use it
+ if(initialConfigForPusher.isPresent())
+ return initialConfigForPusher.get();
+
+ Optional<Long> maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesProperty(propertiesProvider);
+ final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context,
+ "Netconf is not configured, persister is not operational", true);
+
+ nettyThreadGroup = new NioEventLoopGroup();
+
+ ConfigPusherConfigurationBuilder configPusherConfigurationBuilder = ConfigPusherConfigurationBuilder.aConfigPusherConfiguration();
+
+ if(maxWaitForCapabilitiesMillis.isPresent())
+ configPusherConfigurationBuilder.withNetconfCapabilitiesWaitTimeoutMs(maxWaitForCapabilitiesMillis.get());
+
+ return configPusherConfigurationBuilder
+ .withEventLoopGroup(nettyThreadGroup)
+ .withNetconfAddress(address)
+ .build();
+ }
+
@Override
public void stop(BundleContext context) throws Exception {
initializationThread.interrupt();
if (jmxNotificationHandler != null) {
jmxNotificationHandler.close();
}
- nettyThreadGroup.shutdownGracefully();
+ if(nettyThreadGroup!=null)
+ nettyThreadGroup.shutdownGracefully();
persisterAggregator.close();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl.osgi;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeoutException;
+
+import javax.management.MBeanServer;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfiguration;
+import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfigurationBuilder;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+public class ConfigPersisterTest {
+
+ private MockedBundleContext ctx;
+ private ConfigPersisterActivator configPersisterActivator;
+ private static final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ private static final String NETCONF_ADDRESS = "localhost";
+ private static final String NETCONF_PORT = "18383";
+ private static NioEventLoopGroup eventLoopGroup;
+
+ private void setUpContextAndStartPersister(Thread.UncaughtExceptionHandler exHandler, String requiredCapability, ConfigPusherConfiguration configuration)
+ throws Exception {
+ MockedBundleContext.DummyAdapterWithInitialSnapshot.expectedCapability = requiredCapability;
+ ctx = new MockedBundleContext(NETCONF_ADDRESS, NETCONF_PORT);
+ configPersisterActivator = new ConfigPersisterActivator(getThreadFactory(exHandler), mBeanServer,
+ configuration);
+ configPersisterActivator.start(ctx.getBundleContext());
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ eventLoopGroup = new NioEventLoopGroup();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ configPersisterActivator.stop(ctx.getBundleContext());
+ }
+
+ @AfterClass
+ public static void closeNettyGroup() throws Exception {
+ eventLoopGroup.shutdownGracefully();
+ }
+
+ @Test
+ public void testPersisterNetconfNotStarting() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+
+ setUpContextAndStartPersister(handler, "cap2", getConfiguration(100, 100).build());
+
+ waitTestToFinish(2000);
+
+ handler.assertException("connect to netconf endpoint", RuntimeException.class,
+ "Could not connect to netconf server");
+ }
+
+ @Test
+ public void testPersisterNotAllCapabilitiesProvided() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfiguration(500, 1000)
+ .withNetconfCapabilitiesWaitTimeoutMs(1000).build();
+
+ setUpContextAndStartPersister(handler, "required-cap", cfg);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1")) {
+
+ waitTestToFinish(2500);
+
+ handler.assertException("retrieve required capabilities from netconf endpoint", RuntimeException.class,
+ "Expected but not found:[required-cap]");
+ }
+ }
+
+ @Test
+ public void testPersisterNoResponseFromNetconfAfterEdit() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfigurationWithOnePushAttempt();
+
+ setUpContextAndStartPersister(handler, "cap1", cfg);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1")) {
+
+ waitTestToFinish(3000);
+
+ handler.assertException("receive response from netconf endpoint", IllegalStateException.class,
+ "Unable to load", TimeoutException.class,
+ null, 3);
+
+ assertEquals(1 + 2, endpoint.getReceivedMessages().size());
+ assertHelloMessage(endpoint.getReceivedMessages().get(1));
+ assertEditMessage(endpoint.getReceivedMessages().get(2));
+ }
+ }
+
+ private ConfigPusherConfiguration getConfigurationWithOnePushAttempt() {
+ return getConfiguration(500, 1000)
+ .withNetconfCapabilitiesWaitTimeoutMs(1000)
+ .withNetconfPushConfigAttempts(1)
+ .withNetconfPushConfigDelayMs(100)
+ .withNetconfSendMessageMaxAttempts(3)
+ .withNetconfSendMessageDelayMs(500).build();
+ }
+
+ @Test
+ public void testPersisterSuccessfulPush() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfigurationForSuccess();
+
+ setUpContextAndStartPersister(handler, "cap1", cfg);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
+ MockNetconfEndpoint.okMessage)) {
+
+ waitTestToFinish(4000);
+
+ handler.assertException("register as JMX listener", RuntimeException.class,
+ "Cannot register as JMX listener to netconf");
+
+ assertEquals(1 + 3, endpoint.getReceivedMessages().size());
+ assertCommitMessage(endpoint.getReceivedMessages().get(3));
+ }
+ }
+
+ private ConfigPusherConfiguration getConfigurationForSuccess() {
+ return getConfiguration(500, 1000)
+ .withNetconfCapabilitiesWaitTimeoutMs(1000)
+ .withNetconfPushConfigAttempts(3)
+ .withNetconfPushConfigDelayMs(100)
+ .withNetconfSendMessageMaxAttempts(3)
+ .withNetconfSendMessageDelayMs(500).build();
+ }
+
+ @Test
+ public void testPersisterConflictingVersionException() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfigurationWithOnePushAttempt();
+
+ setUpContextAndStartPersister(handler, "cap1", cfg);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
+ MockNetconfEndpoint.conflictingVersionErrorMessage); DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier();) {
+
+ Thread.sleep(4000);
+
+ handler.assertException("register as JMX listener", IllegalStateException.class,
+ "Maximum attempt count has been reached for pushing", ConflictingVersionException.class, "Optimistic lock failed", 1);
+
+ assertEquals(1 + 3, endpoint.getReceivedMessages().size());
+ assertCommitMessage(endpoint.getReceivedMessages().get(3));
+ }
+ }
+
+ @Test
+ public void testPersisterConflictingVersionExceptionThenSuccess() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfigurationForSuccess();
+
+ setUpContextAndStartPersister(handler, "cap1", cfg);
+
+ MockNetconfEndpoint.MessageSequence conflictingMessageSequence = new MockNetconfEndpoint.MessageSequence(
+ MockNetconfEndpoint.okMessage, MockNetconfEndpoint.conflictingVersionErrorMessage);
+ MockNetconfEndpoint.MessageSequence okMessageSequence = new MockNetconfEndpoint.MessageSequence(
+ MockNetconfEndpoint.okMessage, MockNetconfEndpoint.okMessage);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1",
+ Lists.newArrayList(conflictingMessageSequence, okMessageSequence));
+ DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier()) {
+
+ Thread.sleep(4000);
+
+ handler.assertNoException();
+
+ assertEquals(1 + 3/*Hello + Edit + Commit*/ + 3/*Hello + Edit + Commit*/, endpoint.getReceivedMessages().size());
+ assertCommitMessage(endpoint.getReceivedMessages().get(6));
+ }
+ }
+
+ @Test
+ public void testPersisterSuccessfulPushAndSuccessfulJMXRegistration() throws Exception {
+ final TestingExceptionHandler handler = new TestingExceptionHandler();
+ ConfigPusherConfiguration cfg = getConfigurationForSuccess();
+
+ setUpContextAndStartPersister(handler, "cap1", cfg);
+
+ try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
+ MockNetconfEndpoint.okMessage); DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier()) {
+
+ Thread.sleep(2000);
+
+ handler.assertNoException();
+
+ assertEquals(1 + 3, endpoint.getReceivedMessages().size());
+ }
+ }
+
+ private ConfigPusherConfigurationBuilder getConfiguration(int connectionAttemptDelayMs, int connectionAttemptTimeoutMs) {
+ return ConfigPusherConfigurationBuilder.aConfigPusherConfiguration()
+ .withEventLoopGroup(eventLoopGroup)
+ .withConnectionAttemptDelayMs(connectionAttemptDelayMs)
+ .withConnectionAttemptTimeoutMs(connectionAttemptTimeoutMs)
+ .withNetconfCapabilitiesWaitTimeoutMs(44)
+ .withNetconfAddress(new InetSocketAddress(NETCONF_ADDRESS, Integer.valueOf(NETCONF_PORT)));
+ }
+
+ private void waitTestToFinish(int i) throws InterruptedException {
+ Thread.sleep(i);
+ }
+
+
+ private DefaultCommitNotificationProducer startJMXCommitNotifier() {
+ return new DefaultCommitNotificationProducer(mBeanServer);
+ }
+
+ private void assertEditMessage(String netconfMessage) {
+ assertThat(netconfMessage,
+ JUnitMatchers.containsString(MockedBundleContext.DummyAdapterWithInitialSnapshot.CONFIG_SNAPSHOT));
+ }
+
+ private void assertCommitMessage(String netconfMessage) {
+ assertThat(netconfMessage, JUnitMatchers.containsString("<commit"));
+ }
+
+ private void assertHelloMessage(String netconfMessage) {
+ assertThat(netconfMessage,
+ JUnitMatchers.containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">"));
+ assertThat(netconfMessage, JUnitMatchers.containsString("<capability>"));
+ }
+
+ private MockNetconfEndpoint startMockNetconfEndpoint(String capability, List<MockNetconfEndpoint.MessageSequence> messageSequences) {
+ // Add first empty sequence for testing connection created by config persister at startup
+ messageSequences.add(0, new MockNetconfEndpoint.MessageSequence(Collections.<String>emptyList()));
+ return new MockNetconfEndpoint(capability, NETCONF_PORT, messageSequences);
+ }
+
+ private MockNetconfEndpoint startMockNetconfEndpoint(String capability, String... messages) {
+ return startMockNetconfEndpoint(capability, Lists.newArrayList(new MockNetconfEndpoint.MessageSequence(messages)));
+ }
+
+ public ThreadFactory getThreadFactory(final Thread.UncaughtExceptionHandler exHandler) {
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, "config-persister-testing-activator");
+ thread.setUncaughtExceptionHandler(exHandler);
+ return thread;
+ }
+ };
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl.osgi;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+import com.google.common.collect.Lists;
+
+class MockNetconfEndpoint implements AutoCloseable {
+
+ public static final int READ_SOCKET_TIMEOUT = 3000;
+
+ public static final String MSG_SEPARATOR = "]]>]]>\n";
+
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private List<String> receivedMessages = Lists.newCopyOnWriteArrayList();
+ private Thread innerThread;
+
+ MockNetconfEndpoint(String capability, String netconfPort, List<MessageSequence> messageSequence) {
+ helloMessage = helloMessage.replace("capability_place_holder", capability);
+ start(netconfPort, messageSequence);
+ }
+
+ private String helloMessage = "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<capabilities>\n" +
+ "<capability>capability_place_holder</capability>\n" +
+ "</capabilities>\n" +
+ "<session-id>1</session-id>\n" +
+ "</hello>\n" +
+ MSG_SEPARATOR;
+
+ public static String conflictingVersionErrorMessage;
+ static {
+ try {
+ conflictingVersionErrorMessage = XmlUtil.toString(XmlFileLoader
+ .xmlFileToDocument("netconfMessages/conflictingversion/conflictingVersionResponse.xml")) + MSG_SEPARATOR;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String okMessage = "<rpc-reply message-id=\"1\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<ok/>\n" +
+ "</rpc-reply>" +
+ MSG_SEPARATOR ;
+
+ private void start(final String port, final List<MessageSequence> messagesToSend) {
+ innerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int clientCounter = 0;
+
+ while (stopped.get() == false) {
+ try (ServerSocket s = new ServerSocket(Integer.valueOf(port))) {
+ s.setSoTimeout(READ_SOCKET_TIMEOUT);
+
+ Socket clientSocket = s.accept();
+ clientCounter++;
+ clientSocket.setSoTimeout(READ_SOCKET_TIMEOUT);
+
+ PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
+ BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+
+ // Negotiate
+ sendMessage(out, helloMessage);
+ receiveMessage(in);
+
+ // Accept next message (edit-config)
+ receiveMessage(in);
+
+ for (String message : getMessageSequenceForClient(messagesToSend, clientCounter)) {
+ sendMessage(out, message);
+ receiveMessage(in);
+ }
+ } catch (SocketTimeoutException e) {
+ // No more activity on netconf endpoint, close
+ return;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Iterable<? extends String> getMessageSequenceForClient(List<MessageSequence> messagesToSend,
+ int clientCounter) {
+ if (messagesToSend.size() <= clientCounter) {
+ return messagesToSend.get(messagesToSend.size() - 1).getMessages();
+ } else {
+ return messagesToSend.get(clientCounter - 1).getMessages();
+ }
+ }
+
+ private void receiveMessage(BufferedReader in) throws Exception {
+ String message = readMessage(in);
+ if(message == null || message.equals(""))
+ return;
+ receivedMessages.add(message);
+ }
+
+ private String readMessage(BufferedReader in) throws IOException {
+ int c;
+ StringBuilder b = new StringBuilder();
+
+ while((c = in.read()) != -1) {
+ b.append((char)c);
+ if(b.toString().endsWith("]]>]]>"))
+ break;
+ }
+
+ return b.toString();
+ }
+
+ private void sendMessage(PrintWriter out, String message) throws InterruptedException {
+ out.print(message);
+ out.flush();
+ }
+
+ });
+ innerThread.setName("Mocked-netconf-endpoint-inner-thread");
+ innerThread.start();
+ }
+
+ public List<String> getReceivedMessages() {
+ return receivedMessages;
+ }
+
+ public void close() throws IOException, InterruptedException {
+ stopped.set(true);
+ innerThread.join();
+ }
+
+ static class MessageSequence {
+ private List<String> messages;
+
+ MessageSequence(List<String> messages) {
+ this.messages = messages;
+ }
+
+ MessageSequence(String... messages) {
+ this(Lists.newArrayList(messages));
+ }
+
+ public Collection<String> getMessages() {
+ return messages;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl.osgi;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.config.persist.api.PropertiesProvider;
+import org.opendaylight.controller.netconf.persist.impl.DummyAdapter;
+import org.osgi.framework.BundleContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.mockito.Mockito.doReturn;
+
+final class MockedBundleContext {
+
+ @Mock
+ private BundleContext context;
+
+ MockedBundleContext(String netconfAddress, String netconfPort) {
+ MockitoAnnotations.initMocks(this);
+ initContext(netconfAddress, netconfPort);
+ }
+
+ public BundleContext getBundleContext() {
+ return context;
+ }
+
+ private void initContext(String netconfAddress, String netconfPort) {
+ initProp(context, ConfigPersisterActivator.IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX, null);
+
+ initPropNoPrefix(context, "netconf.tcp.client.address", netconfAddress);
+ initPropNoPrefix(context, "netconf.tcp.client.port", netconfPort);
+
+ initProp(context, "active", "1");
+ initProp(context, "1." + ConfigPersisterActivator.STORAGE_ADAPTER_CLASS_PROP_SUFFIX, DummyAdapterWithInitialSnapshot.class.getName());
+ initProp(context, "1." + "readonly", "false");
+ initProp(context, "1." + ".properties.fileStorage", "target/configuration-persister-test/initial/");
+
+ }
+
+ private void initProp(BundleContext context, String key, String value) {
+ initPropNoPrefix(context, ConfigPersisterActivator.NETCONF_CONFIG_PERSISTER + "." + key, value);
+ }
+
+ private void initPropNoPrefix(BundleContext context, String key, String value) {
+ doReturn(value).when(context).getProperty(key);
+ }
+
+ public static class DummyAdapterWithInitialSnapshot extends DummyAdapter {
+
+ public static final String CONFIG_SNAPSHOT = "config-snapshot";
+ public static String expectedCapability = "cap2";
+
+ @Override
+ public List<ConfigSnapshotHolder> loadLastConfigs() throws IOException {
+ return Lists.newArrayList(getConfigSnapshopt());
+ }
+
+ @Override
+ public Persister instantiate(PropertiesProvider propertiesProvider) {
+ return this;
+ }
+
+ public ConfigSnapshotHolder getConfigSnapshopt() {
+ return new ConfigSnapshotHolder() {
+ @Override
+ public String getConfigSnapshot() {
+ return "<data><" + CONFIG_SNAPSHOT + "/></data>";
+ }
+
+ @Override
+ public SortedSet<String> getCapabilities() {
+ TreeSet<String> strings = Sets.newTreeSet();
+ strings.add(expectedCapability);
+ return strings;
+ }
+
+ @Override
+ public String toString() {
+ return getConfigSnapshot();
+ }
+ };
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.persist.impl.osgi;
+
+import org.junit.matchers.JUnitMatchers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+final class TestingExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ private Throwable t;
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ this.t = e;
+ }
+
+ public void assertException(String failMessageSuffix, Class<? extends Exception> exType, String exMessageToContain) {
+ if(t == null) {
+ fail("Should fail to " + failMessageSuffix);
+ }
+ else {
+ assertException(t, exType, exMessageToContain);
+ }
+ }
+
+ public void assertNoException() {
+ assertNull("No exception expected but was " + t, t);
+ }
+
+ private void assertException(Throwable t, Class<? extends Exception> exType, String exMessageToContain) {
+ assertEquals("Expected exception of type " + exType + " but was " + t, exType, t.getClass());
+ if(exMessageToContain!=null) {
+ assertThat(t.getMessage(), JUnitMatchers.containsString(exMessageToContain));
+ }
+ }
+
+ public void assertException(String failMessageSuffix, Class<? extends Exception> exType,
+ String exMessageToContain, Class<? extends Exception> nestedExType, String nestedExMessageToContain,
+ int nestedExDepth) {
+ assertException(failMessageSuffix, exType, exMessageToContain);
+ assertNotNull("Expected nested exception in " + t, t.getCause());
+ assertException(getNestedException(t, nestedExDepth), nestedExType, nestedExMessageToContain);
+ }
+
+ private Throwable getNestedException(Throwable t, int nestedExDepth) {
+
+ int depth = 0;
+ while(t.getCause() != null) {
+ t = t.getCause();
+ depth++;
+ if(nestedExDepth == depth)
+ return t;
+ }
+ throw new IllegalArgumentException("Unable to get nested exception from " + t + " from depth " + nestedExDepth);
+ }
+}
public static Document xmlFileToDocument(final String fileName) throws IOException, SAXException,
ParserConfigurationException {
try (InputStream resourceAsStream = XmlFileLoader.class.getClassLoader().getResourceAsStream(fileName)) {
- Preconditions.checkNotNull(resourceAsStream);
+ Preconditions.checkNotNull(resourceAsStream, fileName);
final Document doc = XmlUtil.readXmlToDocument(resourceAsStream);
return doc;
}