import com.google.common.collect.Sets;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+import javax.annotation.concurrent.ThreadSafe;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import org.opendaylight.controller.config.api.ConflictingVersionException;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
import org.opendaylight.controller.netconf.client.NetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import javax.annotation.concurrent.ThreadSafe;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.net.ssl.SSLContext;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Set;
-
/**
* Responsible for listening for notifications from netconf containing latest
* committed configuration that should be persisted, and also for loading last
public class ConfigPersisterNotificationHandler implements NotificationListener, Closeable {
private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
+ private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
+ private static final int NETCONF_SEND_ATTEMPTS = 20;
private final InetSocketAddress address;
- private final NetconfClientDispatcher dispatcher;
private final EventLoopGroup nettyThreadgroup;
+ private NetconfClientDispatcher netconfClientDispatcher;
private NetconfClient netconfClient;
private final Persister persister;
private final MBeanServerConnection mbeanServer;
- private Long currentSessionId;
+
private final ObjectName on = DefaultCommitOperationMXBean.objectName;
- public static final long DEFAULT_TIMEOUT = 40000L;
+ public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable
private final long timeout;
+ private final Pattern ignoredMissingCapabilityRegex;
public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address,
- MBeanServerConnection mbeanServer) {
- this(persister, address, mbeanServer, DEFAULT_TIMEOUT);
+ MBeanServerConnection mbeanServer, Pattern ignoredMissingCapabilityRegex) {
+ this(persister, address, mbeanServer, DEFAULT_TIMEOUT, ignoredMissingCapabilityRegex);
+
}
public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address,
- MBeanServerConnection mbeanServer, long timeout) {
+ MBeanServerConnection mbeanServer, long timeout, Pattern ignoredMissingCapabilityRegex) {
this.persister = persister;
this.address = address;
this.mbeanServer = mbeanServer;
this.timeout = timeout;
this.nettyThreadgroup = new NioEventLoopGroup();
- this.dispatcher = new NetconfClientDispatcher(Optional.<SSLContext>absent(), nettyThreadgroup, nettyThreadgroup);
+ this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex;
}
public void init() throws InterruptedException {
- Optional<Persister.ConfigSnapshotHolder> maybeConfig = loadLastConfig();
+ Optional<ConfigSnapshotHolder> maybeConfig = loadLastConfig();
if (maybeConfig.isPresent()) {
logger.debug("Last config found {}", persister);
-
- registerToNetconf(maybeConfig.get().getCapabilities());
-
- final String configSnapshot = maybeConfig.get().getConfigSnapshot();
- try {
- pushLastConfig(XmlUtil.readXmlToElement(configSnapshot));
- } catch (SAXException | IOException e) {
- throw new IllegalStateException("Unable to load last config", e);
- }
+ ConflictingVersionException lastException = null;
+ pushLastConfigWithRetries(maybeConfig, lastException);
} else {
// this ensures that netconf is initialized, this is first
registerAsJMXListener();
}
+ private void pushLastConfigWithRetries(Optional<ConfigSnapshotHolder> maybeConfig, ConflictingVersionException lastException) throws InterruptedException {
+ int maxAttempts = 30;
+ for(int i = 0 ; i < maxAttempts; i++) {
+ registerToNetconf(maybeConfig.get().getCapabilities());
+
+ final String configSnapshot = maybeConfig.get().getConfigSnapshot();
+ logger.trace("Pushing following xml to netconf {}", configSnapshot);
+ try {
+ pushLastConfig(XmlUtil.readXmlToElement(configSnapshot));
+ return;
+ } catch(ConflictingVersionException e) {
+ closeClientAndDispatcher(netconfClient, netconfClientDispatcher);
+ lastException = e;
+ Thread.sleep(1000);
+ } catch (SAXException | IOException e) {
+ throw new IllegalStateException("Unable to load last config", e);
+ }
+ }
+ throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: "
+ + maxAttempts, lastException);
+ }
+
private synchronized long registerToNetconf(Set<String> expectedCaps) throws InterruptedException {
Set<String> currentCapabilities = Sets.newHashSet();
int attempt = 0;
- while (true) {
+ long deadline = pollingStart + timeout;
+ while (System.currentTimeMillis() < deadline) {
attempt++;
-
+ netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup, nettyThreadgroup);
try {
- netconfClient = new NetconfClient(this.toString(), address, delay, dispatcher);
- // TODO is this correct ex to catch ?
+ netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher);
} catch (IllegalStateException e) {
logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
+ netconfClientDispatcher.close();
Thread.sleep(delay);
continue;
}
if (isSubset(currentCapabilities, expectedCaps)) {
logger.debug("Hello from netconf stable with {} capabilities", currentCapabilities);
- currentSessionId = netconfClient.getSessionId();
+ long currentSessionId = netconfClient.getSessionId();
logger.info("Session id received from netconf server: {}", currentSessionId);
return currentSessionId;
}
- if (System.currentTimeMillis() > pollingStart + timeout) {
- break;
- }
+
logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, currentCapabilities);
- try {
- netconfClient.close();
- } catch (IOException e) {
- throw new RuntimeException("Error closing temporary client " + netconfClient);
- }
+ closeClientAndDispatcher(netconfClient, netconfClientDispatcher);
Thread.sleep(delay);
}
+ Set<String> allNotFound = new HashSet<>(expectedCaps);
+ allNotFound.removeAll(currentCapabilities);
+ logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
+ allNotFound, expectedCaps ,currentCapabilities);
+ throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
- throw new RuntimeException("Netconf server did not provide required capabilities " + expectedCaps
- + " in time, provided capabilities " + currentCapabilities);
+ }
+
+ private static void closeClientAndDispatcher(Closeable client, Closeable dispatcher) {
+ Exception fromClient = null;
+ try {
+ client.close();
+ } catch (Exception e) {
+ fromClient = e;
+ } finally {
+ try {
+ dispatcher.close();
+ } catch (Exception e) {
+ if (fromClient != null) {
+ e.addSuppressed(fromClient);
+ }
+ throw new RuntimeException("Error closing temporary client ", e);
+ }
+ }
}
private boolean isSubset(Set<String> currentCapabilities, Set<String> expectedCaps) {
}
private void registerAsJMXListener() {
+ logger.trace("Called registerAsJMXListener");
try {
mbeanServer.addNotificationListener(on, this, null, null);
} catch (InstanceNotFoundException | IOException e) {
// Socket should not be closed at this point
// Activator unregisters this as JMX listener before close is called
- logger.debug("Received notification {}", notification);
+ logger.info("Received notification {}", notification);
if (notification instanceof CommitJMXNotification) {
try {
handleAfterCommitNotification((CommitJMXNotification) notification);
private void handleAfterCommitNotification(final CommitJMXNotification notification) {
try {
- final XmlElement configElement = XmlElement.fromDomElement(notification.getConfigSnapshot());
- persister.persistConfig(new Persister.ConfigSnapshotHolder() {
- @Override
- public String getConfigSnapshot() {
- return XmlUtil.toString(configElement.getDomElement());
- }
-
- @Override
- public Set<String> getCapabilities() {
- return notification.getCapabilities();
- }
- });
- logger.debug("Configuration persisted successfully");
+ persister.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
+ notification.getCapabilities(), ignoredMissingCapabilityRegex));
+ logger.info("Configuration persisted successfully");
} catch (IOException e) {
throw new RuntimeException("Unable to persist configuration snapshot", e);
}
}
- private Optional<Persister.ConfigSnapshotHolder> loadLastConfig() {
- Optional<Persister.ConfigSnapshotHolder> maybeConfigElement;
+ private Optional<ConfigSnapshotHolder> loadLastConfig() {
+ Optional<ConfigSnapshotHolder> maybeConfigElement;
try {
maybeConfigElement = persister.loadLastConfig();
} catch (IOException e) {
return maybeConfigElement;
}
- private synchronized void pushLastConfig(Element persistedConfig) {
+ private synchronized void pushLastConfig(Element xmlToBePersisted) throws ConflictingVersionException {
+ logger.info("Pushing last configuration to netconf");
StringBuilder response = new StringBuilder("editConfig response = {");
- Element configElement = persistedConfig;
- NetconfMessage message = createEditConfigMessage(configElement, "/netconfOp/editConfig.xml");
- NetconfMessage responseMessage = netconfClient.sendMessage(message);
+
+ NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml");
+
+ // sending message to netconf
+ NetconfMessage responseMessage = netconfClient.sendMessage(message, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument());
Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
checkIsOk(element, responseMessage);
response.append(XmlUtil.toString(responseMessage.getDocument()));
response.append("}");
- responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml"));
+ responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml"), NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
element = XmlElement.fromDomDocument(responseMessage.getDocument());
Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
response.append("commit response = {");
response.append(XmlUtil.toString(responseMessage.getDocument()));
response.append("}");
- logger.debug("Last configuration loaded successfully");
+ logger.info("Last configuration loaded successfully");
+ logger.trace("Detailed message {}", response);
}
- private void checkIsOk(XmlElement element, NetconfMessage responseMessage) {
+ static void checkIsOk(XmlElement element, NetconfMessage responseMessage) throws ConflictingVersionException {
if (element.getName().equals(XmlNetconfConstants.OK)) {
return;
- } else {
- if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) {
- logger.warn("Can not load last configuration, operation failed");
- throw new IllegalStateException("Can not load last configuration, operation failed: "
- + XmlUtil.toString(responseMessage.getDocument()));
+ }
+
+ if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) {
+ logger.warn("Can not load last configuration, operation failed");
+ // is it ConflictingVersionException ?
+ XPathExpression xPathExpression = XMLNetconfUtil.compileXPath("/netconf:rpc-reply/netconf:rpc-error/netconf:error-info/netconf:error");
+ String error = (String) XmlUtil.evaluateXPath(xPathExpression, element.getDomElement(), XPathConstants.STRING);
+ if (error!=null && error.contains(ConflictingVersionException.class.getCanonicalName())) {
+ throw new ConflictingVersionException(error);
}
- logger.warn("Can not load last configuration. Operation failed.");
- throw new IllegalStateException("Can not load last configuration. Operation failed: "
+ throw new IllegalStateException("Can not load last configuration, operation failed: "
+ XmlUtil.toString(responseMessage.getDocument()));
}
+
+ logger.warn("Can not load last configuration. Operation failed.");
+ throw new IllegalStateException("Can not load last configuration. Operation failed: "
+ + XmlUtil.toString(responseMessage.getDocument()));
}
- private NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) {
- try (InputStream stream = getClass().getResourceAsStream(editConfigResourcename)) {
+ private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) {
+ try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) {
Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename);
Document doc = XmlUtil.readXmlToDocument(stream);
}
}
+ if (netconfClientDispatcher != null) {
+ try {
+ netconfClientDispatcher.close();
+ } catch (Exception e) {
+ logger.warn("Unable to close connection to netconf {}", netconfClientDispatcher, e);
+ }
+ }
+
try {
nettyThreadgroup.shutdownGracefully();
} catch (Exception e) {
- logger.warn("Unable to close netconf client thread group {}", dispatcher, e);
+ logger.warn("Unable to close netconf client thread group {}", netconfClientDispatcher, e);
}
// unregister from JMX