X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2FConfigPusher.java;h=957db50c61a8b7ca076fc26fcc2c9886251f73bb;hp=0a844b69ab8a61d37d39bd11fa1a1f4bf2b37867;hb=494d528befd3908cc1b5f1c7706d34414819ef03;hpb=a84d1bd3fba5d6fb7d9777e1508221e2f773e94f diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index 0a844b69ab..957db50c61 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -8,16 +8,30 @@ package org.opendaylight.controller.netconf.persist.impl; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import io.netty.channel.EventLoopGroup; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.Immutable; + import org.opendaylight.controller.config.api.ConflictingVersionException; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.client.NetconfClient; -import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.mapping.api.Capability; +import org.opendaylight.controller.netconf.mapping.api.HandlingPriority; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperation; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService; +import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory; import org.opendaylight.controller.netconf.util.NetconfUtil; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -27,208 +41,269 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; -import javax.annotation.concurrent.Immutable; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Collections2; @Immutable public class ConfigPusher { - private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class); - private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000; - private static final int NETCONF_SEND_ATTEMPTS = 20; - - private final InetSocketAddress address; - private final EventLoopGroup nettyThreadgroup; + private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class); - // Default timeout for netconf becoming stable - public static final long DEFAULT_TIMEOUT = TimeUnit.MINUTES.toNanos(2); - private final int delayMillis = 5000; - private final long timeoutNanos; + private final long maxWaitForCapabilitiesMillis; + private final long conflictingVersionTimeoutMillis; + private final NetconfOperationServiceFactory configNetconfConnector; - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) { - this(address, nettyThreadgroup, DEFAULT_TIMEOUT); + public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis, + long conflictingVersionTimeoutMillis) { + this.configNetconfConnector = configNetconfConnector; + this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; + this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis; } - @Deprecated - public ConfigPusher(InetSocketAddress address, long timeoutMillis, EventLoopGroup nettyThreadgroup) { - this(address, nettyThreadgroup, TimeUnit.MILLISECONDS.toNanos(timeoutMillis)); - } - - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup, long timeoutNanos) { - this.address = address; - this.nettyThreadgroup = nettyThreadgroup; - this.timeoutNanos = timeoutNanos; - } - - public synchronized NetconfClient init(List configs) throws InterruptedException { + public synchronized LinkedHashMap pushConfigs(List configs) throws NetconfDocumentedException { logger.debug("Last config snapshots to be pushed to netconf: {}", configs); - return pushAllConfigs(configs); - } - - private synchronized NetconfClient pushAllConfigs(List configs) throws InterruptedException { - // first just make sure we can connect to netconf, even if nothing is being pushed - NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet(), Optional.absent()); + LinkedHashMap result = new LinkedHashMap<>(); // start pushing snapshots: - for (ConfigSnapshotHolder configSnapshotHolder: configs){ - netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient)); - logger.debug("Config snapshot pushed successfully: {}", configSnapshotHolder); + for (ConfigSnapshotHolder configSnapshotHolder : configs) { + EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder); + logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result); + result.put(configSnapshotHolder, editAndCommitResponseWithRetries); } - logger.debug("All configuration snapshots have been pushed successfully."); - return netconfClient; + return result; } - private synchronized NetconfClient pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder, - Optional oldClientForPossibleReuse) - throws InterruptedException { + /** + * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until + * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException} + * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes + * {@link NetconfOperationService} after each use. + */ + private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException { + ConflictingVersionException lastException; + Stopwatch stopwatch = new Stopwatch().start(); + do { + try (NetconfOperationService operationService = getOperationServiceWithRetries(configSnapshotHolder.getCapabilities(), configSnapshotHolder.toString())) { + return pushConfig(configSnapshotHolder, operationService); + } catch (ConflictingVersionException e) { + lastException = e; + logger.debug("Conflicting version detected, will retry after timeout"); + sleep(); + } + } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis); + throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms", + lastException); + } - Exception lastException = null; - int maxAttempts = 30; - for(int i = 0 ; i < maxAttempts; i++) { - NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse); - logger.trace("Pushing following xml to netconf {}", configSnapshotHolder); + private NetconfOperationService getOperationServiceWithRetries(Set expectedCapabilities, String idForReporting) { + Stopwatch stopwatch = new Stopwatch().start(); + NotEnoughCapabilitiesException lastException; + do { try { - pushLastConfig(configSnapshotHolder, netconfClient); - return netconfClient; - } catch (ConflictingVersionException | IOException e) { - Util.closeClientAndDispatcher(netconfClient); + return getOperationService(expectedCapabilities, idForReporting); + } catch (NotEnoughCapabilitiesException e) { + logger.debug("Not enough capabilities: " + e.toString()); lastException = e; - Thread.sleep(1000); - } catch (SAXException e) { - throw new IllegalStateException("Unable to load last config", e); + sleep(); } - } - throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: " - + maxAttempts, lastException); + } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis); + throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException); } - /** - * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException. - * If empty set is provided, will only make sure netconf client successfuly connected to the server. - * @param maybeOldClient if present, close it. - * @return NetconfClient that has all required capabilities from server. - */ - private synchronized NetconfClient makeNetconfConnection(Set expectedCaps, - Optional maybeOldClient) - throws InterruptedException { + private static class NotEnoughCapabilitiesException extends Exception { + private static final long serialVersionUID = 1L; - if (maybeOldClient.isPresent()) { - NetconfClient oldClient = maybeOldClient.get(); - Util.closeClientAndDispatcher(oldClient); + private NotEnoughCapabilitiesException(String message, Throwable cause) { + super(message, cause); } - // TODO think about moving capability subset check to netconf client - // could be utilized by integration tests - - final long pollingStart = System.nanoTime(); - final long deadline = pollingStart + timeoutNanos; - int attempt = 0; + private NotEnoughCapabilitiesException(String message) { + super(message); + } + } - String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(), - Integer.toString(address.getPort()), "tcp", Optional.of("persister")); + /** + * Get NetconfOperationService iif all required capabilities are present. + * + * @param expectedCapabilities that must be provided by configNetconfConnector + * @param idForReporting + * @return service if capabilities are present, otherwise absent value + */ + private NetconfOperationService getOperationService(Set expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException { + NetconfOperationService serviceCandidate; + try { + serviceCandidate = configNetconfConnector.createService(idForReporting); + } catch(RuntimeException e) { + throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e); + } + Set notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate); + if (notFoundDiff.isEmpty()) { + return serviceCandidate; + } else { + serviceCandidate.close(); + logger.trace("Netconf server did not provide required capabilities for {} " + + "Expected but not found: {}, all expected {}, current {}", + idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities() + ); + throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff); + } + } - Set latestCapabilities = new HashSet<>(); - while (System.nanoTime() < deadline) { - attempt++; - NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, - nettyThreadgroup, additionalHeader); - NetconfClient netconfClient; - try { - netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher); - } catch (IllegalStateException e) { - logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e); - netconfClientDispatcher.close(); - Thread.sleep(delayMillis); - continue; - } - latestCapabilities = netconfClient.getCapabilities(); - if (Util.isSubset(netconfClient, expectedCaps)) { - logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities); - logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession()); - return netconfClient; + private static Set computeNotFoundCapabilities(Set expectedCapabilities, NetconfOperationService serviceCandidate) { + Collection actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function() { + @Override + public String apply(Capability input) { + return input.getCapabilityUri(); } - logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities); - Util.closeClientAndDispatcher(netconfClient); - Thread.sleep(delayMillis); - } - Set allNotFound = new HashSet<>(expectedCaps); - allNotFound.removeAll(latestCapabilities); - logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}", - allNotFound, expectedCaps, latestCapabilities); - throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound); + }); + Set allNotFound = new HashSet<>(expectedCapabilities); + allNotFound.removeAll(actual); + return allNotFound; } - private synchronized void pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient) - throws ConflictingVersionException, IOException, SAXException { - - Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); - logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder); - StringBuilder response = new StringBuilder("editConfig response = {"); - - NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml"); - // sending message to netconf - NetconfMessage responseMessage = getResponse(message, netconfClient); - - NetconfUtil.checkIsMessageOk(responseMessage); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient); + private void sleep() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + /** + * Sends two RPCs to the netconf server: edit-config and commit. + * + * @param configSnapshotHolder + * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager + * @throws java.lang.RuntimeException if edit-config or commit fails otherwise + */ + private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService) + throws ConflictingVersionException, NetconfDocumentedException { + Element xmlToBePersisted; + try { + xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); + } catch (SAXException | IOException e) { + throw new IllegalStateException("Cannot parse " + configSnapshotHolder); + } + logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder); + Stopwatch stopwatch = new Stopwatch().start(); + NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted); + + Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService, + "edit-config", configSnapshotHolder.toString()); + + Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService, + "commit", configSnapshotHolder.toString()); + + if (logger.isTraceEnabled()) { + StringBuilder response = new StringBuilder("editConfig response = {"); + response.append(XmlUtil.toString(editResponseMessage)); + response.append("}"); + response.append("commit response = {"); + response.append(XmlUtil.toString(commitResponseMessage)); + response.append("}"); + logger.trace("Last configuration loaded successfully"); + logger.trace("Detailed message {}", response); + logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + return new EditAndCommitResponse(editResponseMessage, commitResponseMessage); + } - NetconfUtil.checkIsMessageOk(responseMessage); - response.append("commit response = {"); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - logger.trace("Last configuration loaded successfully"); - logger.trace("Detailed message {}", response); + private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException { + TreeMap allOperations = new TreeMap<>(); + Set netconfOperations = operationService.getNetconfOperations(); + if (netconfOperations.isEmpty()) { + throw new IllegalStateException("Possible code error: no config operations"); + } + for (NetconfOperation netconfOperation : netconfOperations) { + HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument()); + allOperations.put(handlingPriority, netconfOperation); + } + Entry highestEntry = allOperations.lastEntry(); + if (highestEntry.getKey().isCannotHandle()) { + throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE"); + } + return highestEntry.getValue(); } - private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) throws IOException { + private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService, + String operationNameForReporting, String configIdForReporting) + throws ConflictingVersionException, NetconfDocumentedException { + + NetconfOperation operation = findOperation(request, operationService); + Document response; try { - return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); - } catch (RuntimeException e) { - logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e); - throw new IOException("Failed to execute netconf transaction", e); + response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT); + } catch (NetconfDocumentedException | RuntimeException e) { + if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) { + throw (ConflictingVersionException) e.getCause(); + } + throw new IllegalStateException("Failed to send " + operationNameForReporting + + " for configuration " + configIdForReporting, e); } + return NetconfUtil.checkIsMessageOk(response); } - private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) throws IOException, SAXException { - try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) { - Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename); + // load editConfig.xml template, populate /rpc/edit-config/config with parameter + private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException { + String editConfigResourcePath = "/netconfOp/editConfig.xml"; + try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) { + Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath); Document doc = XmlUtil.readXmlToDocument(stream); - doc.getDocumentElement(); XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement(); XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY); editConfigElement.getDomElement().removeChild(configWrapper.getDomElement()); for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) { - configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true)); + boolean deep = true; + configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep)); } editConfigElement.appendChild(configWrapper.getDomElement()); return new NetconfMessage(doc); } catch (IOException | SAXException e) { - logger.debug("Failed to create edit-config message for resource {}", editConfigResourcename, e); - throw e; + // error reading the xml file bundled into the jar + throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e); } } - private static NetconfMessage getNetconfMessageFromResource(String resource) throws IOException, SAXException { + private static NetconfMessage getCommitMessage() { + String resource = "/netconfOp/commit.xml"; try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) { Preconditions.checkNotNull(stream, "Unable to load resource " + resource); return new NetconfMessage(XmlUtil.readXmlToDocument(stream)); } catch (SAXException | IOException e) { - logger.debug("Failed to parse netconf message for resource {}", resource, e); - throw e; + // error reading the xml file bundled into the jar + throw new IllegalStateException("Error while opening local resource " + resource, e); + } + } + + static class EditAndCommitResponse { + private final Document editResponse, commitResponse; + + EditAndCommitResponse(Document editResponse, Document commitResponse) { + this.editResponse = editResponse; + this.commitResponse = commitResponse; + } + + public Document getEditResponse() { + return editResponse; + } + + public Document getCommitResponse() { + return commitResponse; + } + + @Override + public String toString() { + return "EditAndCommitResponse{" + + "editResponse=" + editResponse + + ", commitResponse=" + commitResponse + + '}'; } } }