X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2FConfigPusher.java;h=01d872d89cad71156b792b37dc2e78f1749f393a;hp=0b623baaa480dde170d6866d7fe1564242ed704e;hb=c5b0b028392646507133df0af5efcee547763b6d;hpb=58e0b181f70b2e8bf1c8097bb804f6d1f28b00b8 diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index 0b623baaa4..01d872d89c 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -8,23 +8,15 @@ package org.opendaylight.controller.netconf.persist.impl; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import io.netty.channel.EventLoopGroup; - -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import javax.annotation.concurrent.Immutable; - import org.opendaylight.controller.config.api.ConflictingVersionException; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.util.NetconfUtil; import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; @@ -35,8 +27,16 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import javax.annotation.concurrent.Immutable; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; @Immutable public class ConfigPusher { @@ -45,118 +45,120 @@ public class ConfigPusher { private static final int NETCONF_SEND_ATTEMPTS = 20; private final InetSocketAddress address; - private final EventLoopGroup nettyThreadgroup; - - - public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable - private final long timeout; + private final EventLoopGroup nettyThreadGroup; - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) { - this(address, DEFAULT_TIMEOUT, nettyThreadgroup); + // Default timeout for netconf becoming stable + public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2); + public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000; + private final int delayMillis = 5000; + private final long maxWaitForCapabilitiesMillis; + private final long connectionTimeoutMillis; + public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) { + this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS); } - public ConfigPusher(InetSocketAddress address, long timeout, EventLoopGroup nettyThreadgroup) { + public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup, + long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) { this.address = address; - this.timeout = timeout; - - this.nettyThreadgroup = nettyThreadgroup; + this.nettyThreadGroup = nettyThreadGroup; + this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; + this.connectionTimeoutMillis = connectionTimeoutMillis; } - public synchronized NetconfClient init(List configs) throws InterruptedException { + public synchronized LinkedHashMap pushConfigs( + List configs) throws InterruptedException { logger.debug("Last config snapshots to be pushed to netconf: {}", configs); - return pushAllConfigs(configs); - } - private synchronized NetconfClient pushAllConfigs(List configs) throws InterruptedException { - NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet(), Optional.absent()); - for (ConfigSnapshotHolder configSnapshotHolder: configs){ - netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient)); + // first just make sure we can connect to netconf, even if nothing is being pushed + { + NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet()); + Util.closeClientAndDispatcher(netconfClient); } - return netconfClient; + LinkedHashMap result = new LinkedHashMap<>(); + // start pushing snapshots: + for (ConfigSnapshotHolder configSnapshotHolder : configs) { + EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder); + logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result); + result.put(configSnapshotHolder, editAndCommitResponseWithRetries); + } + logger.debug("All configuration snapshots have been pushed successfully."); + return result; } - private synchronized NetconfClient pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder, - Optional oldClientForPossibleReuse) + /** + * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal + * number of attempts is reached. + */ + private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder) throws InterruptedException { ConflictingVersionException lastException = null; int maxAttempts = 30; - for(int i = 0 ; i < maxAttempts; i++) { - NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse); - final String configSnapshot = configSnapshotHolder.getConfigSnapshot(); - logger.trace("Pushing following xml to netconf {}", configSnapshot); + + for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) { + NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities()); + logger.trace("Pushing following xml to netconf {}", configSnapshotHolder); try { - pushLastConfig(configSnapshotHolder, netconfClient); - return netconfClient; - } catch(ConflictingVersionException e) { - Util.closeClientAndDispatcher(netconfClient); + EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient); + return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt); + } catch (ConflictingVersionException e) { lastException = e; Thread.sleep(1000); - } catch (SAXException | IOException e) { - throw new IllegalStateException("Unable to load last config", e); + } catch (RuntimeException e) { + throw new IllegalStateException("Unable to load " + configSnapshotHolder, e); + } finally { + Util.closeClientAndDispatcher(netconfClient); } } - throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: " - + maxAttempts, lastException); + throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder, + lastException); } /** * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException. * If empty set is provided, will only make sure netconf client successfuly connected to the server. - * @param oldClientForPossibleReuse if present, try to get expected capabilities from it before closing it and retrying with - * new client connection. * @return NetconfClient that has all required capabilities from server. */ - private synchronized NetconfClient makeNetconfConnection(Set expectedCaps, - Optional oldClientForPossibleReuse) - throws InterruptedException { - - if (oldClientForPossibleReuse.isPresent()) { - NetconfClient oldClient = oldClientForPossibleReuse.get(); - if (Util.isSubset(oldClient, expectedCaps)) { - return oldClient; - } else { - Util.closeClientAndDispatcher(oldClient); - } - } + private synchronized NetconfClient makeNetconfConnection(Set expectedCaps) throws InterruptedException { // TODO think about moving capability subset check to netconf client // could be utilized by integration tests - long pollingStart = System.currentTimeMillis(); - int delay = 5000; - + final long pollingStartNanos = System.nanoTime(); + final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis); int attempt = 0; - long deadline = pollingStart + timeout; - String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(), Integer.toString(address.getPort()), "tcp", Optional.of("persister")); - Set latestCapabilities = new HashSet<>(); - while (System.currentTimeMillis() < deadline) { + Set latestCapabilities = null; + while (System.nanoTime() < deadlineNanos) { attempt++; - NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, - nettyThreadgroup, additionalHeader); + NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup, + nettyThreadGroup, additionalHeader, connectionTimeoutMillis); NetconfClient netconfClient; try { - netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher); + netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher); } catch (IllegalStateException e) { logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e); netconfClientDispatcher.close(); - Thread.sleep(delay); + Thread.sleep(delayMillis); continue; } latestCapabilities = netconfClient.getCapabilities(); if (Util.isSubset(netconfClient, expectedCaps)) { logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities); - logger.info("Session id received from netconf server: {}", netconfClient.getClientSession()); + logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession()); return netconfClient; } logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities); Util.closeClientAndDispatcher(netconfClient); - Thread.sleep(delay); + Thread.sleep(delayMillis); + } + if (latestCapabilities == null) { + logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis); + throw new RuntimeException("Could not connect to netconf server"); } Set allNotFound = new HashSet<>(expectedCaps); allNotFound.removeAll(latestCapabilities); @@ -166,75 +168,152 @@ public class ConfigPusher { } - private synchronized void pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient) - throws ConflictingVersionException, IOException, SAXException { - - Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); - logger.info("Pushing last configuration to netconf: {}", configSnapshotHolder); - StringBuilder response = new StringBuilder("editConfig response = {"); + /** + * Sends two RPCs to the netconf server: edit-config and commit. + * + * @param configSnapshotHolder + * @param netconfClient + * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager + * @throws java.lang.RuntimeException if edit-config or commit fails otherwise + */ + private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient) + throws ConflictingVersionException { + Element xmlToBePersisted; + try { + xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); + } catch (SAXException | IOException e) { + throw new IllegalStateException("Cannot parse " + configSnapshotHolder); + } + logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder); - NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml"); + NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted); // sending message to netconf - NetconfMessage responseMessage = getResponse(message, netconfClient); - - XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument()); - Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); - element = element.getOnlyChildElement(); - - Util.checkIsOk(element, responseMessage); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient); - - element = XmlElement.fromDomDocument(responseMessage.getDocument()); - Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); - element = element.getOnlyChildElement(); - - Util.checkIsOk(element, responseMessage); - response.append("commit response = {"); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - logger.info("Last configuration loaded successfully"); - logger.trace("Detailed message {}", response); + NetconfMessage editResponseMessage; + try { + editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient); + } catch (IOException e) { + throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e); + } + + // commit + NetconfMessage commitResponseMessage; + try { + commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient); + } catch (IOException e) { + throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e); + } + + if (logger.isTraceEnabled()) { + StringBuilder response = new StringBuilder("editConfig response = {"); + response.append(XmlUtil.toString(editResponseMessage.getDocument())); + response.append("}"); + response.append("commit response = {"); + response.append(XmlUtil.toString(commitResponseMessage.getDocument())); + response.append("}"); + logger.trace("Last configuration loaded successfully"); + logger.trace("Detailed message {}", response); + } + return new EditAndCommitResponse(editResponseMessage, commitResponseMessage); } - private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) { + + private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) throws IOException { try { - return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); - } catch(RuntimeException e) { - logger.error("Error while sending message {} to {}", request, netconfClient); - throw e; + NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); + NetconfUtil.checkIsMessageOk(netconfMessage); + return netconfMessage; + } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions + logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e); + throw new IOException("Failed to execute netconf transaction", e); } } - private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) { - try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) { - Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename); + + // load editConfig.xml template, populate /rpc/edit-config/config with parameter + private static NetconfMessage createEditConfigMessage(Element dataElement) { + String editConfigResourcePath = "/netconfOp/editConfig.xml"; + try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) { + Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath); Document doc = XmlUtil.readXmlToDocument(stream); - doc.getDocumentElement(); XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement(); XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY); editConfigElement.getDomElement().removeChild(configWrapper.getDomElement()); for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) { - configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true)); + boolean deep = true; + configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep)); } editConfigElement.appendChild(configWrapper.getDomElement()); return new NetconfMessage(doc); } catch (IOException | SAXException e) { - throw new RuntimeException("Unable to parse message from resources " + editConfigResourcename, e); + // error reading the xml file bundled into the jar + throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e); } } - private static NetconfMessage getNetconfMessageFromResource(String resource) { + private static NetconfMessage getCommitMessage() { + String resource = "/netconfOp/commit.xml"; try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) { Preconditions.checkNotNull(stream, "Unable to load resource " + resource); return new NetconfMessage(XmlUtil.readXmlToDocument(stream)); } catch (SAXException | IOException e) { - throw new RuntimeException("Unable to parse message from resources " + resource, e); + // error reading the xml file bundled into the jar + throw new RuntimeException("Error while opening local resource " + resource, e); + } + } + + static class EditAndCommitResponse { + private final NetconfMessage editResponse, commitResponse; + + EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) { + this.editResponse = editResponse; + this.commitResponse = commitResponse; + } + + public NetconfMessage getEditResponse() { + return editResponse; + } + + public NetconfMessage getCommitResponse() { + return commitResponse; + } + + @Override + public String toString() { + return "EditAndCommitResponse{" + + "editResponse=" + editResponse + + ", commitResponse=" + commitResponse + + '}'; + } + } + + + static class EditAndCommitResponseWithRetries { + private final EditAndCommitResponse editAndCommitResponse; + private final int retries; + + EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) { + this.editAndCommitResponse = editAndCommitResponse; + this.retries = retries; + } + + public int getRetries() { + return retries; + } + + public EditAndCommitResponse getEditAndCommitResponse() { + return editAndCommitResponse; + } + + @Override + public String toString() { + return "EditAndCommitResponseWithRetries{" + + "editAndCommitResponse=" + editAndCommitResponse + + ", retries=" + retries + + '}'; } } -} +} \ No newline at end of file