X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2FConfigPersisterNotificationHandler.java;h=99b7ee60a2c51c8033c44e1195f78a93328b63c9;hb=cdeca662c72dd78f39d7503c3c06ccfde9e4c51b;hp=d390161affd49ac510ac66fe7cca699b2418f2eb;hpb=d105455084f43d9423b7c0e6af785302e6a3ea93;p=controller.git diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java index d390161aff..99b7ee60a2 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java @@ -11,6 +11,26 @@ package org.opendaylight.controller.netconf.persist.impl; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; +import javax.annotation.concurrent.ThreadSafe; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import org.opendaylight.controller.config.api.ConflictingVersionException; +import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.config.persist.api.Persister; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification; @@ -18,6 +38,7 @@ import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean; import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification; import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -27,20 +48,6 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.xml.sax.SAXException; -import javax.annotation.concurrent.ThreadSafe; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.Notification; -import javax.management.NotificationListener; -import javax.management.ObjectName; -import javax.net.ssl.SSLContext; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.Set; - /** * Responsible for listening for notifications from netconf containing latest * committed configuration that should be persisted, and also for loading last @@ -50,49 +57,49 @@ import java.util.Set; public class ConfigPersisterNotificationHandler implements NotificationListener, Closeable { private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class); + private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000; + private static final int NETCONF_SEND_ATTEMPTS = 20; private final InetSocketAddress address; - private final NetconfClientDispatcher dispatcher; + private final EventLoopGroup nettyThreadgroup; + private NetconfClientDispatcher netconfClientDispatcher; private NetconfClient netconfClient; private final Persister persister; private final MBeanServerConnection mbeanServer; - private Long currentSessionId; + private final ObjectName on = DefaultCommitOperationMXBean.objectName; - public static final long DEFAULT_TIMEOUT = 40000L; + public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable private final long timeout; + private final Pattern ignoredMissingCapabilityRegex; public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address, - MBeanServerConnection mbeanServer) { - this(persister, address, mbeanServer, DEFAULT_TIMEOUT); + MBeanServerConnection mbeanServer, Pattern ignoredMissingCapabilityRegex) { + this(persister, address, mbeanServer, DEFAULT_TIMEOUT, ignoredMissingCapabilityRegex); + } public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address, - MBeanServerConnection mbeanServer, long timeout) { + MBeanServerConnection mbeanServer, long timeout, Pattern ignoredMissingCapabilityRegex) { this.persister = persister; this.address = address; this.mbeanServer = mbeanServer; this.timeout = timeout; - this.dispatcher = new NetconfClientDispatcher(Optional.absent()); + + this.nettyThreadgroup = new NioEventLoopGroup(); + this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex; } public void init() throws InterruptedException { - Optional maybeConfig = loadLastConfig(); + Optional maybeConfig = loadLastConfig(); if (maybeConfig.isPresent()) { logger.debug("Last config found {}", persister); - - registerToNetconf(maybeConfig.get().getCapabilities()); - - final String configSnapshot = maybeConfig.get().getConfigSnapshot(); - try { - pushLastConfig(XmlUtil.readXmlToElement(configSnapshot)); - } catch (SAXException | IOException e) { - throw new IllegalStateException("Unable to load last config", e); - } + ConflictingVersionException lastException = null; + pushLastConfigWithRetries(maybeConfig, lastException); } else { // this ensures that netconf is initialized, this is first @@ -105,6 +112,28 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, registerAsJMXListener(); } + private void pushLastConfigWithRetries(Optional maybeConfig, ConflictingVersionException lastException) throws InterruptedException { + int maxAttempts = 30; + for(int i = 0 ; i < maxAttempts; i++) { + registerToNetconf(maybeConfig.get().getCapabilities()); + + final String configSnapshot = maybeConfig.get().getConfigSnapshot(); + logger.trace("Pushing following xml to netconf {}", configSnapshot); + try { + pushLastConfig(XmlUtil.readXmlToElement(configSnapshot)); + return; + } catch(ConflictingVersionException e) { + closeClientAndDispatcher(netconfClient, netconfClientDispatcher); + lastException = e; + Thread.sleep(1000); + } catch (SAXException | IOException e) { + throw new IllegalStateException("Unable to load last config", e); + } + } + throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: " + + maxAttempts, lastException); + } + private synchronized long registerToNetconf(Set expectedCaps) throws InterruptedException { Set currentCapabilities = Sets.newHashSet(); @@ -117,14 +146,15 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, int attempt = 0; - while (true) { + long deadline = pollingStart + timeout; + while (System.currentTimeMillis() < deadline) { attempt++; - + netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, nettyThreadgroup); try { - netconfClient = new NetconfClient(this.toString(), address, delay, dispatcher); - // TODO is this correct ex to catch ? + netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher); } catch (IllegalStateException e) { logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e); + netconfClientDispatcher.close(); Thread.sleep(delay); continue; } @@ -132,29 +162,44 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, if (isSubset(currentCapabilities, expectedCaps)) { logger.debug("Hello from netconf stable with {} capabilities", currentCapabilities); - currentSessionId = netconfClient.getSessionId(); + long currentSessionId = netconfClient.getSessionId(); logger.info("Session id received from netconf server: {}", currentSessionId); return currentSessionId; } - if (System.currentTimeMillis() > pollingStart + timeout) { - break; - } + logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, currentCapabilities); - try { - netconfClient.close(); - } catch (IOException e) { - throw new RuntimeException("Error closing temporary client " + netconfClient); - } + closeClientAndDispatcher(netconfClient, netconfClientDispatcher); Thread.sleep(delay); } + Set allNotFound = new HashSet<>(expectedCaps); + allNotFound.removeAll(currentCapabilities); + logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}", + allNotFound, expectedCaps ,currentCapabilities); + throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound); - throw new RuntimeException("Netconf server did not provide required capabilities " + expectedCaps - + " in time, provided capabilities " + currentCapabilities); + } + + private static void closeClientAndDispatcher(Closeable client, Closeable dispatcher) { + Exception fromClient = null; + try { + client.close(); + } catch (Exception e) { + fromClient = e; + } finally { + try { + dispatcher.close(); + } catch (Exception e) { + if (fromClient != null) { + e.addSuppressed(fromClient); + } + throw new RuntimeException("Error closing temporary client ", e); + } + } } private boolean isSubset(Set currentCapabilities, Set expectedCaps) { @@ -166,6 +211,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, } private void registerAsJMXListener() { + logger.trace("Called registerAsJMXListener"); try { mbeanServer.addNotificationListener(on, this, null, null); } catch (InstanceNotFoundException | IOException e) { @@ -181,7 +227,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, // Socket should not be closed at this point // Activator unregisters this as JMX listener before close is called - logger.debug("Received notification {}", notification); + logger.info("Received notification {}", notification); if (notification instanceof CommitJMXNotification) { try { handleAfterCommitNotification((CommitJMXNotification) notification); @@ -196,26 +242,16 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, private void handleAfterCommitNotification(final CommitJMXNotification notification) { try { - final XmlElement configElement = XmlElement.fromDomElement(notification.getConfigSnapshot()); - persister.persistConfig(new Persister.ConfigSnapshotHolder() { - @Override - public String getConfigSnapshot() { - return XmlUtil.toString(configElement.getDomElement()); - } - - @Override - public Set getCapabilities() { - return notification.getCapabilities(); - } - }); - logger.debug("Configuration persisted successfully"); + persister.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(), + notification.getCapabilities(), ignoredMissingCapabilityRegex)); + logger.info("Configuration persisted successfully"); } catch (IOException e) { throw new RuntimeException("Unable to persist configuration snapshot", e); } } - private Optional loadLastConfig() { - Optional maybeConfigElement; + private Optional loadLastConfig() { + Optional maybeConfigElement; try { maybeConfigElement = persister.loadLastConfig(); } catch (IOException e) { @@ -224,12 +260,15 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, return maybeConfigElement; } - private synchronized void pushLastConfig(Element persistedConfig) { + private synchronized void pushLastConfig(Element xmlToBePersisted) throws ConflictingVersionException { + logger.info("Pushing last configuration to netconf"); StringBuilder response = new StringBuilder("editConfig response = {"); - Element configElement = persistedConfig; - NetconfMessage message = createEditConfigMessage(configElement, "/netconfOp/editConfig.xml"); - NetconfMessage responseMessage = netconfClient.sendMessage(message); + + NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml"); + + // sending message to netconf + NetconfMessage responseMessage = netconfClient.sendMessage(message, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument()); Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); @@ -238,7 +277,7 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, checkIsOk(element, responseMessage); response.append(XmlUtil.toString(responseMessage.getDocument())); response.append("}"); - responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml")); + responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml"), NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); element = XmlElement.fromDomDocument(responseMessage.getDocument()); Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY)); @@ -248,26 +287,34 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, response.append("commit response = {"); response.append(XmlUtil.toString(responseMessage.getDocument())); response.append("}"); - logger.debug("Last configuration loaded successfully"); + logger.info("Last configuration loaded successfully"); + logger.trace("Detailed message {}", response); } - private void checkIsOk(XmlElement element, NetconfMessage responseMessage) { + static void checkIsOk(XmlElement element, NetconfMessage responseMessage) throws ConflictingVersionException { if (element.getName().equals(XmlNetconfConstants.OK)) { return; - } else { - if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) { - logger.warn("Can not load last configuration, operation failed"); - throw new IllegalStateException("Can not load last configuration, operation failed: " - + XmlUtil.toString(responseMessage.getDocument())); + } + + if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) { + logger.warn("Can not load last configuration, operation failed"); + // is it ConflictingVersionException ? + XPathExpression xPathExpression = XMLNetconfUtil.compileXPath("/netconf:rpc-reply/netconf:rpc-error/netconf:error-info/netconf:error"); + String error = (String) XmlUtil.evaluateXPath(xPathExpression, element.getDomElement(), XPathConstants.STRING); + if (error!=null && error.contains(ConflictingVersionException.class.getCanonicalName())) { + throw new ConflictingVersionException(error); } - logger.warn("Can not load last configuration. Operation failed."); - throw new IllegalStateException("Can not load last configuration. Operation failed: " + throw new IllegalStateException("Can not load last configuration, operation failed: " + XmlUtil.toString(responseMessage.getDocument())); } + + logger.warn("Can not load last configuration. Operation failed."); + throw new IllegalStateException("Can not load last configuration. Operation failed: " + + XmlUtil.toString(responseMessage.getDocument())); } - private NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) { - try (InputStream stream = getClass().getResourceAsStream(editConfigResourcename)) { + private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) { + try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) { Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename); Document doc = XmlUtil.readXmlToDocument(stream); @@ -313,10 +360,18 @@ public class ConfigPersisterNotificationHandler implements NotificationListener, } } + if (netconfClientDispatcher != null) { + try { + netconfClientDispatcher.close(); + } catch (Exception e) { + logger.warn("Unable to close connection to netconf {}", netconfClientDispatcher, e); + } + } + try { - dispatcher.close(); + nettyThreadgroup.shutdownGracefully(); } catch (Exception e) { - logger.warn("Unable to close netconf client dispatcher {}", dispatcher, e); + logger.warn("Unable to close netconf client thread group {}", netconfClientDispatcher, e); } // unregister from JMX