package org.opendaylight.controller.netconf.persist.impl;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import io.netty.channel.EventLoopGroup;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.Immutable;
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClient;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.NetconfUtil;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import javax.annotation.concurrent.Immutable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
- private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
- private static final int NETCONF_SEND_ATTEMPTS = 20;
-
- private final InetSocketAddress address;
- private final EventLoopGroup nettyThreadgroup;
-
- // Default timeout for netconf becoming stable
- public static final long DEFAULT_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
- private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
- private final int delayMillis = 5000;
- private final long timeoutNanos;
- private final long connectionTimeoutMillis;
-
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) {
- this(address, nettyThreadgroup, DEFAULT_TIMEOUT_NANOS, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
- }
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup, long timeoutNanos, long connectionTimeoutMillis) {
- this.address = address;
- this.nettyThreadgroup = nettyThreadgroup;
- this.timeoutNanos = timeoutNanos;
- this.connectionTimeoutMillis = connectionTimeoutMillis;
- }
+ private final long maxWaitForCapabilitiesMillis;
+ private final long conflictingVersionTimeoutMillis;
+ private final NetconfOperationServiceFactory configNetconfConnector;
- public synchronized NetconfClient init(List<ConfigSnapshotHolder> configs) throws InterruptedException {
- logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
- return pushAllConfigs(configs);
+ public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+ long conflictingVersionTimeoutMillis) {
+ this.configNetconfConnector = configNetconfConnector;
+ this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
+ this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
}
- private synchronized NetconfClient pushAllConfigs(List<ConfigSnapshotHolder> configs) throws InterruptedException {
- // first just make sure we can connect to netconf, even if nothing is being pushed
- NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet(), Optional.<NetconfClient>absent());
+ public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
+ logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
+ LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
- for (ConfigSnapshotHolder configSnapshotHolder: configs){
- netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient));
- logger.debug("Config snapshot pushed successfully: {}", configSnapshotHolder);
+ for (ConfigSnapshotHolder configSnapshotHolder : configs) {
+ if(configSnapshotHolder != null) {
+ EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
+ logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+ result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
+ }
}
-
logger.debug("All configuration snapshots have been pushed successfully.");
- return netconfClient;
+ return result;
}
- private synchronized NetconfClient pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder,
- Optional<NetconfClient> oldClientForPossibleReuse)
- throws InterruptedException {
+ /**
+ * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
+ * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
+ * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
+ * {@link NetconfOperationService} after each use.
+ */
+ private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
+ ConflictingVersionException lastException;
+ Stopwatch stopwatch = new Stopwatch().start();
+ do {
+ String idForReporting = configSnapshotHolder.toString();
+ SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
+ "Expected capabilities must not be null - %s, check %s", idForReporting,
+ configSnapshotHolder.getClass().getName());
+ try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
+ return pushConfig(configSnapshotHolder, operationService);
+ } catch (ConflictingVersionException e) {
+ lastException = e;
+ logger.debug("Conflicting version detected, will retry after timeout");
+ sleep();
+ }
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
+ throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
+ lastException);
+ }
- Exception lastException = null;
- int maxAttempts = 30;
- for(int i = 0 ; i < maxAttempts; i++) {
- NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse);
- logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
+ private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
+ Stopwatch stopwatch = new Stopwatch().start();
+ NotEnoughCapabilitiesException lastException;
+ do {
try {
- pushLastConfig(configSnapshotHolder, netconfClient);
- return netconfClient;
- } catch (ConflictingVersionException | IOException e) {
- Util.closeClientAndDispatcher(netconfClient);
+ return getOperationService(expectedCapabilities, idForReporting);
+ } catch (NotEnoughCapabilitiesException e) {
+ logger.debug("Not enough capabilities: " + e.toString());
lastException = e;
- Thread.sleep(1000);
- } catch (SAXException e) {
- throw new IllegalStateException("Unable to load last config", e);
+ sleep();
}
- }
- throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: "
- + maxAttempts, lastException);
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+ throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
}
- /**
- * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
- * If empty set is provided, will only make sure netconf client successfuly connected to the server.
- * @param maybeOldClient if present, close it.
- * @return NetconfClient that has all required capabilities from server.
- */
- private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps,
- Optional<NetconfClient> maybeOldClient)
- throws InterruptedException {
+ private static class NotEnoughCapabilitiesException extends Exception {
+ private static final long serialVersionUID = 1L;
- if (maybeOldClient.isPresent()) {
- NetconfClient oldClient = maybeOldClient.get();
- Util.closeClientAndDispatcher(oldClient);
+ private NotEnoughCapabilitiesException(String message, Throwable cause) {
+ super(message, cause);
}
- // TODO think about moving capability subset check to netconf client
- // could be utilized by integration tests
-
- final long pollingStart = System.nanoTime();
- final long deadline = pollingStart + timeoutNanos;
- int attempt = 0;
+ private NotEnoughCapabilitiesException(String message) {
+ super(message);
+ }
+ }
- String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(),
- Integer.toString(address.getPort()), "tcp", Optional.of("persister"));
+ /**
+ * Get NetconfOperationService iif all required capabilities are present.
+ *
+ * @param expectedCapabilities that must be provided by configNetconfConnector
+ * @param idForReporting
+ * @return service if capabilities are present, otherwise absent value
+ */
+ private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
+ NetconfOperationService serviceCandidate;
+ try {
+ serviceCandidate = configNetconfConnector.createService(idForReporting);
+ } catch(RuntimeException e) {
+ throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
+ }
+ Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
+ if (notFoundDiff.isEmpty()) {
+ return serviceCandidate;
+ } else {
+ serviceCandidate.close();
+ logger.trace("Netconf server did not provide required capabilities for {} " +
+ "Expected but not found: {}, all expected {}, current {}",
+ idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+ );
+ throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
+ }
+ }
- Set<String> latestCapabilities = null;
- while (System.nanoTime() < deadline) {
- attempt++;
- NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup,
- nettyThreadgroup, additionalHeader, connectionTimeoutMillis);
- NetconfClient netconfClient;
- try {
- netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher);
- } catch (IllegalStateException e) {
- logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
- netconfClientDispatcher.close();
- Thread.sleep(delayMillis);
- continue;
- }
- latestCapabilities = netconfClient.getCapabilities();
- if (Util.isSubset(netconfClient, expectedCaps)) {
- logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
- logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
- return netconfClient;
+ private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
+ Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
+ @Override
+ public String apply(Capability input) {
+ return input.getCapabilityUri();
}
- logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities);
- Util.closeClientAndDispatcher(netconfClient);
- Thread.sleep(delayMillis);
- }
- if (latestCapabilities == null) {
- logger.error("Could not connect to the server in {} ms", timeoutNanos / 1000);
- throw new RuntimeException("Could not connect to netconf server");
- }
- Set<String> allNotFound = new HashSet<>(expectedCaps);
- allNotFound.removeAll(latestCapabilities);
- logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
- allNotFound, expectedCaps, latestCapabilities);
- throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
+ });
+ Set<String> allNotFound = new HashSet<>(expectedCapabilities);
+ allNotFound.removeAll(actual);
+ return allNotFound;
}
- private synchronized void pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
- throws ConflictingVersionException, IOException, SAXException {
-
- Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
- logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
- StringBuilder response = new StringBuilder("editConfig response = {");
-
- NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml");
-
- // sending message to netconf
- NetconfMessage responseMessage = getResponse(message, netconfClient);
- NetconfUtil.checkIsMessageOk(responseMessage);
- response.append(XmlUtil.toString(responseMessage.getDocument()));
- response.append("}");
- responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient);
+ private void sleep() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ /**
+ * Sends two RPCs to the netconf server: edit-config and commit.
+ *
+ * @param configSnapshotHolder
+ * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
+ * @throws java.lang.RuntimeException if edit-config or commit fails otherwise
+ */
+ private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
+ throws ConflictingVersionException, NetconfDocumentedException {
+ Element xmlToBePersisted;
+ try {
+ xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
+ } catch (SAXException | IOException e) {
+ throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
+ }
+ logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
+ Stopwatch stopwatch = new Stopwatch().start();
+ NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
+
+ Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
+ "edit-config", configSnapshotHolder.toString());
+
+ Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
+ "commit", configSnapshotHolder.toString());
+
+ if (logger.isTraceEnabled()) {
+ StringBuilder response = new StringBuilder("editConfig response = {");
+ response.append(XmlUtil.toString(editResponseMessage));
+ response.append("}");
+ response.append("commit response = {");
+ response.append(XmlUtil.toString(commitResponseMessage));
+ response.append("}");
+ logger.trace("Last configuration loaded successfully");
+ logger.trace("Detailed message {}", response);
+ logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ }
+ return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
+ }
- NetconfUtil.checkIsMessageOk(responseMessage);
- response.append("commit response = {");
- response.append(XmlUtil.toString(responseMessage.getDocument()));
- response.append("}");
- logger.trace("Last configuration loaded successfully");
- logger.trace("Detailed message {}", response);
+ private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
+ TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
+ Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
+ if (netconfOperations.isEmpty()) {
+ throw new IllegalStateException("Possible code error: no config operations");
+ }
+ for (NetconfOperation netconfOperation : netconfOperations) {
+ HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
+ allOperations.put(handlingPriority, netconfOperation);
+ }
+ Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
+ if (highestEntry.getKey().isCannotHandle()) {
+ throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
+ }
+ return highestEntry.getValue();
}
- private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) throws IOException {
+ private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
+ String operationNameForReporting, String configIdForReporting)
+ throws ConflictingVersionException, NetconfDocumentedException {
+
+ NetconfOperation operation = findOperation(request, operationService);
+ Document response;
try {
- return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
- } catch (RuntimeException e) {
- logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
- throw new IOException("Failed to execute netconf transaction", e);
+ response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ } catch (NetconfDocumentedException | RuntimeException e) {
+ if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
+ throw (ConflictingVersionException) e.getCause();
+ }
+ throw new IllegalStateException("Failed to send " + operationNameForReporting +
+ " for configuration " + configIdForReporting, e);
}
+ return NetconfUtil.checkIsMessageOk(response);
}
- private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) throws IOException, SAXException {
- try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) {
- Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename);
+ // load editConfig.xml template, populate /rpc/edit-config/config with parameter
+ private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
+ String editConfigResourcePath = "/netconfOp/editConfig.xml";
+ try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
+ checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
Document doc = XmlUtil.readXmlToDocument(stream);
- doc.getDocumentElement();
XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
- configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true));
+ boolean deep = true;
+ configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
}
editConfigElement.appendChild(configWrapper.getDomElement());
return new NetconfMessage(doc);
} catch (IOException | SAXException e) {
- logger.debug("Failed to create edit-config message for resource {}", editConfigResourcename, e);
- throw e;
+ // error reading the xml file bundled into the jar
+ throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
}
}
- private static NetconfMessage getNetconfMessageFromResource(String resource) throws IOException, SAXException {
+ private static NetconfMessage getCommitMessage() {
+ String resource = "/netconfOp/commit.xml";
try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
- Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
+ checkNotNull(stream, "Unable to load resource " + resource);
return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
} catch (SAXException | IOException e) {
- logger.debug("Failed to parse netconf message for resource {}", resource, e);
- throw e;
+ // error reading the xml file bundled into the jar
+ throw new IllegalStateException("Error while opening local resource " + resource, e);
+ }
+ }
+
+ static class EditAndCommitResponse {
+ private final Document editResponse, commitResponse;
+
+ EditAndCommitResponse(Document editResponse, Document commitResponse) {
+ this.editResponse = editResponse;
+ this.commitResponse = commitResponse;
+ }
+
+ public Document getEditResponse() {
+ return editResponse;
+ }
+
+ public Document getCommitResponse() {
+ return commitResponse;
+ }
+
+ @Override
+ public String toString() {
+ return "EditAndCommitResponse{" +
+ "editResponse=" + editResponse +
+ ", commitResponse=" + commitResponse +
+ '}';
}
}
}