Refactor persister: require only capabilities referenced by the xml snapshot.
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPersisterNotificationHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Sets;
14 import io.netty.channel.EventLoopGroup;
15 import io.netty.channel.nio.NioEventLoopGroup;
16 import org.opendaylight.controller.config.persist.api.Persister;
17 import org.opendaylight.controller.netconf.api.NetconfMessage;
18 import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
19 import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
20 import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
21 import org.opendaylight.controller.netconf.client.NetconfClient;
22 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
23 import org.opendaylight.controller.netconf.util.xml.XmlElement;
24 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
25 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.w3c.dom.Document;
29 import org.w3c.dom.Element;
30 import org.xml.sax.SAXException;
31
32 import javax.annotation.concurrent.ThreadSafe;
33 import javax.management.InstanceNotFoundException;
34 import javax.management.MBeanServerConnection;
35 import javax.management.Notification;
36 import javax.management.NotificationListener;
37 import javax.management.ObjectName;
38 import javax.net.ssl.SSLContext;
39 import java.io.Closeable;
40 import java.io.IOException;
41 import java.io.InputStream;
42 import java.net.InetSocketAddress;
43 import java.util.Collections;
44 import java.util.HashSet;
45 import java.util.Set;
46 import java.util.regex.Pattern;
47
48 /**
49  * Responsible for listening for notifications from netconf containing latest
50  * committed configuration that should be persisted, and also for loading last
51  * configuration.
52  */
53 @ThreadSafe
54 public class ConfigPersisterNotificationHandler implements NotificationListener, Closeable {
55
56     private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
57     private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
58     private static final int NETCONF_SEND_ATTEMPTS = 20;
59
60     private final InetSocketAddress address;
61     private final EventLoopGroup nettyThreadgroup;
62
63     private NetconfClientDispatcher netconfClientDispatcher;
64     private NetconfClient netconfClient;
65
66     private final Persister persister;
67     private final MBeanServerConnection mbeanServer;
68     private Long currentSessionId;
69
70     private final ObjectName on = DefaultCommitOperationMXBean.objectName;
71
72     public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable
73     private final long timeout;
74     private final Pattern ignoredMissingCapabilityRegex;
75
76     public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address,
77             MBeanServerConnection mbeanServer, Pattern ignoredMissingCapabilityRegex) {
78         this(persister, address, mbeanServer, DEFAULT_TIMEOUT, ignoredMissingCapabilityRegex);
79
80     }
81
82     public ConfigPersisterNotificationHandler(Persister persister, InetSocketAddress address,
83             MBeanServerConnection mbeanServer, long timeout, Pattern ignoredMissingCapabilityRegex) {
84         this.persister = persister;
85         this.address = address;
86         this.mbeanServer = mbeanServer;
87         this.timeout = timeout;
88
89         this.nettyThreadgroup = new NioEventLoopGroup();
90         this.ignoredMissingCapabilityRegex = ignoredMissingCapabilityRegex;
91     }
92
93     public void init() throws InterruptedException {
94         Optional<Persister.ConfigSnapshotHolder> maybeConfig = loadLastConfig();
95
96         if (maybeConfig.isPresent()) {
97             logger.debug("Last config found {}", persister);
98
99             registerToNetconf(maybeConfig.get().getCapabilities());
100
101             final String configSnapshot = maybeConfig.get().getConfigSnapshot();
102             logger.trace("Pushing following xml to netconf {}", configSnapshot);
103             try {
104                 pushLastConfig(XmlUtil.readXmlToElement(configSnapshot));
105             } catch (SAXException | IOException e) {
106                 throw new IllegalStateException("Unable to load last config", e);
107             }
108
109         } else {
110             // this ensures that netconf is initialized, this is first
111             // connection
112             // this means we can register as listener for commit
113             registerToNetconf(Collections.<String>emptySet());
114
115             logger.info("No last config provided by backend storage {}", persister);
116         }
117         registerAsJMXListener();
118     }
119
120     private synchronized long registerToNetconf(Set<String> expectedCaps) throws InterruptedException {
121
122         Set<String> currentCapabilities = Sets.newHashSet();
123
124         // TODO think about moving capability subset check to netconf client
125         // could be utilized by integration tests
126
127         long pollingStart = System.currentTimeMillis();
128         int delay = 5000;
129
130         int attempt = 0;
131
132         while (true) {
133             attempt++;
134
135             netconfClientDispatcher = new NetconfClientDispatcher(Optional.<SSLContext>absent(), nettyThreadgroup, nettyThreadgroup);
136             try {
137                 netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher);
138             } catch (IllegalStateException e) {
139                 logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
140                 netconfClientDispatcher.close();
141                 Thread.sleep(delay);
142                 continue;
143             }
144             currentCapabilities = netconfClient.getCapabilities();
145
146             if (isSubset(currentCapabilities, expectedCaps)) {
147                 logger.debug("Hello from netconf stable with {} capabilities", currentCapabilities);
148                 currentSessionId = netconfClient.getSessionId();
149                 logger.info("Session id received from netconf server: {}", currentSessionId);
150                 return currentSessionId;
151             }
152
153             if (System.currentTimeMillis() > pollingStart + timeout) {
154                 break;
155             }
156
157             logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, currentCapabilities);
158
159             closeClientAndDispatcher(netconfClient, netconfClientDispatcher);
160
161             Thread.sleep(delay);
162         }
163         Set<String> allNotFound = new HashSet<>(expectedCaps);
164         allNotFound.removeAll(currentCapabilities);
165         logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
166                 allNotFound, expectedCaps ,currentCapabilities);
167         throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
168
169     }
170
171     private static void closeClientAndDispatcher(Closeable client, Closeable dispatcher) {
172         Exception fromClient = null;
173         try {
174             client.close();
175         } catch (Exception e) {
176             fromClient = e;
177         } finally {
178             try {
179                 dispatcher.close();
180             } catch (Exception e) {
181                 if (fromClient != null) {
182                     e.addSuppressed(fromClient);
183                 }
184
185                 throw new RuntimeException("Error closing temporary client ", e);
186             }
187         }
188     }
189
190     private boolean isSubset(Set<String> currentCapabilities, Set<String> expectedCaps) {
191         for (String exCap : expectedCaps) {
192             if (currentCapabilities.contains(exCap) == false)
193                 return false;
194         }
195         return true;
196     }
197
198     private void registerAsJMXListener() {
199         try {
200             mbeanServer.addNotificationListener(on, this, null, null);
201         } catch (InstanceNotFoundException | IOException e) {
202             throw new RuntimeException("Cannot register as JMX listener to netconf", e);
203         }
204     }
205
206     @Override
207     public void handleNotification(Notification notification, Object handback) {
208         if (notification instanceof NetconfJMXNotification == false)
209             return;
210
211         // Socket should not be closed at this point
212         // Activator unregisters this as JMX listener before close is called
213
214         logger.debug("Received notification {}", notification);
215         if (notification instanceof CommitJMXNotification) {
216             try {
217                 handleAfterCommitNotification((CommitJMXNotification) notification);
218             } catch (Exception e) {
219                 // TODO: notificationBroadcast support logs only DEBUG
220                 logger.warn("Exception occured during notification handling: ", e);
221                 throw e;
222             }
223         } else
224             throw new IllegalStateException("Unknown config registry notification type " + notification);
225     }
226
227     private void handleAfterCommitNotification(final CommitJMXNotification notification) {
228         try {
229             persister.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
230                     notification.getCapabilities(), ignoredMissingCapabilityRegex));
231             logger.debug("Configuration persisted successfully");
232         } catch (IOException e) {
233             throw new RuntimeException("Unable to persist configuration snapshot", e);
234         }
235     }
236
237     private Optional<Persister.ConfigSnapshotHolder> loadLastConfig() {
238         Optional<Persister.ConfigSnapshotHolder> maybeConfigElement;
239         try {
240             maybeConfigElement = persister.loadLastConfig();
241         } catch (IOException e) {
242             throw new RuntimeException("Unable to load configuration", e);
243         }
244         return maybeConfigElement;
245     }
246
247     private synchronized void pushLastConfig(Element xmlToBePersisted) {
248         logger.info("Pushing last configuration to netconf");
249         StringBuilder response = new StringBuilder("editConfig response = {");
250
251
252         NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml");
253
254         // sending message to netconf
255         NetconfMessage responseMessage = netconfClient.sendMessage(message, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
256
257         XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument());
258         Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
259         element = element.getOnlyChildElement();
260
261         checkIsOk(element, responseMessage);
262         response.append(XmlUtil.toString(responseMessage.getDocument()));
263         response.append("}");
264         responseMessage = netconfClient.sendMessage(getNetconfMessageFromResource("/netconfOp/commit.xml"), NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
265
266         element = XmlElement.fromDomDocument(responseMessage.getDocument());
267         Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
268         element = element.getOnlyChildElement();
269
270         checkIsOk(element, responseMessage);
271         response.append("commit response = {");
272         response.append(XmlUtil.toString(responseMessage.getDocument()));
273         response.append("}");
274         logger.info("Last configuration loaded successfully");
275         logger.trace("Detailed message {}", response);
276     }
277
278     private void checkIsOk(XmlElement element, NetconfMessage responseMessage) {
279         if (element.getName().equals(XmlNetconfConstants.OK)) {
280             return;
281         } else {
282             if (element.getName().equals(XmlNetconfConstants.RPC_ERROR)) {
283                 logger.warn("Can not load last configuration, operation failed");
284                 throw new IllegalStateException("Can not load last configuration, operation failed: "
285                         + XmlUtil.toString(responseMessage.getDocument()));
286             }
287             logger.warn("Can not load last configuration. Operation failed.");
288             throw new IllegalStateException("Can not load last configuration. Operation failed: "
289                     + XmlUtil.toString(responseMessage.getDocument()));
290         }
291     }
292
293     private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) {
294         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) {
295             Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename);
296
297             Document doc = XmlUtil.readXmlToDocument(stream);
298
299             doc.getDocumentElement();
300             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
301             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
302             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
303             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
304                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true));
305             }
306             editConfigElement.appendChild(configWrapper.getDomElement());
307             return new NetconfMessage(doc);
308         } catch (IOException | SAXException e) {
309             throw new RuntimeException("Unable to parse message from resources " + editConfigResourcename, e);
310         }
311     }
312
313     private NetconfMessage getNetconfMessageFromResource(String resource) {
314         try (InputStream stream = getClass().getResourceAsStream(resource)) {
315             Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
316             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
317         } catch (SAXException | IOException e) {
318             throw new RuntimeException("Unable to parse message from resources " + resource, e);
319         }
320     }
321
322     @Override
323     public synchronized void close() {
324         // TODO persister is received from constructor, should not be closed
325         // here
326         try {
327             persister.close();
328         } catch (Exception e) {
329             logger.warn("Unable to close config persister {}", persister, e);
330         }
331
332         if (netconfClient != null) {
333             try {
334                 netconfClient.close();
335             } catch (Exception e) {
336                 logger.warn("Unable to close connection to netconf {}", netconfClient, e);
337             }
338         }
339
340         if (netconfClientDispatcher != null) {
341             try {
342                 netconfClientDispatcher.close();
343             } catch (Exception e) {
344                 logger.warn("Unable to close connection to netconf {}", netconfClientDispatcher, e);
345             }
346         }
347
348         try {
349             nettyThreadgroup.shutdownGracefully();
350         } catch (Exception e) {
351             logger.warn("Unable to close netconf client thread group {}", netconfClientDispatcher, e);
352         }
353
354         // unregister from JMX
355         try {
356             if (mbeanServer.isRegistered(on)) {
357                 mbeanServer.removeNotificationListener(on, this);
358             }
359         } catch (Exception e) {
360             logger.warn("Unable to unregister {} as listener for {}", this, on, e);
361         }
362     }
363 }