import org.opendaylight.controller.config.manager.impl.osgi.mapping.RefreshingSCPModuleInfoRegistry;
import org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil;
import org.opendaylight.controller.config.spi.ModuleFactory;
-import org.opendaylight.yangtools.concepts.ObjectRegistration;
import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
-import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import static org.opendaylight.controller.config.manager.impl.util.OsgiRegistrationUtil.registerService;
// track bundles containing factories
BlankTransactionServiceTracker blankTransactionServiceTracker = new BlankTransactionServiceTracker(
configRegistry);
- ModuleFactoryBundleTracker moduleFactoryBundleTracker = new ModuleFactoryBundleTracker(
+ ModuleFactoryBundleTracker primaryModuleFactoryBundleTracker = new ModuleFactoryBundleTracker(
blankTransactionServiceTracker);
// start extensible tracker
- ExtensibleBundleTracker<Collection<ObjectRegistration<YangModuleInfo>>> bundleTracker = new ExtensibleBundleTracker<>(context, moduleInfoBundleTracker, moduleFactoryBundleTracker);
+ ExtensibleBundleTracker<?> bundleTracker = new ExtensibleBundleTracker<>(context,
+ primaryModuleFactoryBundleTracker, moduleInfoBundleTracker);
bundleTracker.open();
// register config registry to OSGi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
import static com.google.common.base.Preconditions.checkState;
public class Activator implements BundleActivator {
public void run() {
NetconfOperationServiceFactoryImpl factory = new NetconfOperationServiceFactoryImpl(yangStoreService);
logger.debug("Registering into OSGi");
- osgiRegistration = context.registerService(NetconfOperationServiceFactory.class, factory, null);
+ Dictionary<String, String> properties = new Hashtable<>();
+ properties.put("name", "config-netconf-connector");
+ osgiRegistration = context.registerService(NetconfOperationServiceFactory.class, factory, properties);
}
}
}
}
@Override
- public NetconfOperationServiceImpl createService(long netconfSessionId, String netconfSessionIdForReporting) {
+ public NetconfOperationServiceImpl createService(String netconfSessionIdForReporting) {
try {
return new NetconfOperationServiceImpl(yangStoreService, jmxClient, netconfSessionIdForReporting);
} catch (YangStoreException e) {
import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreSnapshot;
import org.opendaylight.controller.netconf.confignetconfconnector.transactions.TransactionProvider;
import org.opendaylight.controller.netconf.impl.mapping.operations.DefaultCloseSession;
-import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationRouterImpl;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceSnapshot;
import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
- final Document response = op.handle(request, NetconfOperationRouterImpl.EXECUTION_TERMINATION_POINT);
+ final Document response = op.handle(request, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
logger.debug("Got response\n{}", XmlUtil.toString(response));
return response.getDocumentElement();
}
<groupId>${project.groupId}</groupId>
<artifactId>netconf-api</artifactId>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>netconf-client</artifactId>
- </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netconf-util</artifactId>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>mockito-configuration</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<instructions>
<Bundle-Activator>org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator
</Bundle-Activator>
- <Require-Capability>org.opendaylight.controller.config.persister.storage.adapter
- </Require-Capability>
- <Import-Package>
- com.google.common.base,
- com.google.common.collect,
- javax.management,
- javax.xml.parsers,
- org.opendaylight.controller.config.persist.api,
- org.opendaylight.controller.netconf.api,
- org.opendaylight.controller.netconf.api.jmx,
- org.opendaylight.controller.netconf.client,
- org.opendaylight.controller.netconf.util.osgi,
- org.opendaylight.controller.netconf.util.xml,
- org.opendaylight.controller.netconf.util.messages,
- io.netty.channel,
- io.netty.channel.nio,
- io.netty.util.concurrent,
- org.osgi.framework,
- org.slf4j,
- org.w3c.dom,
- org.xml.sax,
- javax.xml.namespace,
- javax.xml.xpath,
- org.opendaylight.controller.config.api,
- org.opendaylight.controller.netconf.util
- </Import-Package>
+ <Require-Capability>org.opendaylight.controller.config.persister.storage.adapter</Require-Capability>
<Export-Package>
</Export-Package>
</instructions>
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
+
package org.opendaylight.controller.netconf.persist.impl;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkState;
+/**
+ * Inspects snapshot xml to be stored, remove all capabilities that are not referenced by it.
+ * Useful when persisting current configuration.
+ */
public class CapabilityStrippingConfigSnapshotHolder implements ConfigSnapshotHolder {
private static final Logger logger = LoggerFactory.getLogger(CapabilityStrippingConfigSnapshotHolder.class);
private final String configSnapshot;
private final StripCapabilitiesResult stripCapabilitiesResult;
- public CapabilityStrippingConfigSnapshotHolder(Element snapshot, Set<String> capabilities, Pattern ignoredMissingCapabilityRegex) {
+ public CapabilityStrippingConfigSnapshotHolder(Element snapshot, Set<String> capabilities) {
final XmlElement configElement = XmlElement.fromDomElement(snapshot);
configSnapshot = XmlUtil.toString(configElement.getDomElement());
- stripCapabilitiesResult = stripCapabilities(configElement, capabilities, ignoredMissingCapabilityRegex);
+ stripCapabilitiesResult = stripCapabilities(configElement, capabilities);
}
private static class StripCapabilitiesResult {
- private final SortedSet<String> requiredCapabilities, missingNamespaces;
+ private final SortedSet<String> requiredCapabilities, obsoleteCapabilities;
- private StripCapabilitiesResult(SortedSet<String> requiredCapabilities, SortedSet<String> missingNamespaces) {
+ private StripCapabilitiesResult(SortedSet<String> requiredCapabilities, SortedSet<String> obsoleteCapabilities) {
this.requiredCapabilities = Collections.unmodifiableSortedSet(requiredCapabilities);
- this.missingNamespaces = Collections.unmodifiableSortedSet(missingNamespaces);
+ this.obsoleteCapabilities = Collections.unmodifiableSortedSet(obsoleteCapabilities);
}
}
@VisibleForTesting
- static StripCapabilitiesResult stripCapabilities(XmlElement configElement, Set<String> allCapabilitiesFromHello,
- Pattern ignoredMissingCapabilityRegex) {
+ static StripCapabilitiesResult stripCapabilities(XmlElement configElement, Set<String> allCapabilitiesFromHello) {
// collect all namespaces
Set<String> foundNamespacesInXML = getNamespaces(configElement);
logger.trace("All capabilities {}\nFound namespaces in XML {}", allCapabilitiesFromHello, foundNamespacesInXML);
// required are referenced both in xml and hello
SortedSet<String> requiredCapabilities = new TreeSet<>();
// can be removed
- Set<String> obsoleteCapabilities = new HashSet<>();
- // are in xml but not in hello
- SortedSet<String> missingNamespaces = new TreeSet<>(foundNamespacesInXML);
+ SortedSet<String> obsoleteCapabilities = new TreeSet<>();
for (String capability : allCapabilitiesFromHello) {
String namespace = capability.replaceAll("\\?.*","");
if (foundNamespacesInXML.contains(namespace)) {
requiredCapabilities.add(capability);
- checkState(missingNamespaces.remove(namespace));
} else {
obsoleteCapabilities.add(capability);
}
logger.trace("Required capabilities {}, \nObsolete capabilities {}",
requiredCapabilities, obsoleteCapabilities);
- for(Iterator<String> iterator = missingNamespaces.iterator();iterator.hasNext(); ){
- String capability = iterator.next();
- if (ignoredMissingCapabilityRegex.matcher(capability).matches()){
- logger.trace("Ignoring missing capability {}", capability);
- iterator.remove();
- }
- }
- if (missingNamespaces.size() > 0) {
- logger.warn("Some capabilities are missing: {}", missingNamespaces);
- }
- return new StripCapabilitiesResult(requiredCapabilities, missingNamespaces);
+ return new StripCapabilitiesResult(requiredCapabilities, obsoleteCapabilities);
}
static Set<String> getNamespaces(XmlElement element){
result.add(attribute.getValue().getValue());
}
}
- //element.getAttributes()
for(XmlElement child: element.getChildElements()) {
result.addAll(getNamespaces(child));
}
}
@VisibleForTesting
- Set<String> getMissingNamespaces(){
- return stripCapabilitiesResult.missingNamespaces;
+ Set<String> getObsoleteCapabilities(){
+ return stripCapabilitiesResult.obsoleteCapabilities;
}
@Override
import javax.management.ObjectName;
import java.io.Closeable;
import java.io.IOException;
-import java.util.regex.Pattern;
/**
* Responsible for listening for notifications from netconf (via JMX) containing latest
public ConfigPersisterNotificationHandler(MBeanServerConnection mBeanServerConnection,
- Persister persisterAggregator, Pattern ignoredMissingCapabilityRegex) {
+ Persister persisterAggregator) {
this.mBeanServerConnection = mBeanServerConnection;
- listener = new ConfigPersisterNotificationListener(persisterAggregator, ignoredMissingCapabilityRegex);
+ listener = new ConfigPersisterNotificationListener(persisterAggregator);
registerAsJMXListener(mBeanServerConnection, listener);
}
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class);
private final Persister persisterAggregator;
- private final Pattern ignoredMissingCapabilityRegex;
- ConfigPersisterNotificationListener(Persister persisterAggregator, Pattern ignoredMissingCapabilityRegex) {
+ ConfigPersisterNotificationListener(Persister persisterAggregator) {
this.persisterAggregator = persisterAggregator;
- this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex;
}
@Override
public void handleNotification(Notification notification, Object handback) {
- if (notification instanceof NetconfJMXNotification == false)
+ if (notification instanceof NetconfJMXNotification == false) {
return;
+ }
// Socket should not be closed at this point
// Activator unregisters this as JMX listener before close is called
logger.warn("Exception occured during notification handling: ", e);
throw e;
}
- } else
+ } else {
throw new IllegalStateException("Unknown config registry notification type " + notification);
+ }
}
private void handleAfterCommitNotification(final CommitJMXNotification notification) {
try {
persisterAggregator.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
- notification.getCapabilities(), ignoredMissingCapabilityRegex));
+ notification.getCapabilities()));
logger.trace("Configuration persisted successfully");
} catch (IOException e) {
throw new RuntimeException("Unable to persist configuration snapshot", e);
package org.opendaylight.controller.netconf.persist.impl;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClient;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.NetconfUtil;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import javax.annotation.concurrent.Immutable;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
@Immutable
public class ConfigPusher {
private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
- private final ConfigPusherConfiguration configuration;
+ private final long maxWaitForCapabilitiesMillis;
+ private final long conflictingVersionTimeoutMillis;
+ private final NetconfOperationServiceFactory configNetconfConnector;
- public ConfigPusher(ConfigPusherConfiguration configuration) {
- this.configuration = configuration;
+ public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+ long conflictingVersionTimeoutMillis) {
+ this.configNetconfConnector = configNetconfConnector;
+ this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
+ this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
}
- public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
- List<ConfigSnapshotHolder> configs) throws InterruptedException {
+ public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) {
logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
-
- // first just make sure we can connect to netconf, even if nothing is being pushed
- {
- NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet());
- Util.closeClientAndDispatcher(netconfClient);
- }
- LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> result = new LinkedHashMap<>();
+ LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
- EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder);
+ EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
}
}
/**
- * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal
- * number of attempts is reached.
+ * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
+ * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
+ * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
+ * {@link NetconfOperationService} after each use.
*/
- private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder)
- throws InterruptedException {
-
- ConflictingVersionException lastException = null;
- int maxAttempts = configuration.netconfPushConfigAttempts;
-
- for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
- NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
- logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
- try {
- EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
- return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
+ private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) {
+ ConflictingVersionException lastException;
+ Stopwatch stopwatch = new Stopwatch().start();
+ do {
+ try (NetconfOperationService operationService = getOperationServiceWithRetries(configSnapshotHolder.getCapabilities(), configSnapshotHolder.toString())) {
+ return pushConfig(configSnapshotHolder, operationService);
} catch (ConflictingVersionException e) {
- logger.debug("Conflicting version detected, will retry after timeout");
lastException = e;
- Thread.sleep(configuration.netconfPushConfigDelayMs);
- } catch (RuntimeException e) {
- throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
- } finally {
- Util.closeClientAndDispatcher(netconfClient);
+ logger.debug("Conflicting version detected, will retry after timeout");
+ sleep();
}
- }
- throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder,
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
+ throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
lastException);
}
- /**
- * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
- * If empty set is provided, will only make sure netconf client successfuly connected to the server.
- * @return NetconfClient that has all required capabilities from server.
- */
- private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps) throws InterruptedException {
-
- // TODO think about moving capability subset check to netconf client
- // could be utilized by integration tests
-
- final long pollingStartNanos = System.nanoTime();
- final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
- int attempt = 0;
-
- NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
- configuration.netconfAddress.getAddress().getHostAddress(),
- Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
-
- Set<String> latestCapabilities = null;
- while (System.nanoTime() < deadlineNanos) {
- attempt++;
- NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
- configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
- NetconfClient netconfClient;
+ private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
+ Stopwatch stopwatch = new Stopwatch().start();
+ NotEnoughCapabilitiesException lastException;
+ do {
try {
- netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
- } catch (IllegalStateException e) {
- logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
- netconfClientDispatcher.close();
- Thread.sleep(configuration.connectionAttemptDelayMs);
- continue;
- }
- latestCapabilities = netconfClient.getCapabilities();
- if (Util.isSubset(netconfClient, expectedCaps)) {
- logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
- logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
- return netconfClient;
+ return getOperationService(expectedCapabilities, idForReporting);
+ } catch (NotEnoughCapabilitiesException e) {
+ logger.debug("Not enough capabilities: " + e.toString());
+ lastException = e;
+ sleep();
}
- Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
- logger.debug("Netconf server did not provide required capabilities. Attempt {}. " +
- "Expected but not found: {}, all expected {}, current {}",
- attempt, allNotFound, expectedCaps, latestCapabilities);
- Util.closeClientAndDispatcher(netconfClient);
- Thread.sleep(configuration.connectionAttemptDelayMs);
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+ throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
+ }
+
+ private static class NotEnoughCapabilitiesException extends Exception {
+ private NotEnoughCapabilitiesException(String message) {
+ super(message);
}
- if (latestCapabilities == null) {
- logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
- throw new RuntimeException("Could not connect to netconf server");
+ }
+
+ /**
+ * Get NetconfOperationService iif all required capabilities are present.
+ *
+ * @param expectedCapabilities that must be provided by configNetconfConnector
+ * @param idForReporting
+ * @return service if capabilities are present, otherwise absent value
+ */
+ private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
+ NetconfOperationService serviceCandidate = configNetconfConnector.createService(idForReporting);
+ Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
+ if (notFoundDiff.isEmpty()) {
+ return serviceCandidate;
+ } else {
+ serviceCandidate.close();
+ logger.debug("Netconf server did not provide required capabilities for {} " +
+ "Expected but not found: {}, all expected {}, current {}",
+ idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+ );
+ throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
}
- Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
- logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
- allNotFound, expectedCaps, latestCapabilities);
- throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
}
- private static Set<String> computeNotFoundCapabilities(Set<String> expectedCaps, Set<String> latestCapabilities) {
- Set<String> allNotFound = new HashSet<>(expectedCaps);
- allNotFound.removeAll(latestCapabilities);
+ private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
+ Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
+ @Override
+ public String apply(Capability input) {
+ return input.getCapabilityUri();
+ }
+ });
+ Set<String> allNotFound = new HashSet<>(expectedCapabilities);
+ allNotFound.removeAll(actual);
return allNotFound;
}
+
+ private void sleep() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+
/**
* Sends two RPCs to the netconf server: edit-config and commit.
*
* @param configSnapshotHolder
- * @param netconfClient
* @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
* @throws java.lang.RuntimeException if edit-config or commit fails otherwise
*/
- private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
+ private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
throws ConflictingVersionException {
Element xmlToBePersisted;
throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
}
logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
-
+ Stopwatch stopwatch = new Stopwatch().start();
NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
- // sending message to netconf
- NetconfMessage editResponseMessage;
- try {
- editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient);
- } catch (IOException e) {
- throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e);
- }
+ Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
+ "edit-config", configSnapshotHolder.toString());
- // commit
- NetconfMessage commitResponseMessage;
- try {
- commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient);
- } catch (IOException e) {
- throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e);
- }
+ Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
+ "commit", configSnapshotHolder.toString());
if (logger.isTraceEnabled()) {
StringBuilder response = new StringBuilder("editConfig response = {");
- response.append(XmlUtil.toString(editResponseMessage.getDocument()));
+ response.append(XmlUtil.toString(editResponseMessage));
response.append("}");
response.append("commit response = {");
- response.append(XmlUtil.toString(commitResponseMessage.getDocument()));
+ response.append(XmlUtil.toString(commitResponseMessage));
response.append("}");
logger.trace("Last configuration loaded successfully");
logger.trace("Detailed message {}", response);
+ logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
}
+ private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) {
+ TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
+ Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
+ if (netconfOperations.isEmpty()) {
+ throw new IllegalStateException("Possible code error: no config operations");
+ }
+ for (NetconfOperation netconfOperation : netconfOperations) {
+ HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
+ allOperations.put(handlingPriority, netconfOperation);
+ }
+ Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
+ if (highestEntry.getKey().isCannotHandle()) {
+ throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
+ }
+ return highestEntry.getValue();
+ }
+
+ private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
+ String operationNameForReporting, String configIdForReporting)
+ throws ConflictingVersionException {
- private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
- throws ConflictingVersionException, IOException {
+ NetconfOperation operation = findOperation(request, operationService);
+ Document response;
try {
- NetconfMessage netconfMessage = netconfClient.sendMessage(request,
- configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
- NetconfUtil.checkIsMessageOk(netconfMessage);
- return netconfMessage;
- } catch(ConflictingVersionException e) {
- logger.trace("conflicting version detected: {}", e.toString());
+ response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ } catch (NetconfDocumentedException | RuntimeException e) {
+ throw new IllegalStateException("Failed to send " + operationNameForReporting +
+ " for configuration " + configIdForReporting, e);
+ }
+ try {
+ return NetconfUtil.checkIsMessageOk(response);
+ } catch (ConflictingVersionException e) {
+ logger.trace("conflicting version detected: {} while committing {}", e.toString(), configIdForReporting);
throw e;
- } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
- logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
- throw new IOException("Failed to execute netconf transaction", e);
}
}
-
// load editConfig.xml template, populate /rpc/edit-config/config with parameter
private static NetconfMessage createEditConfigMessage(Element dataElement) {
String editConfigResourcePath = "/netconfOp/editConfig.xml";
return new NetconfMessage(doc);
} catch (IOException | SAXException e) {
// error reading the xml file bundled into the jar
- throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e);
+ throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
}
}
return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
} catch (SAXException | IOException e) {
// error reading the xml file bundled into the jar
- throw new RuntimeException("Error while opening local resource " + resource, e);
+ throw new IllegalStateException("Error while opening local resource " + resource, e);
}
}
static class EditAndCommitResponse {
- private final NetconfMessage editResponse, commitResponse;
+ private final Document editResponse, commitResponse;
- EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) {
+ EditAndCommitResponse(Document editResponse, Document commitResponse) {
this.editResponse = editResponse;
this.commitResponse = commitResponse;
}
- public NetconfMessage getEditResponse() {
+ public Document getEditResponse() {
return editResponse;
}
- public NetconfMessage getCommitResponse() {
+ public Document getCommitResponse() {
return commitResponse;
}
'}';
}
}
-
-
- static class EditAndCommitResponseWithRetries {
- private final EditAndCommitResponse editAndCommitResponse;
- private final int retries;
-
- EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) {
- this.editAndCommitResponse = editAndCommitResponse;
- this.retries = retries;
- }
-
- public int getRetries() {
- return retries;
- }
-
- public EditAndCommitResponse getEditAndCommitResponse() {
- return editAndCommitResponse;
- }
-
- @Override
- public String toString() {
- return "EditAndCommitResponseWithRetries{" +
- "editAndCommitResponse=" + editAndCommitResponse +
- ", retries=" + retries +
- '}';
- }
- }
-
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.persist.impl;
-
-import io.netty.channel.EventLoopGroup;
-
-import javax.annotation.concurrent.Immutable;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Configuration properties for ConfigPusher. Contains delays and timeouts for netconf
- * connection establishment, netconf capabilities stabilization and configuration push.
- */
-@Immutable
-public final class ConfigPusherConfiguration {
-
- public static final long DEFAULT_CONNECTION_ATTEMPT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
- public static final int DEFAULT_CONNECTION_ATTEMPT_DELAY_MS = 5000;
-
- public static final int DEFAULT_NETCONF_SEND_MESSAGE_MAX_ATTEMPTS = 20;
- public static final int DEFAULT_NETCONF_SEND_MESSAGE_DELAY_MS = 1000;
-
- public static final long DEFAULT_NETCONF_CAPABILITIES_WAIT_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(2);
-
- public static final int DEFAULT_NETCONF_PUSH_CONFIG_ATTEMPTS = 30;
- public static final long DEFAULT_NETCONF_PUSH_CONFIG_DELAY_MS = TimeUnit.MINUTES.toMillis(1);
-
- final InetSocketAddress netconfAddress;
- final EventLoopGroup eventLoopGroup;
-
- /**
- * Total time to wait for capability stabilization
- */
- final long netconfCapabilitiesWaitTimeoutMs;
-
- /**
- * Delay between message send attempts
- */
- final int netconfSendMessageDelayMs;
- /**
- * Total number attempts to send a message
- */
- final int netconfSendMessageMaxAttempts;
-
- /**
- * Delay between connection establishment attempts
- */
- final int connectionAttemptDelayMs;
- /**
- * Total number of attempts to perform connection establishment
- */
- final long connectionAttemptTimeoutMs;
-
- /**
- * Total number of attempts to push configuration to netconf
- */
- final int netconfPushConfigAttempts;
- /**
- * Delay between configuration push attempts
- */
- final long netconfPushConfigDelayMs;
-
- ConfigPusherConfiguration(InetSocketAddress netconfAddress, long netconfCapabilitiesWaitTimeoutMs,
- int netconfSendMessageDelayMs, int netconfSendMessageMaxAttempts, int connectionAttemptDelayMs,
- long connectionAttemptTimeoutMs, EventLoopGroup eventLoopGroup, int netconfPushConfigAttempts,
- long netconfPushConfigDelayMs) {
- this.netconfAddress = netconfAddress;
- this.netconfCapabilitiesWaitTimeoutMs = netconfCapabilitiesWaitTimeoutMs;
- this.netconfSendMessageDelayMs = netconfSendMessageDelayMs;
- this.netconfSendMessageMaxAttempts = netconfSendMessageMaxAttempts;
- this.connectionAttemptDelayMs = connectionAttemptDelayMs;
- this.connectionAttemptTimeoutMs = connectionAttemptTimeoutMs;
- this.eventLoopGroup = eventLoopGroup;
- this.netconfPushConfigAttempts = netconfPushConfigAttempts;
- this.netconfPushConfigDelayMs = netconfPushConfigDelayMs;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.persist.impl;
-
-import io.netty.channel.EventLoopGroup;
-
-import java.net.InetSocketAddress;
-
-public class ConfigPusherConfigurationBuilder {
- InetSocketAddress netconfAddress;
- EventLoopGroup eventLoopGroup;
-
- long netconfCapabilitiesWaitTimeoutMs = ConfigPusherConfiguration.DEFAULT_NETCONF_CAPABILITIES_WAIT_TIMEOUT_MS;
- int netconfSendMessageDelayMs = ConfigPusherConfiguration.DEFAULT_NETCONF_SEND_MESSAGE_DELAY_MS;
- int netconfSendMessageMaxAttempts = ConfigPusherConfiguration.DEFAULT_NETCONF_SEND_MESSAGE_MAX_ATTEMPTS;
- int connectionAttemptDelayMs = ConfigPusherConfiguration.DEFAULT_CONNECTION_ATTEMPT_DELAY_MS;
- long connectionAttemptTimeoutMs = ConfigPusherConfiguration.DEFAULT_CONNECTION_ATTEMPT_TIMEOUT_MS;
- int netconfPushConfigAttempts = ConfigPusherConfiguration.DEFAULT_NETCONF_PUSH_CONFIG_ATTEMPTS;
- long netconfPushConfigDelayMs = ConfigPusherConfiguration.DEFAULT_NETCONF_PUSH_CONFIG_DELAY_MS;
-
- private ConfigPusherConfigurationBuilder() {
- }
-
- public static ConfigPusherConfigurationBuilder aConfigPusherConfiguration() {
- return new ConfigPusherConfigurationBuilder();
- }
-
- public ConfigPusherConfigurationBuilder withNetconfAddress(InetSocketAddress netconfAddress) {
- this.netconfAddress = netconfAddress;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withNetconfCapabilitiesWaitTimeoutMs(long netconfCapabilitiesWaitTimeoutMs) {
- this.netconfCapabilitiesWaitTimeoutMs = netconfCapabilitiesWaitTimeoutMs;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withNetconfSendMessageDelayMs(int netconfSendMessageDelayMs) {
- this.netconfSendMessageDelayMs = netconfSendMessageDelayMs;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withNetconfSendMessageMaxAttempts(int netconfSendMessageMaxAttempts) {
- this.netconfSendMessageMaxAttempts = netconfSendMessageMaxAttempts;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withConnectionAttemptDelayMs(int connectionAttemptDelayMs) {
- this.connectionAttemptDelayMs = connectionAttemptDelayMs;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withConnectionAttemptTimeoutMs(long connectionAttemptTimeoutMs) {
- this.connectionAttemptTimeoutMs = connectionAttemptTimeoutMs;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withEventLoopGroup(EventLoopGroup eventLoopGroup) {
- this.eventLoopGroup = eventLoopGroup;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withNetconfPushConfigAttempts(int netconfPushConfigAttempts) {
- this.netconfPushConfigAttempts = netconfPushConfigAttempts;
- return this;
- }
-
- public ConfigPusherConfigurationBuilder withNetconfPushConfigDelayMs(long netconfPushConfigDelayMs) {
- this.netconfPushConfigDelayMs = netconfPushConfigDelayMs;
- return this;
- }
-
- public ConfigPusherConfiguration build() {
- ConfigPusherConfiguration configPusherConfiguration = new ConfigPusherConfiguration(netconfAddress,
- netconfCapabilitiesWaitTimeoutMs, netconfSendMessageDelayMs, netconfSendMessageMaxAttempts,
- connectionAttemptDelayMs, connectionAttemptTimeoutMs, eventLoopGroup, netconfPushConfigAttempts,
- netconfPushConfigDelayMs);
- return configPusherConfiguration;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.persist.impl;
-
-import org.opendaylight.controller.netconf.client.NetconfClient;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-
-public final class Util {
- private static final Logger logger = LoggerFactory.getLogger(Util.class);
-
-
- public static boolean isSubset(NetconfClient netconfClient, Set<String> expectedCaps) {
- return isSubset(netconfClient.getCapabilities(), expectedCaps);
-
- }
-
- private static boolean isSubset(Set<String> currentCapabilities, Set<String> expectedCaps) {
- for (String exCap : expectedCaps) {
- if (currentCapabilities.contains(exCap) == false)
- return false;
- }
- return true;
- }
-
- public static void closeClientAndDispatcher(NetconfClient client) {
- NetconfClientDispatcher dispatcher = client.getNetconfClientDispatcher();
- Exception fromClient = null;
- try {
- client.close();
- } catch (Exception e) {
- fromClient = e;
- } finally {
- try {
- dispatcher.close();
- } catch (Exception e) {
- if (fromClient != null) {
- e.addSuppressed(fromClient);
- }
- throw new RuntimeException("Error closing temporary client ", e);
- }
- }
- }
-}
package org.opendaylight.controller.netconf.persist.impl.osgi;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler;
import org.opendaylight.controller.netconf.persist.impl.ConfigPusher;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfiguration;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfigurationBuilder;
import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator;
-import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ThreadFactory;
-import java.util.regex.Pattern;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
public class ConfigPersisterActivator implements BundleActivator {
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterActivator.class);
- public static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex";
-
- public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS = "maxWaitForCapabilitiesMillis";
+ public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS_PROPERTY = "maxWaitForCapabilitiesMillis";
+ private static final long MAX_WAIT_FOR_CAPABILITIES_MILLIS_DEFAULT = TimeUnit.MINUTES.toMillis(2);
+ public static final String CONFLICTING_VERSION_TIMEOUT_MILLIS_PROPERTY = "conflictingVersionTimeoutMillis";
+ private static final long CONFLICTING_VERSION_TIMEOUT_MILLIS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
public static final String NETCONF_CONFIG_PERSISTER = "netconf.config.persister";
public static final String STORAGE_ADAPTER_CLASS_PROP_SUFFIX = "storageAdapterClass";
- public static final String DEFAULT_IGNORED_REGEX = "^urn:ietf:params:xml:ns:netconf:base:1.0";
- private final MBeanServer platformMBeanServer;
+ private static final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
- private final Optional<ConfigPusherConfiguration> initialConfigForPusher;
- private volatile ConfigPersisterNotificationHandler jmxNotificationHandler;
- private Thread initializationThread;
- private ThreadFactory initializationThreadFactory;
- private EventLoopGroup nettyThreadGroup;
- private PersisterAggregator persisterAggregator;
+ private List<AutoCloseable> autoCloseables;
- public ConfigPersisterActivator() {
- this(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable initializationRunnable) {
- return new Thread(initializationRunnable, "ConfigPersister-registrator");
- }
- }, ManagementFactory.getPlatformMBeanServer(), null);
- }
-
- @VisibleForTesting
- protected ConfigPersisterActivator(ThreadFactory threadFactory, MBeanServer mBeanServer,
- ConfigPusherConfiguration initialConfigForPusher) {
- this.initializationThreadFactory = threadFactory;
- this.platformMBeanServer = mBeanServer;
- this.initialConfigForPusher = Optional.fromNullable(initialConfigForPusher);
- }
@Override
public void start(final BundleContext context) throws Exception {
logger.debug("ConfigPersister starting");
-
+ autoCloseables = new ArrayList<>();
PropertiesProviderBaseImpl propertiesProvider = new PropertiesProviderBaseImpl(context);
- final Pattern ignoredMissingCapabilityRegex = getIgnoredCapabilitiesProperty(propertiesProvider);
- persisterAggregator = PersisterAggregator.createFromProperties(propertiesProvider);
+ final PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(propertiesProvider);
+ autoCloseables.add(persisterAggregator);
+ final long maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesMillis(propertiesProvider);
+ final List<ConfigSnapshotHolder> configs = persisterAggregator.loadLastConfigs();
+ final long conflictingVersionTimeoutMillis = getConflictingVersionTimeoutMillis(propertiesProvider);
+ logger.trace("Following configs will be pushed: {}", configs);
+ ServiceTrackerCustomizer<NetconfOperationServiceFactory, NetconfOperationServiceFactory> configNetconfCustomizer = new ServiceTrackerCustomizer<NetconfOperationServiceFactory, NetconfOperationServiceFactory>() {
+ @Override
+ public NetconfOperationServiceFactory addingService(ServiceReference<NetconfOperationServiceFactory> reference) {
+ NetconfOperationServiceFactory service = reference.getBundle().getBundleContext().getService(reference);
+ final ConfigPusher configPusher = new ConfigPusher(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
+ logger.debug("Configuration Persister got %s", service);
+ final Thread pushingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ configPusher.pushConfigs(configs);
+ logger.info("Configuration Persister initialization completed.");
+ ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
+ synchronized (ConfigPersisterActivator.this) {
+ autoCloseables.add(jmxNotificationHandler);
+ }
+ }
+ }, "config-pusher");
+ synchronized (ConfigPersisterActivator.this){
+ autoCloseables.add(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ pushingThread.interrupt();
+ }
+ });
+ }
+ pushingThread.start();
+ return service;
+ }
- final ConfigPusher configPusher = new ConfigPusher(getConfigurationForPusher(context, propertiesProvider));
+ @Override
+ public void modifiedService(ServiceReference<NetconfOperationServiceFactory> reference, NetconfOperationServiceFactory service) {
+ }
- // offload initialization to another thread in order to stop blocking activator
- Runnable initializationRunnable = new Runnable() {
@Override
- public void run() {
- try {
- configPusher.pushConfigs(persisterAggregator.loadLastConfigs());
- jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator,
- ignoredMissingCapabilityRegex);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Interrupted while waiting for netconf connection");
- // uncaught exception handler will deal with this failure
- throw new RuntimeException("Interrupted while waiting for netconf connection", e);
- }
- logger.info("Configuration Persister initialization completed.");
+ public void removedService(ServiceReference<NetconfOperationServiceFactory> reference, NetconfOperationServiceFactory service) {
}
};
- initializationThread = initializationThreadFactory.newThread(initializationRunnable);
- initializationThread.start();
- }
-
- private Pattern getIgnoredCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) {
- String regexProperty = propertiesProvider.getProperty(IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX);
- String regex;
- if (regexProperty != null) {
- regex = regexProperty;
- } else {
- regex = DEFAULT_IGNORED_REGEX;
- }
- return Pattern.compile(regex);
- }
+ Filter filter = context.createFilter(getFilterString());
- private Optional<Long> getMaxWaitForCapabilitiesProperty(PropertiesProviderBaseImpl propertiesProvider) {
- String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS);
- return Optional.fromNullable(timeoutProperty == null ? null : Long.valueOf(timeoutProperty));
+ ServiceTracker<NetconfOperationServiceFactory, NetconfOperationServiceFactory> tracker =
+ new ServiceTracker<>(context, filter, configNetconfCustomizer);
+ tracker.open();
}
- private ConfigPusherConfiguration getConfigurationForPusher(BundleContext context,
- PropertiesProviderBaseImpl propertiesProvider) {
-
- // If configuration was injected via constructor, use it
- if(initialConfigForPusher.isPresent())
- return initialConfigForPusher.get();
-
- Optional<Long> maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesProperty(propertiesProvider);
- final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context,
- "Netconf is not configured, persister is not operational", true);
-
- nettyThreadGroup = new NioEventLoopGroup();
- ConfigPusherConfigurationBuilder configPusherConfigurationBuilder = ConfigPusherConfigurationBuilder.aConfigPusherConfiguration();
+ @VisibleForTesting
+ public static String getFilterString() {
+ return "(&" +
+ "(" + Constants.OBJECTCLASS + "=" + NetconfOperationServiceFactory.class.getName() + ")" +
+ "(name" + "=" + "config-netconf-connector" + ")" +
+ ")";
+ }
- if(maxWaitForCapabilitiesMillis.isPresent())
- configPusherConfigurationBuilder.withNetconfCapabilitiesWaitTimeoutMs(maxWaitForCapabilitiesMillis.get());
+ private long getConflictingVersionTimeoutMillis(PropertiesProviderBaseImpl propertiesProvider) {
+ String timeoutProperty = propertiesProvider.getProperty(CONFLICTING_VERSION_TIMEOUT_MILLIS_PROPERTY);
+ return timeoutProperty == null ? CONFLICTING_VERSION_TIMEOUT_MILLIS_DEFAULT : Long.valueOf(timeoutProperty);
+ }
- return configPusherConfigurationBuilder
- .withEventLoopGroup(nettyThreadGroup)
- .withNetconfAddress(address)
- .build();
+ private long getMaxWaitForCapabilitiesMillis(PropertiesProviderBaseImpl propertiesProvider) {
+ String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS_PROPERTY);
+ return timeoutProperty == null ? MAX_WAIT_FOR_CAPABILITIES_MILLIS_DEFAULT : Long.valueOf(timeoutProperty);
}
@Override
- public void stop(BundleContext context) throws Exception {
- initializationThread.interrupt();
- if (jmxNotificationHandler != null) {
- jmxNotificationHandler.close();
+ public synchronized void stop(BundleContext context) throws Exception {
+ Exception lastException = null;
+ for (AutoCloseable autoCloseable : autoCloseables) {
+ try {
+ autoCloseable.close();
+ } catch (Exception e) {
+ if (lastException == null) {
+ lastException = e;
+ } else {
+ lastException.addSuppressed(e);
+ }
+ }
+ }
+ if (lastException != null) {
+ throw lastException;
}
- if(nettyThreadGroup!=null)
- nettyThreadGroup.shutdownGracefully();
- persisterAggregator.close();
}
}
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
-import org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Element;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
public class CapabilityStrippingConfigSnapshotHolderTest {
@Test
- public void testCapabilityStripping() throws Exception {
+ public void testCapabilityStripping() throws Exception {
Set<String> allCapabilities = readLines("/capabilities-all.txt");
Set<String> expectedCapabilities = readLines("/capabilities-stripped.txt");
String snapshotAsString = readToString("/snapshot.xml");
Element element = XmlUtil.readXmlToElement(snapshotAsString);
- {
- CapabilityStrippingConfigSnapshotHolder tested = new CapabilityStrippingConfigSnapshotHolder(
- element, allCapabilities, Pattern.compile(
- ConfigPersisterActivator.DEFAULT_IGNORED_REGEX
- ));
- assertEquals(expectedCapabilities, tested.getCapabilities());
- assertEquals(Collections.emptySet(), tested.getMissingNamespaces());
- }
- {
- // test regex
- CapabilityStrippingConfigSnapshotHolder tested = new CapabilityStrippingConfigSnapshotHolder(
- element, allCapabilities, Pattern.compile(
- "^bar"
- ));
- assertEquals(expectedCapabilities, tested.getCapabilities());
- assertEquals(Sets.newHashSet(ConfigPersisterActivator.DEFAULT_IGNORED_REGEX.substring(1)),
- tested.getMissingNamespaces());
- }
+ CapabilityStrippingConfigSnapshotHolder tested = new CapabilityStrippingConfigSnapshotHolder(
+ element, allCapabilities);
+ assertEquals(expectedCapabilities, tested.getCapabilities());
+
+ Set<String> obsoleteCapabilities = Sets.difference(allCapabilities, expectedCapabilities);
+
+ assertEquals(obsoleteCapabilities, tested.getObsoleteCapabilities());
}
private Set<String> readLines(String fileName) throws IOException {
*/
package org.opendaylight.controller.netconf.persist.impl.osgi;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeoutException;
-
-import javax.management.MBeanServer;
-
+import com.google.common.collect.Sets;
import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
import org.opendaylight.controller.config.api.ConflictingVersionException;
-import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfiguration;
-import org.opendaylight.controller.netconf.persist.impl.ConfigPusherConfigurationBuilder;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.persist.impl.osgi.MockedBundleContext.DummyAdapterWithInitialSnapshot;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
-import com.google.common.collect.Lists;
-import io.netty.channel.nio.NioEventLoopGroup;
+import javax.management.MBeanServer;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
public class ConfigPersisterTest {
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterTest.class);
private MockedBundleContext ctx;
private ConfigPersisterActivator configPersisterActivator;
private static final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ private TestingExceptionHandler handler;
- private static final String NETCONF_ADDRESS = "localhost";
- private static final String NETCONF_PORT = "18383";
- private static NioEventLoopGroup eventLoopGroup;
- private void setUpContextAndStartPersister(Thread.UncaughtExceptionHandler exHandler, String requiredCapability, ConfigPusherConfiguration configuration)
- throws Exception {
- MockedBundleContext.DummyAdapterWithInitialSnapshot.expectedCapability = requiredCapability;
- ctx = new MockedBundleContext(NETCONF_ADDRESS, NETCONF_PORT);
- configPersisterActivator = new ConfigPersisterActivator(getThreadFactory(exHandler), mBeanServer,
- configuration);
+ private void setUpContextAndStartPersister(String requiredCapability) throws Exception {
+ DummyAdapterWithInitialSnapshot.expectedCapability = requiredCapability;
+ ctx = new MockedBundleContext(1000, 1000);
+ configPersisterActivator = new ConfigPersisterActivator();
configPersisterActivator.start(ctx.getBundleContext());
}
- @BeforeClass
- public static void setUp() throws Exception {
- eventLoopGroup = new NioEventLoopGroup();
+ @Before
+ public void setUp() {
+ handler = new TestingExceptionHandler();
+ Thread.setDefaultUncaughtExceptionHandler(handler);
}
@After
public void tearDown() throws Exception {
+ Thread.setDefaultUncaughtExceptionHandler(null);
configPersisterActivator.stop(ctx.getBundleContext());
}
- @AfterClass
- public static void closeNettyGroup() throws Exception {
- eventLoopGroup.shutdownGracefully();
- }
-
- @Test
- public void testPersisterNetconfNotStarting() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
-
- setUpContextAndStartPersister(handler, "cap2", getConfiguration(100, 100).build());
-
- waitTestToFinish(2000);
-
- handler.assertException("connect to netconf endpoint", RuntimeException.class,
- "Could not connect to netconf server");
- }
-
@Test
public void testPersisterNotAllCapabilitiesProvided() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfiguration(500, 1000)
- .withNetconfCapabilitiesWaitTimeoutMs(1000).build();
-
- setUpContextAndStartPersister(handler, "required-cap", cfg);
+ setUpContextAndStartPersister("required-cap");
+ Thread.sleep(2000);
+ handler.assertException(IllegalStateException.class, "Max wait for capabilities reached.Not enough capabilities " +
+ "for <data><config-snapshot/></data>. Expected but not found: [required-cap]");
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1")) {
-
- waitTestToFinish(2500);
-
- handler.assertException("retrieve required capabilities from netconf endpoint", RuntimeException.class,
- "Expected but not found:[required-cap]");
- }
}
@Test
- public void testPersisterNoResponseFromNetconfAfterEdit() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfigurationWithOnePushAttempt();
-
- setUpContextAndStartPersister(handler, "cap1", cfg);
-
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1")) {
-
- waitTestToFinish(3000);
-
- handler.assertException("receive response from netconf endpoint", IllegalStateException.class,
- "Unable to load", TimeoutException.class,
- null, 3);
-
- assertEquals(1 + 2, endpoint.getReceivedMessages().size());
- assertHelloMessage(endpoint.getReceivedMessages().get(1));
- assertEditMessage(endpoint.getReceivedMessages().get(2));
- }
+ public void testPersisterSuccessfulPush() throws Exception {
+ setUpContextAndStartPersister("cap1");
+ NetconfOperationService service = getWorkingService(getOKDocument());
+ doReturn(service).when(ctx.serviceFactory).createService(anyString());
+ Thread.sleep(2000);
+ assertCannotRegisterAsJMXListener_pushWasSuccessful();
}
- private ConfigPusherConfiguration getConfigurationWithOnePushAttempt() {
- return getConfiguration(500, 1000)
- .withNetconfCapabilitiesWaitTimeoutMs(1000)
- .withNetconfPushConfigAttempts(1)
- .withNetconfPushConfigDelayMs(100)
- .withNetconfSendMessageMaxAttempts(3)
- .withNetconfSendMessageDelayMs(500).build();
+ // this means pushing of config was successful
+ public void assertCannotRegisterAsJMXListener_pushWasSuccessful() {
+ handler.assertException(RuntimeException.class, "Cannot register as JMX listener to netconf");
}
- @Test
- public void testPersisterSuccessfulPush() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfigurationForSuccess();
-
- setUpContextAndStartPersister(handler, "cap1", cfg);
-
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
- MockNetconfEndpoint.okMessage)) {
+ public NetconfOperationService getWorkingService(Document document) throws SAXException, IOException, NetconfDocumentedException {
+ NetconfOperationService service = mock(NetconfOperationService.class);
+ Capability capability = mock(Capability.class);
+ doReturn(Sets.newHashSet(capability)).when(service).getCapabilities();
+ doReturn("cap1").when(capability).getCapabilityUri();
- waitTestToFinish(4000);
- handler.assertException("register as JMX listener", RuntimeException.class,
- "Cannot register as JMX listener to netconf");
-
- assertEquals(1 + 3, endpoint.getReceivedMessages().size());
- assertCommitMessage(endpoint.getReceivedMessages().get(3));
- }
+ NetconfOperation mockedOperation = mock(NetconfOperation.class);
+ doReturn(Sets.newHashSet(mockedOperation)).when(service).getNetconfOperations();
+ doReturn(HandlingPriority.getHandlingPriority(1)).when(mockedOperation).canHandle(any(Document.class));
+ doReturn(document).when(mockedOperation).handle(any(Document.class), any(NetconfOperationChainedExecution.class));
+ doNothing().when(service).close();
+ return service;
}
- private ConfigPusherConfiguration getConfigurationForSuccess() {
- return getConfiguration(500, 1000)
- .withNetconfCapabilitiesWaitTimeoutMs(1000)
- .withNetconfPushConfigAttempts(3)
- .withNetconfPushConfigDelayMs(100)
- .withNetconfSendMessageMaxAttempts(3)
- .withNetconfSendMessageDelayMs(500).build();
+ private Document getOKDocument() throws SAXException, IOException {
+ return XmlUtil.readXmlToDocument(
+ "<rpc-reply message-id=\"1\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<ok/>\n" +
+ "</rpc-reply>"
+ );
}
+
@Test
public void testPersisterConflictingVersionException() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfigurationWithOnePushAttempt();
-
- setUpContextAndStartPersister(handler, "cap1", cfg);
-
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
- MockNetconfEndpoint.conflictingVersionErrorMessage); DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier();) {
-
- Thread.sleep(4000);
-
- handler.assertException("register as JMX listener", IllegalStateException.class,
- "Maximum attempt count has been reached for pushing", ConflictingVersionException.class, "Optimistic lock failed", 1);
-
- assertEquals(1 + 3, endpoint.getReceivedMessages().size());
- assertCommitMessage(endpoint.getReceivedMessages().get(3));
- }
+ setUpContextAndStartPersister("cap1");
+ NetconfOperationService service = getWorkingService(getConflictVersionDocument());
+ doReturn(service).when(ctx.serviceFactory).createService(anyString());
+ Thread.sleep(2000);
+ handler.assertException(IllegalStateException.class, "Max wait for conflicting version stabilization timeout");
}
- @Test
- public void testPersisterConflictingVersionExceptionThenSuccess() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfigurationForSuccess();
-
- setUpContextAndStartPersister(handler, "cap1", cfg);
-
- MockNetconfEndpoint.MessageSequence conflictingMessageSequence = new MockNetconfEndpoint.MessageSequence(
- MockNetconfEndpoint.okMessage, MockNetconfEndpoint.conflictingVersionErrorMessage);
- MockNetconfEndpoint.MessageSequence okMessageSequence = new MockNetconfEndpoint.MessageSequence(
- MockNetconfEndpoint.okMessage, MockNetconfEndpoint.okMessage);
-
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1",
- Lists.newArrayList(conflictingMessageSequence, okMessageSequence));
- DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier()) {
-
- Thread.sleep(4000);
-
- handler.assertNoException();
-
- assertEquals(1 + 3/*Hello + Edit + Commit*/ + 3/*Hello + Edit + Commit*/, endpoint.getReceivedMessages().size());
- assertCommitMessage(endpoint.getReceivedMessages().get(6));
- }
+ private Document getConflictVersionDocument() throws SAXException, IOException {
+ return XmlUtil.readXmlToDocument(
+ "<rpc-reply message-id=\"1\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+ "<rpc-error><error-info><error>" +
+ ConflictingVersionException.class.getCanonicalName() +
+ "</error></error-info></rpc-error>\n" +
+ "</rpc-reply>"
+ );
}
@Test
- public void testPersisterSuccessfulPushAndSuccessfulJMXRegistration() throws Exception {
- final TestingExceptionHandler handler = new TestingExceptionHandler();
- ConfigPusherConfiguration cfg = getConfigurationForSuccess();
-
- setUpContextAndStartPersister(handler, "cap1", cfg);
-
- try (MockNetconfEndpoint endpoint = startMockNetconfEndpoint("cap1", MockNetconfEndpoint.okMessage,
- MockNetconfEndpoint.okMessage); DefaultCommitNotificationProducer jMXNotifier = startJMXCommitNotifier()) {
-
- Thread.sleep(2000);
-
- handler.assertNoException();
-
- assertEquals(1 + 3, endpoint.getReceivedMessages().size());
- }
- }
-
- private ConfigPusherConfigurationBuilder getConfiguration(int connectionAttemptDelayMs, int connectionAttemptTimeoutMs) {
- return ConfigPusherConfigurationBuilder.aConfigPusherConfiguration()
- .withEventLoopGroup(eventLoopGroup)
- .withConnectionAttemptDelayMs(connectionAttemptDelayMs)
- .withConnectionAttemptTimeoutMs(connectionAttemptTimeoutMs)
- .withNetconfCapabilitiesWaitTimeoutMs(44)
- .withNetconfAddress(new InetSocketAddress(NETCONF_ADDRESS, Integer.valueOf(NETCONF_PORT)));
+ public void testSuccessConflictingVersionException() throws Exception {
+ setUpContextAndStartPersister("cap1");
+ doReturn(getWorkingService(getConflictVersionDocument())).when(ctx.serviceFactory).createService(anyString());
+ Thread.sleep(500);
+ // working service:
+ logger.info("Switching to working service **");
+ doReturn(getWorkingService(getOKDocument())).when(ctx.serviceFactory).createService(anyString());
+ Thread.sleep(1000);
+ assertCannotRegisterAsJMXListener_pushWasSuccessful();
}
- private void waitTestToFinish(int i) throws InterruptedException {
- Thread.sleep(i);
- }
-
-
- private DefaultCommitNotificationProducer startJMXCommitNotifier() {
- return new DefaultCommitNotificationProducer(mBeanServer);
- }
-
- private void assertEditMessage(String netconfMessage) {
- assertThat(netconfMessage,
- JUnitMatchers.containsString(MockedBundleContext.DummyAdapterWithInitialSnapshot.CONFIG_SNAPSHOT));
- }
-
- private void assertCommitMessage(String netconfMessage) {
- assertThat(netconfMessage, JUnitMatchers.containsString("<commit"));
- }
-
- private void assertHelloMessage(String netconfMessage) {
- assertThat(netconfMessage,
- JUnitMatchers.containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">"));
- assertThat(netconfMessage, JUnitMatchers.containsString("<capability>"));
- }
-
- private MockNetconfEndpoint startMockNetconfEndpoint(String capability, List<MockNetconfEndpoint.MessageSequence> messageSequences) {
- // Add first empty sequence for testing connection created by config persister at startup
- messageSequences.add(0, new MockNetconfEndpoint.MessageSequence(Collections.<String>emptyList()));
- return new MockNetconfEndpoint(capability, NETCONF_PORT, messageSequences);
- }
-
- private MockNetconfEndpoint startMockNetconfEndpoint(String capability, String... messages) {
- return startMockNetconfEndpoint(capability, Lists.newArrayList(new MockNetconfEndpoint.MessageSequence(messages)));
- }
-
- public ThreadFactory getThreadFactory(final Thread.UncaughtExceptionHandler exHandler) {
- return new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "config-persister-testing-activator");
- thread.setUncaughtExceptionHandler(exHandler);
- return thread;
- }
- };
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.persist.impl.osgi;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-
-import com.google.common.collect.Lists;
-
-class MockNetconfEndpoint implements AutoCloseable {
-
- public static final int READ_SOCKET_TIMEOUT = 3000;
-
- public static final String MSG_SEPARATOR = "]]>]]>\n";
-
- private final AtomicBoolean stopped = new AtomicBoolean(false);
- private List<String> receivedMessages = Lists.newCopyOnWriteArrayList();
- private Thread innerThread;
-
- MockNetconfEndpoint(String capability, String netconfPort, List<MessageSequence> messageSequence) {
- helloMessage = helloMessage.replace("capability_place_holder", capability);
- start(netconfPort, messageSequence);
- }
-
- private String helloMessage = "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
- "<capabilities>\n" +
- "<capability>capability_place_holder</capability>\n" +
- "</capabilities>\n" +
- "<session-id>1</session-id>\n" +
- "</hello>\n" +
- MSG_SEPARATOR;
-
- public static String conflictingVersionErrorMessage;
- static {
- try {
- conflictingVersionErrorMessage = XmlUtil.toString(XmlFileLoader
- .xmlFileToDocument("netconfMessages/conflictingversion/conflictingVersionResponse.xml")) + MSG_SEPARATOR;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static String okMessage = "<rpc-reply message-id=\"1\" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
- "<ok/>\n" +
- "</rpc-reply>" +
- MSG_SEPARATOR ;
-
- private void start(final String port, final List<MessageSequence> messagesToSend) {
- innerThread = new Thread(new Runnable() {
- @Override
- public void run() {
- int clientCounter = 0;
-
- while (stopped.get() == false) {
- try (ServerSocket s = new ServerSocket(Integer.valueOf(port))) {
- s.setSoTimeout(READ_SOCKET_TIMEOUT);
-
- Socket clientSocket = s.accept();
- clientCounter++;
- clientSocket.setSoTimeout(READ_SOCKET_TIMEOUT);
-
- PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
- BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
-
- // Negotiate
- sendMessage(out, helloMessage);
- receiveMessage(in);
-
- // Accept next message (edit-config)
- receiveMessage(in);
-
- for (String message : getMessageSequenceForClient(messagesToSend, clientCounter)) {
- sendMessage(out, message);
- receiveMessage(in);
- }
- } catch (SocketTimeoutException e) {
- // No more activity on netconf endpoint, close
- return;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private Iterable<? extends String> getMessageSequenceForClient(List<MessageSequence> messagesToSend,
- int clientCounter) {
- if (messagesToSend.size() <= clientCounter) {
- return messagesToSend.get(messagesToSend.size() - 1).getMessages();
- } else {
- return messagesToSend.get(clientCounter - 1).getMessages();
- }
- }
-
- private void receiveMessage(BufferedReader in) throws Exception {
- String message = readMessage(in);
- if(message == null || message.equals(""))
- return;
- receivedMessages.add(message);
- }
-
- private String readMessage(BufferedReader in) throws IOException {
- int c;
- StringBuilder b = new StringBuilder();
-
- while((c = in.read()) != -1) {
- b.append((char)c);
- if(b.toString().endsWith("]]>]]>"))
- break;
- }
-
- return b.toString();
- }
-
- private void sendMessage(PrintWriter out, String message) throws InterruptedException {
- out.print(message);
- out.flush();
- }
-
- });
- innerThread.setName("Mocked-netconf-endpoint-inner-thread");
- innerThread.start();
- }
-
- public List<String> getReceivedMessages() {
- return receivedMessages;
- }
-
- public void close() throws IOException, InterruptedException {
- stopped.set(true);
- innerThread.join();
- }
-
- static class MessageSequence {
- private List<String> messages;
-
- MessageSequence(List<String> messages) {
- this.messages = messages;
- }
-
- MessageSequence(String... messages) {
- this(Lists.newArrayList(messages));
- }
-
- public Collection<String> getMessages() {
- return messages;
- }
- }
-}
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.api.PropertiesProvider;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.persist.impl.DummyAdapter;
+import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceListener;
+import org.osgi.framework.ServiceReference;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
final class MockedBundleContext {
-
@Mock
private BundleContext context;
+ @Mock
+ private Filter filter;
+ @Mock
+ private ServiceReference<?> serviceReference;
+ @Mock
+ private Bundle bundle;
+ @Mock
+ NetconfOperationServiceFactory serviceFactory;
+ @Mock
+ private NetconfOperationService service;
- MockedBundleContext(String netconfAddress, String netconfPort) {
+ MockedBundleContext(long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) throws Exception {
MockitoAnnotations.initMocks(this);
- initContext(netconfAddress, netconfPort);
+ doReturn(null).when(context).getProperty(anyString());
+ initContext(maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
+ doReturn(filter).when(context).createFilter(ConfigPersisterActivator.getFilterString());
+ String filterString = "filter";
+ doReturn(filterString).when(filter).toString();
+ doNothing().when(context).addServiceListener(any(ServiceListener.class), eq(filterString));
+ ServiceReference<?>[] toBeReturned = {serviceReference};
+ doReturn(toBeReturned).when(context).getServiceReferences((String) null, filterString);
+ doReturn(bundle).when(serviceReference).getBundle();
+ doReturn(context).when(bundle).getBundleContext();
+ doReturn("").when(serviceReference).toString();
+ doReturn(serviceFactory).when(context).getService(any(ServiceReference.class));
+ doReturn(service).when(serviceFactory).createService(anyString());
+ doReturn(Collections.emptySet()).when(service).getCapabilities();
+ doNothing().when(service).close();
}
public BundleContext getBundleContext() {
return context;
}
- private void initContext(String netconfAddress, String netconfPort) {
- initProp(context, ConfigPersisterActivator.IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX, null);
-
- initPropNoPrefix(context, "netconf.tcp.client.address", netconfAddress);
- initPropNoPrefix(context, "netconf.tcp.client.port", netconfPort);
-
+ private void initContext(long maxWaitForCapabilitiesMillis, long conflictingVersionTimeoutMillis) {
initProp(context, "active", "1");
initProp(context, "1." + ConfigPersisterActivator.STORAGE_ADAPTER_CLASS_PROP_SUFFIX, DummyAdapterWithInitialSnapshot.class.getName());
initProp(context, "1." + "readonly", "false");
initProp(context, "1." + ".properties.fileStorage", "target/configuration-persister-test/initial/");
-
+ initProp(context, ConfigPersisterActivator.MAX_WAIT_FOR_CAPABILITIES_MILLIS_PROPERTY, String.valueOf(maxWaitForCapabilitiesMillis));
+ initProp(context, ConfigPersisterActivator.CONFLICTING_VERSION_TIMEOUT_MILLIS_PROPERTY, String.valueOf(conflictingVersionTimeoutMillis));
}
private void initProp(BundleContext context, String key, String value) {
@Override
public List<ConfigSnapshotHolder> loadLastConfigs() throws IOException {
- return Lists.newArrayList(getConfigSnapshopt());
+ return Lists.newArrayList(getConfigSnapshot());
}
@Override
return this;
}
- public ConfigSnapshotHolder getConfigSnapshopt() {
+ public ConfigSnapshotHolder getConfigSnapshot() {
return new ConfigSnapshotHolder() {
@Override
public String getConfigSnapshot() {
this.t = e;
}
+ public void assertException(Class<? extends Exception> exType, String exMessageToContain) {
+ assertException(exMessageToContain, exType, exMessageToContain);
+ }
+
public void assertException(String failMessageSuffix, Class<? extends Exception> exType, String exMessageToContain) {
if(t == null) {
fail("Should fail to " + failMessageSuffix);
*/
package org.opendaylight.controller.netconf.impl.osgi;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
public class NetconfOperationRouterImpl implements NetconfOperationRouter {
return sortedPriority;
}
- public static final NetconfOperationChainedExecution EXECUTION_TERMINATION_POINT = new NetconfOperationChainedExecution() {
- @Override
- public boolean isExecutionTermination() {
- return true;
- }
-
- @Override
- public Document execute(Document requestMessage) throws NetconfDocumentedException {
- throw new IllegalStateException("This execution represents the termination point in operation execution and cannot be executed itself");
- }
- };
-
private static class NetconfOperationExecution implements NetconfOperationChainedExecution {
private final NetconfOperation netconfOperation;
private NetconfOperationChainedExecution subsequentExecution;
package org.opendaylight.controller.netconf.impl.osgi;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class NetconfOperationServiceSnapshot implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(NetconfOperationServiceSnapshot.class);
Set<NetconfOperationService> services = new HashSet<>();
netconfSessionIdForReporting = getNetconfSessionIdForReporting(sessionId);
for (NetconfOperationServiceFactory factory : factories) {
- services.add(factory.createService(sessionId, netconfSessionIdForReporting));
+ services.add(factory.createService(netconfSessionIdForReporting));
}
this.services = Collections.unmodifiableSet(services);
}
private NetconfOperationServiceFactory mockOpF() {
return new NetconfOperationServiceFactory() {
@Override
- public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
+ public NetconfOperationService createService(String netconfSessionIdForReporting) {
return new NetconfOperationService() {
@Override
public Set<Capability> getCapabilities() {
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.spi.ModuleFactory;
-import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
import org.opendaylight.controller.netconf.client.NetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
import org.opendaylight.controller.netconf.confignetconfconnector.osgi.NetconfOperationServiceFactoryImpl;
+import org.opendaylight.controller.netconf.confignetconfconnector.osgi.YangStoreException;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
import org.opendaylight.controller.netconf.impl.osgi.NetconfMonitoringServiceImpl;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.regex.Pattern;
import static junit.framework.Assert.assertEquals;
import static org.mockito.Matchers.any;
try (NetconfClient persisterClient = new NetconfClient("persister", tcpAddress, 4000, clientDispatcher)) {
try (ConfigPersisterNotificationHandler configPersisterNotificationHandler = new ConfigPersisterNotificationHandler(
- platformMBeanServer, mockedAggregator, Pattern.compile(""))) {
+ platformMBeanServer, mockedAggregator)) {
try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000, clientDispatcher)) {
return getHandlingPriority(priority + priorityIncrease);
}
+ public boolean isCannotHandle() {
+ return this.equals(CANNOT_HANDLE);
+ }
+
@Override
public int compareTo(HandlingPriority o) {
if (this == o)
* Do not execute if this is termination point
*/
Document execute(Document requestMessage) throws NetconfDocumentedException;
+
+ public static final NetconfOperationChainedExecution EXECUTION_TERMINATION_POINT = new NetconfOperationChainedExecution() {
+ @Override
+ public boolean isExecutionTermination() {
+ return true;
+ }
+
+ @Override
+ public Document execute(Document requestMessage) throws NetconfDocumentedException {
+ throw new IllegalStateException("This execution represents the termination point in operation execution and cannot be executed itself");
+ }
+ };
+
+
}
*/
public interface NetconfOperationServiceFactory {
- NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting);
+ NetconfOperationService createService(String netconfSessionIdForReporting);
}
}
@Override
- public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
+ public NetconfOperationService createService(String netconfSessionIdForReporting) {
return operationService;
}
}
return (doc == null) ? null : new NetconfMessage(doc);
}
- public static void checkIsMessageOk(NetconfMessage responseMessage) throws ConflictingVersionException {
- XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument());
+ public static Document checkIsMessageOk(NetconfMessage responseMessage) throws ConflictingVersionException {
+ return checkIsMessageOk(responseMessage.getDocument());
+ }
+
+ public static Document checkIsMessageOk(Document response) throws ConflictingVersionException {
+ XmlElement element = XmlElement.fromDomDocument(response);
Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
element = element.getOnlyChildElement();
if (element.getName().equals(XmlNetconfConstants.OK)) {
- return;
+ return response;
}
if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) {
throw new ConflictingVersionException(error);
}
throw new IllegalStateException("Can not load last configuration, operation failed: "
- + XmlUtil.toString(responseMessage.getDocument()));
+ + XmlUtil.toString(response));
}
logger.warn("Can not load last configuration. Operation failed.");
throw new IllegalStateException("Can not load last configuration. Operation failed: "
- + XmlUtil.toString(responseMessage.getDocument()));
+ + XmlUtil.toString(response));
}
}