From: Tomas Olvecky Date: Fri, 31 Jan 2014 13:51:08 +0000 (+0100) Subject: Refactor config-persister: clean up exception handling and netconf client. X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~539^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c5b0b028392646507133df0af5efcee547763b6d Refactor config-persister: clean up exception handling and netconf client. Split ConfigPersisterNotificationHandler into registrator and listener, remove unnecessary reference to NetconfClient. Refactor ConfigPusher: each method now returns computational results instead of void. Avoid unnecessary NetconfClient reference leak - close it after every use. Rename generic 'timeout' to 'maxWaitForCapabilitiesMillis'. Avoid throwing and catching SAXException and IOException as there is no recovery from them; keep only ConflictingVersionException. ConfigPersisterActivator now actually uses the configurable timeout, the property is renamed from 'pushTimeout' to 'maxWaitForCapabilitiesMillis'. Change-Id: Idc9995b11dd2f19a90598c88eb7337d39d63fc27 Signed-off-by: Tomas Olvecky --- diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java index 889fa8428c..e319d2cb67 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java @@ -12,7 +12,6 @@ import org.opendaylight.controller.config.persist.api.Persister; import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification; import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean; import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification; -import org.opendaylight.controller.netconf.client.NetconfClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,40 +26,60 @@ import java.io.IOException; import java.util.regex.Pattern; /** - * Responsible for listening for notifications from netconf containing latest + * Responsible for listening for notifications from netconf (via JMX) containing latest * committed configuration that should be persisted, and also for loading last * configuration. */ @ThreadSafe -public class ConfigPersisterNotificationHandler implements NotificationListener, Closeable { +public class ConfigPersisterNotificationHandler implements Closeable { private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class); private final MBeanServerConnection mBeanServerConnection; - private final NetconfClient netconfClient; - private final Persister persisterAggregator; - private final Pattern ignoredMissingCapabilityRegex; + private final ConfigPersisterNotificationListener listener; + - public ConfigPersisterNotificationHandler(MBeanServerConnection mBeanServerConnection, NetconfClient netconfClient, + public ConfigPersisterNotificationHandler(MBeanServerConnection mBeanServerConnection, Persister persisterAggregator, Pattern ignoredMissingCapabilityRegex) { this.mBeanServerConnection = mBeanServerConnection; - this.netconfClient = netconfClient; - this.persisterAggregator = persisterAggregator; - this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex; - } + listener = new ConfigPersisterNotificationListener(persisterAggregator, ignoredMissingCapabilityRegex); + registerAsJMXListener(mBeanServerConnection, listener); - public void init() { - registerAsJMXListener(); } - private void registerAsJMXListener() { + private static void registerAsJMXListener(MBeanServerConnection mBeanServerConnection, ConfigPersisterNotificationListener listener) { logger.trace("Called registerAsJMXListener"); try { - mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.objectName, this, null, null); + mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.objectName, listener, null, null); } catch (InstanceNotFoundException | IOException e) { throw new RuntimeException("Cannot register as JMX listener to netconf", e); } } + @Override + public synchronized void close() { + // unregister from JMX + ObjectName on = DefaultCommitOperationMXBean.objectName; + try { + if (mBeanServerConnection.isRegistered(on)) { + mBeanServerConnection.removeNotificationListener(on, listener); + } + } catch (Exception e) { + logger.warn("Unable to unregister {} as listener for {}", listener, on, e); + } + } +} + +class ConfigPersisterNotificationListener implements NotificationListener { + private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class); + + private final Persister persisterAggregator; + private final Pattern ignoredMissingCapabilityRegex; + + ConfigPersisterNotificationListener(Persister persisterAggregator, Pattern ignoredMissingCapabilityRegex) { + this.persisterAggregator = persisterAggregator; + this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex; + } + @Override public void handleNotification(Notification notification, Object handback) { if (notification instanceof NetconfJMXNotification == false) @@ -92,22 +111,4 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, throw new RuntimeException("Unable to persist configuration snapshot", e); } } - - @Override - public synchronized void close() { - // unregister from JMX - ObjectName on = DefaultCommitOperationMXBean.objectName; - try { - if (mBeanServerConnection.isRegistered(on)) { - mBeanServerConnection.removeNotificationListener(on, this); - } - } catch (Exception e) { - logger.warn("Unable to unregister {} as listener for {}", this, on, e); - } - } - - public NetconfClient getNetconfClient() { - return netconfClient; - } - -} +} \ No newline at end of file diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index b1eb2fc720..01d872d89c 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -33,6 +33,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -44,98 +45,98 @@ public class ConfigPusher { private static final int NETCONF_SEND_ATTEMPTS = 20; private final InetSocketAddress address; - private final EventLoopGroup nettyThreadgroup; + private final EventLoopGroup nettyThreadGroup; // Default timeout for netconf becoming stable - public static final long DEFAULT_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2); - private static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000; + public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2); + public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000; private final int delayMillis = 5000; - private final long timeoutNanos; + private final long maxWaitForCapabilitiesMillis; private final long connectionTimeoutMillis; - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) { - this(address, nettyThreadgroup, DEFAULT_TIMEOUT_NANOS, DEFAULT_CONNECTION_TIMEOUT_MILLIS); + public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) { + this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS); } - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup, long timeoutNanos, long connectionTimeoutMillis) { + public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup, + long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) { this.address = address; - this.nettyThreadgroup = nettyThreadgroup; - this.timeoutNanos = timeoutNanos; + this.nettyThreadGroup = nettyThreadGroup; + this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; this.connectionTimeoutMillis = connectionTimeoutMillis; } - public synchronized NetconfClient init(List configs) throws InterruptedException { + public synchronized LinkedHashMap pushConfigs( + List configs) throws InterruptedException { logger.debug("Last config snapshots to be pushed to netconf: {}", configs); - return pushAllConfigs(configs); - } - private synchronized NetconfClient pushAllConfigs(List configs) throws InterruptedException { // first just make sure we can connect to netconf, even if nothing is being pushed - NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet(), Optional.absent()); + { + NetconfClient netconfClient = makeNetconfConnection(Collections.emptySet()); + Util.closeClientAndDispatcher(netconfClient); + } + LinkedHashMap result = new LinkedHashMap<>(); // start pushing snapshots: - for (ConfigSnapshotHolder configSnapshotHolder: configs){ - netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient)); - logger.debug("Config snapshot pushed successfully: {}", configSnapshotHolder); + for (ConfigSnapshotHolder configSnapshotHolder : configs) { + EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder); + logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result); + result.put(configSnapshotHolder, editAndCommitResponseWithRetries); } - logger.debug("All configuration snapshots have been pushed successfully."); - return netconfClient; + return result; } - private synchronized NetconfClient pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder, - Optional oldClientForPossibleReuse) + /** + * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal + * number of attempts is reached. + */ + private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder) throws InterruptedException { - Exception lastException = null; + ConflictingVersionException lastException = null; int maxAttempts = 30; - for(int i = 0 ; i < maxAttempts; i++) { - NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse); + + for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) { + NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities()); logger.trace("Pushing following xml to netconf {}", configSnapshotHolder); try { - pushLastConfig(configSnapshotHolder, netconfClient); - return netconfClient; - } catch (ConflictingVersionException | IOException e) { - Util.closeClientAndDispatcher(netconfClient); + EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient); + return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt); + } catch (ConflictingVersionException e) { lastException = e; Thread.sleep(1000); - } catch (SAXException e) { - throw new IllegalStateException("Unable to load last config", e); + } catch (RuntimeException e) { + throw new IllegalStateException("Unable to load " + configSnapshotHolder, e); + } finally { + Util.closeClientAndDispatcher(netconfClient); } } - throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: " - + maxAttempts, lastException); + throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder, + lastException); } /** * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException. * If empty set is provided, will only make sure netconf client successfuly connected to the server. - * @param maybeOldClient if present, close it. * @return NetconfClient that has all required capabilities from server. */ - private synchronized NetconfClient makeNetconfConnection(Set expectedCaps, - Optional maybeOldClient) - throws InterruptedException { - - if (maybeOldClient.isPresent()) { - NetconfClient oldClient = maybeOldClient.get(); - Util.closeClientAndDispatcher(oldClient); - } + private synchronized NetconfClient makeNetconfConnection(Set expectedCaps) throws InterruptedException { // TODO think about moving capability subset check to netconf client // could be utilized by integration tests - final long pollingStart = System.nanoTime(); - final long deadline = pollingStart + timeoutNanos; + final long pollingStartNanos = System.nanoTime(); + final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis); int attempt = 0; String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(), Integer.toString(address.getPort()), "tcp", Optional.of("persister")); Set latestCapabilities = null; - while (System.nanoTime() < deadline) { + while (System.nanoTime() < deadlineNanos) { attempt++; - NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, - nettyThreadgroup, additionalHeader, connectionTimeoutMillis); + NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup, + nettyThreadGroup, additionalHeader, connectionTimeoutMillis); NetconfClient netconfClient; try { netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher); @@ -156,7 +157,7 @@ public class ConfigPusher { Thread.sleep(delayMillis); } if (latestCapabilities == null) { - logger.error("Could not connect to the server in {} ms", timeoutNanos / 1000); + logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis); throw new RuntimeException("Could not connect to netconf server"); } Set allNotFound = new HashSet<>(expectedCaps); @@ -167,70 +168,152 @@ public class ConfigPusher { } - private synchronized void pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient) - throws ConflictingVersionException, IOException, SAXException { + /** + * Sends two RPCs to the netconf server: edit-config and commit. + * + * @param configSnapshotHolder + * @param netconfClient + * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager + * @throws java.lang.RuntimeException if edit-config or commit fails otherwise + */ + private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient) + throws ConflictingVersionException { - Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); + Element xmlToBePersisted; + try { + xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot()); + } catch (SAXException | IOException e) { + throw new IllegalStateException("Cannot parse " + configSnapshotHolder); + } logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder); - StringBuilder response = new StringBuilder("editConfig response = {"); - NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml"); + NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted); // sending message to netconf - NetconfMessage responseMessage = getResponse(message, netconfClient); - - NetconfUtil.checkIsMessageOk(responseMessage); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient); - + NetconfMessage editResponseMessage; + try { + editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient); + } catch (IOException e) { + throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e); + } + // commit + NetconfMessage commitResponseMessage; + try { + commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient); + } catch (IOException e) { + throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e); + } - NetconfUtil.checkIsMessageOk(responseMessage); - response.append("commit response = {"); - response.append(XmlUtil.toString(responseMessage.getDocument())); - response.append("}"); - logger.trace("Last configuration loaded successfully"); - logger.trace("Detailed message {}", response); + if (logger.isTraceEnabled()) { + StringBuilder response = new StringBuilder("editConfig response = {"); + response.append(XmlUtil.toString(editResponseMessage.getDocument())); + response.append("}"); + response.append("commit response = {"); + response.append(XmlUtil.toString(commitResponseMessage.getDocument())); + response.append("}"); + logger.trace("Last configuration loaded successfully"); + logger.trace("Detailed message {}", response); + } + return new EditAndCommitResponse(editResponseMessage, commitResponseMessage); } - private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) throws IOException { + + private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) throws IOException { try { - return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); - } catch (RuntimeException e) { + NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); + NetconfUtil.checkIsMessageOk(netconfMessage); + return netconfMessage; + } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e); throw new IOException("Failed to execute netconf transaction", e); } } - private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) throws IOException, SAXException { - try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) { - Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename); + + // load editConfig.xml template, populate /rpc/edit-config/config with parameter + private static NetconfMessage createEditConfigMessage(Element dataElement) { + String editConfigResourcePath = "/netconfOp/editConfig.xml"; + try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) { + Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath); Document doc = XmlUtil.readXmlToDocument(stream); - doc.getDocumentElement(); XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement(); XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY); editConfigElement.getDomElement().removeChild(configWrapper.getDomElement()); for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) { - configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true)); + boolean deep = true; + configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep)); } editConfigElement.appendChild(configWrapper.getDomElement()); return new NetconfMessage(doc); } catch (IOException | SAXException e) { - logger.debug("Failed to create edit-config message for resource {}", editConfigResourcename, e); - throw e; + // error reading the xml file bundled into the jar + throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e); } } - private static NetconfMessage getNetconfMessageFromResource(String resource) throws IOException, SAXException { + private static NetconfMessage getCommitMessage() { + String resource = "/netconfOp/commit.xml"; try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) { Preconditions.checkNotNull(stream, "Unable to load resource " + resource); return new NetconfMessage(XmlUtil.readXmlToDocument(stream)); } catch (SAXException | IOException e) { - logger.debug("Failed to parse netconf message for resource {}", resource, e); - throw e; + // error reading the xml file bundled into the jar + throw new RuntimeException("Error while opening local resource " + resource, e); + } + } + + static class EditAndCommitResponse { + private final NetconfMessage editResponse, commitResponse; + + EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) { + this.editResponse = editResponse; + this.commitResponse = commitResponse; + } + + public NetconfMessage getEditResponse() { + return editResponse; + } + + public NetconfMessage getCommitResponse() { + return commitResponse; + } + + @Override + public String toString() { + return "EditAndCommitResponse{" + + "editResponse=" + editResponse + + ", commitResponse=" + commitResponse + + '}'; + } + } + + + static class EditAndCommitResponseWithRetries { + private final EditAndCommitResponse editAndCommitResponse; + private final int retries; + + EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) { + this.editAndCommitResponse = editAndCommitResponse; + this.retries = retries; + } + + public int getRetries() { + return retries; + } + + public EditAndCommitResponse getEditAndCommitResponse() { + return editAndCommitResponse; + } + + @Override + public String toString() { + return "EditAndCommitResponseWithRetries{" + + "editAndCommitResponse=" + editAndCommitResponse + + ", retries=" + retries + + '}'; } } -} +} \ No newline at end of file diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java index a09c75b940..1157ddbd83 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java @@ -10,11 +10,9 @@ package org.opendaylight.controller.netconf.persist.impl.osgi; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.persist.impl.ConfigPersisterNotificationHandler; import org.opendaylight.controller.netconf.persist.impl.ConfigPusher; import org.opendaylight.controller.netconf.persist.impl.PersisterAggregator; -import org.opendaylight.controller.netconf.persist.impl.Util; import org.opendaylight.controller.netconf.util.osgi.NetconfConfigUtil; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; @@ -25,7 +23,6 @@ import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.util.regex.Pattern; -import java.util.concurrent.TimeUnit; public class ConfigPersisterActivator implements BundleActivator { @@ -34,7 +31,7 @@ public class ConfigPersisterActivator implements BundleActivator { private final static MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer(); private static final String IGNORED_MISSING_CAPABILITY_REGEX_SUFFIX = "ignoredMissingCapabilityRegex"; - private static final String PUSH_TIMEOUT = "pushTimeout"; + private static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS = "maxWaitForCapabilitiesMillis"; public static final String NETCONF_CONFIG_PERSISTER = "netconf.config.persister"; @@ -44,9 +41,8 @@ public class ConfigPersisterActivator implements BundleActivator { private volatile ConfigPersisterNotificationHandler jmxNotificationHandler; - private volatile NetconfClient netconfClient; private Thread initializationThread; - private EventLoopGroup nettyThreadgroup; + private EventLoopGroup nettyThreadGroup; private PersisterAggregator persisterAggregator; @Override @@ -63,15 +59,22 @@ public class ConfigPersisterActivator implements BundleActivator { regex = DEFAULT_IGNORED_REGEX; } - String timeoutProperty = propertiesProvider.getProperty(PUSH_TIMEOUT); - long timeout = timeoutProperty == null ? ConfigPusher.DEFAULT_TIMEOUT_NANOS : TimeUnit.SECONDS.toNanos(Integer.valueOf(timeoutProperty)); + String timeoutProperty = propertiesProvider.getProperty(MAX_WAIT_FOR_CAPABILITIES_MILLIS); + long maxWaitForCapabilitiesMillis; + if (timeoutProperty == null) { + maxWaitForCapabilitiesMillis = ConfigPusher.DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS; + } else { + maxWaitForCapabilitiesMillis = Integer.valueOf(timeoutProperty); + } final Pattern ignoredMissingCapabilityRegex = Pattern.compile(regex); - nettyThreadgroup = new NioEventLoopGroup(); + nettyThreadGroup = new NioEventLoopGroup(); persisterAggregator = PersisterAggregator.createFromProperties(propertiesProvider); - final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context, "Netconf is not configured, persister is not operational", true); - final ConfigPusher configPusher = new ConfigPusher(address, nettyThreadgroup); + final InetSocketAddress address = NetconfConfigUtil.extractTCPNetconfAddress(context, + "Netconf is not configured, persister is not operational", true); + final ConfigPusher configPusher = new ConfigPusher(address, nettyThreadGroup, maxWaitForCapabilitiesMillis, + ConfigPusher.DEFAULT_CONNECTION_TIMEOUT_MILLIS); // offload initialization to another thread in order to stop blocking activator @@ -79,11 +82,9 @@ public class ConfigPersisterActivator implements BundleActivator { @Override public void run() { try { - netconfClient = configPusher.init(persisterAggregator.loadLastConfigs()); - jmxNotificationHandler = new ConfigPersisterNotificationHandler( - platformMBeanServer, netconfClient, persisterAggregator, + configPusher.pushConfigs(persisterAggregator.loadLastConfigs()); + jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, ignoredMissingCapabilityRegex); - jmxNotificationHandler.init(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Interrupted while waiting for netconf connection"); @@ -103,15 +104,7 @@ public class ConfigPersisterActivator implements BundleActivator { if (jmxNotificationHandler != null) { jmxNotificationHandler.close(); } - if (netconfClient != null) { - netconfClient = jmxNotificationHandler.getNetconfClient(); - try { - Util.closeClientAndDispatcher(netconfClient); - } catch (Exception e) { - logger.warn("Unable to close connection to netconf {}", netconfClient, e); - } - } - nettyThreadgroup.shutdownGracefully(); + nettyThreadGroup.shutdownGracefully(); persisterAggregator.close(); } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java index 67d95bb90f..4fcad2cbd6 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java @@ -8,16 +8,11 @@ package org.opendaylight.controller.netconf.client; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; - import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.protocol.framework.NeverReconnectStrategy; import org.opendaylight.protocol.framework.ReconnectStrategy; @@ -25,8 +20,13 @@ import org.opendaylight.protocol.framework.TimedReconnectStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; public class NetconfClient implements Closeable { @@ -103,7 +103,7 @@ public class NetconfClient implements Closeable { } public NetconfMessage sendMessage(NetconfMessage message, int attempts, int attemptMsDelay) { - long startTime = System.currentTimeMillis(); + Stopwatch stopwatch = new Stopwatch().start(); Preconditions.checkState(clientSession.isUp(), "Session was not up yet"); //logger.debug("Sending message: {}",XmlUtil.toString(message.getDocument())); clientSession.sendMessage(message); @@ -115,8 +115,8 @@ public class NetconfClient implements Closeable { } catch (IllegalStateException e) { throw new IllegalStateException(this + " Cannot read message from " + address, e); } finally { - long diffMillis = System.currentTimeMillis() - startTime; - logger.debug("Total time spent waiting for response {}", diffMillis); + stopwatch.stop(); + logger.debug("Total time spent waiting for response {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); } } diff --git a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java index 8bfa0a5846..0037981a3e 100644 --- a/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java +++ b/opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java @@ -128,22 +128,22 @@ public class NetconfConfigPersisterITTest extends AbstractNetconfConfigTest { VerifyingPersister mockedAggregator = mockAggregator(); try (NetconfClient persisterClient = new NetconfClient("persister", tcpAddress, 4000, clientDispatcher)) { - ConfigPersisterNotificationHandler configPersisterNotificationHandler = new ConfigPersisterNotificationHandler( - platformMBeanServer, persisterClient, mockedAggregator, - Pattern.compile("")); - configPersisterNotificationHandler.init(); - - try (NetconfClient netconfClient = new NetconfClient("client", tcpAddress, 4000, clientDispatcher)) { - NetconfMessage response = netconfClient.sendMessage(loadGetConfigMessage()); - assertResponse(response, "