X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2FConfigPusher.java;h=c8af4ccd831bb306b563913a3af86d631a92b031;hp=044346e2c5a06e2576b7f687975085dc1e2d60f1;hb=0bff7769f4390e7b6ab6cc33070df684bb3c9509;hpb=de12565a7795af98788f8150eb0072f9c985f4a1 diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index 044346e2c5..c8af4ccd83 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -16,6 +16,8 @@ import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.util.NetconfUtil; +import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -33,6 +35,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; @Immutable public class ConfigPusher { @@ -43,20 +46,24 @@ public class ConfigPusher { private final InetSocketAddress address; private final EventLoopGroup nettyThreadgroup; - - public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable - private final long timeout; + // Default timeout for netconf becoming stable + public static final long DEFAULT_TIMEOUT = TimeUnit.MINUTES.toNanos(2); + private final int delayMillis = 5000; + private final long timeoutNanos; public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) { - this(address, DEFAULT_TIMEOUT, nettyThreadgroup); + this(address, nettyThreadgroup, DEFAULT_TIMEOUT); + } + @Deprecated + public ConfigPusher(InetSocketAddress address, long timeoutMillis, EventLoopGroup nettyThreadgroup) { + this(address, nettyThreadgroup, TimeUnit.MILLISECONDS.toNanos(timeoutMillis)); } - public ConfigPusher(InetSocketAddress address, long timeout, EventLoopGroup nettyThreadgroup) { + public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup, long timeoutNanos) { this.address = address; - this.timeout = timeout; - this.nettyThreadgroup = nettyThreadgroup; + this.timeoutNanos = timeoutNanos; } public synchronized NetconfClient init(List configs) throws InterruptedException { @@ -65,10 +72,15 @@ public class ConfigPusher { } private synchronized NetconfClient pushAllConfigs(List configs) throws InterruptedException { + // first just make sure we can connect to netconf, even if nothing is being pushed NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet(), Optional.absent()); + // start pushing snapshots: for (ConfigSnapshotHolder configSnapshotHolder: configs){ netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient)); + logger.debug("Config snapshot pushed successfully: {}", configSnapshotHolder); } + + logger.debug("All configuration snapshots have been pushed successfully."); return netconfClient; } @@ -80,8 +92,7 @@ public class ConfigPusher { int maxAttempts = 30; for(int i = 0 ; i < maxAttempts; i++) { NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse); - final String configSnapshot = configSnapshotHolder.getConfigSnapshot(); - logger.trace("Pushing following xml to netconf {}", configSnapshot); + logger.trace("Pushing following xml to netconf {}", configSnapshotHolder); try { pushLastConfig(configSnapshotHolder, netconfClient); return netconfClient; @@ -100,55 +111,51 @@ public class ConfigPusher { /** * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException. * If empty set is provided, will only make sure netconf client successfuly connected to the server. - * @param oldClientForPossibleReuse if present, try to get expected capabilities from it before closing it and retrying with - * new client connection. + * @param maybeOldClient if present, close it. * @return NetconfClient that has all required capabilities from server. */ private synchronized NetconfClient makeNetconfConnection(Set expectedCaps, - Optional oldClientForPossibleReuse) + Optional maybeOldClient) throws InterruptedException { - if (oldClientForPossibleReuse.isPresent()) { - NetconfClient oldClient = oldClientForPossibleReuse.get(); - if (Util.isSubset(oldClient, expectedCaps)) { - return oldClient; - } else { - Util.closeClientAndDispatcher(oldClient); - } + if (maybeOldClient.isPresent()) { + NetconfClient oldClient = maybeOldClient.get(); + Util.closeClientAndDispatcher(oldClient); } // TODO think about moving capability subset check to netconf client // could be utilized by integration tests - long pollingStart = System.currentTimeMillis(); - int delay = 5000; - + final long pollingStart = System.nanoTime(); + final long deadline = pollingStart + timeoutNanos; int attempt = 0; - long deadline = pollingStart + timeout; + String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(), + Integer.toString(address.getPort()), "tcp", Optional.of("persister")); Set latestCapabilities = new HashSet<>(); - while (System.currentTimeMillis() < deadline) { + while (System.nanoTime() < deadline) { attempt++; - NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, nettyThreadgroup); + NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, + nettyThreadgroup, additionalHeader); NetconfClient netconfClient; try { - netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher); + netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher); } catch (IllegalStateException e) { logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e); netconfClientDispatcher.close(); - Thread.sleep(delay); + Thread.sleep(delayMillis); continue; } latestCapabilities = netconfClient.getCapabilities(); if (Util.isSubset(netconfClient, expectedCaps)) { logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities); - logger.info("Session id received from netconf server: {}", netconfClient.getClientSession()); + logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession()); return netconfClient; } logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities); Util.closeClientAndDispatcher(netconfClient); - Thread.sleep(delay); + Thread.sleep(delayMillis); } Set allNotFound = new HashSet<>(expectedCaps); allNotFound.removeAll(latestCapabilities); @@ -162,36 +169,38 @@ public class ConfigPusher { throws ConflictingVersionException, IOException, SAXException { Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); - logger.info("Pushing last configuration to netconf: {}", configSnapshotHolder); + logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder); StringBuilder response = new StringBuilder("editConfig response = {"); - NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml"); // sending message to netconf - NetconfMessage responseMessage = netconfClient.sendMessage(message, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); - - XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument()); - Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); - element = element.getOnlyChildElement(); + NetconfMessage responseMessage = getResponse(message, netconfClient); - Util.checkIsOk(element, responseMessage); + NetconfUtil.checkIsMessageOk(responseMessage); response.append(XmlUtil.toString(responseMessage.getDocument())); response.append("}"); - responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml"), NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); + responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient); + - element = XmlElement.fromDomDocument(responseMessage.getDocument()); - Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); - element = element.getOnlyChildElement(); - Util.checkIsOk(element, responseMessage); + NetconfUtil.checkIsMessageOk(responseMessage); response.append("commit response = {"); response.append(XmlUtil.toString(responseMessage.getDocument())); response.append("}"); - logger.info("Last configuration loaded successfully"); + logger.trace("Last configuration loaded successfully"); logger.trace("Detailed message {}", response); } + private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) { + try { + return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); + } catch(RuntimeException e) { + logger.error("Error while sending message {} to {}", request, netconfClient); + throw e; + } + } + private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) { try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) { Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename);