Add exists method on DOMStoreReadTransaction and DOMDataReadTransaction
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Collection;
16 import java.util.HashSet;
17 import java.util.LinkedHashMap;
18 import java.util.List;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.SortedSet;
22 import java.util.TreeMap;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.annotation.concurrent.Immutable;
28 import javax.management.MBeanServerConnection;
29
30 import org.opendaylight.controller.config.api.ConflictingVersionException;
31 import org.opendaylight.controller.config.persist.api.ConfigPusher;
32 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
33 import org.opendaylight.controller.config.persist.api.Persister;
34 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
35 import org.opendaylight.controller.netconf.api.NetconfMessage;
36 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
37 import org.opendaylight.controller.netconf.mapping.api.Capability;
38 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
39 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
40 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
43 import org.opendaylight.controller.netconf.util.NetconfUtil;
44 import org.opendaylight.controller.netconf.util.xml.XmlElement;
45 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.w3c.dom.Document;
49 import org.w3c.dom.Element;
50 import org.xml.sax.SAXException;
51
52 import com.google.common.base.Function;
53 import com.google.common.base.Stopwatch;
54 import com.google.common.collect.Collections2;
55
56 @Immutable
57 public class ConfigPusherImpl implements ConfigPusher {
58     private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
59
60     private final long maxWaitForCapabilitiesMillis;
61     private final long conflictingVersionTimeoutMillis;
62     private final NetconfOperationServiceFactory configNetconfConnector;
63     private static final int QUEUE_SIZE = 100;
64     private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
65
66     public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
67                         long conflictingVersionTimeoutMillis) {
68         this.configNetconfConnector = configNetconfConnector;
69         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
70         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
71     }
72
73     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
74         List<? extends ConfigSnapshotHolder> configs;
75         while(true) {
76             configs = queue.take();
77             try {
78                 internalPushConfigs(configs);
79                 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
80                 synchronized (autoCloseables) {
81                     autoCloseables.add(jmxNotificationHandler);
82                 }
83                 /*
84                  * We have completed initial configuration. At this point
85                  * it is good idea to perform garbage collection to prune
86                  * any garbage we have accumulated during startup.
87                  */
88                 logger.debug("Running post-initialization garbage collection...");
89                 System.gc();
90                 logger.debug("Post-initialization garbage collection completed.");
91                 logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
92             }
93             catch (NetconfDocumentedException e) {
94                 logger.error("Error pushing configs {}",configs);
95                 throw new IllegalStateException(e);
96             }
97         }
98     }
99
100     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
101         logger.debug("Requested to push configs {}", configs);
102         this.queue.put(configs);
103     }
104
105     private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
106         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
107         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
108         // start pushing snapshots:
109         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
110             if(configSnapshotHolder != null) {
111                 EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
112                 logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
113                 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
114             }
115         }
116         logger.debug("All configuration snapshots have been pushed successfully.");
117         return result;
118     }
119
120     /**
121      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
122      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
123      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
124      * {@link NetconfOperationService} after each use.
125      */
126     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
127         ConflictingVersionException lastException;
128         Stopwatch stopwatch = new Stopwatch().start();
129         do {
130             String idForReporting = configSnapshotHolder.toString();
131             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
132                     "Expected capabilities must not be null - %s, check %s", idForReporting,
133                     configSnapshotHolder.getClass().getName());
134             try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
135                 return pushConfig(configSnapshotHolder, operationService);
136             } catch (ConflictingVersionException e) {
137                 lastException = e;
138                 logger.debug("Conflicting version detected, will retry after timeout");
139                 sleep();
140             }
141         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
142         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
143                 lastException);
144     }
145
146     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
147         Stopwatch stopwatch = new Stopwatch().start();
148         NotEnoughCapabilitiesException lastException;
149         do {
150             try {
151                 return getOperationService(expectedCapabilities, idForReporting);
152             } catch (NotEnoughCapabilitiesException e) {
153                 logger.debug("Not enough capabilities: " + e.toString());
154                 lastException = e;
155                 sleep();
156             }
157         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
158         throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
159     }
160
161     private static class NotEnoughCapabilitiesException extends Exception {
162         private static final long serialVersionUID = 1L;
163
164         private NotEnoughCapabilitiesException(String message, Throwable cause) {
165             super(message, cause);
166         }
167
168         private NotEnoughCapabilitiesException(String message) {
169             super(message);
170         }
171     }
172
173     /**
174      * Get NetconfOperationService iif all required capabilities are present.
175      *
176      * @param expectedCapabilities that must be provided by configNetconfConnector
177      * @param idForReporting
178      * @return service if capabilities are present, otherwise absent value
179      */
180     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
181         NetconfOperationService serviceCandidate;
182         try {
183             serviceCandidate = configNetconfConnector.createService(idForReporting);
184         } catch(RuntimeException e) {
185             throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
186         }
187         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
188         if (notFoundDiff.isEmpty()) {
189             return serviceCandidate;
190         } else {
191             serviceCandidate.close();
192             logger.trace("Netconf server did not provide required capabilities for {} " +
193                     "Expected but not found: {}, all expected {}, current {}",
194                     idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
195             );
196             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
197         }
198     }
199
200     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
201         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
202             @Override
203             public String apply(Capability input) {
204                 return input.getCapabilityUri();
205             }
206         });
207         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
208         allNotFound.removeAll(actual);
209         return allNotFound;
210     }
211
212
213
214     private void sleep() {
215         try {
216             Thread.sleep(100);
217         } catch (InterruptedException e) {
218             Thread.currentThread().interrupt();
219             throw new IllegalStateException(e);
220         }
221     }
222
223     /**
224      * Sends two RPCs to the netconf server: edit-config and commit.
225      *
226      * @param configSnapshotHolder
227      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
228      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
229      */
230     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
231             throws ConflictingVersionException, NetconfDocumentedException {
232
233         Element xmlToBePersisted;
234         try {
235             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
236         } catch (SAXException | IOException e) {
237             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
238         }
239         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
240         Stopwatch stopwatch = new Stopwatch().start();
241         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
242
243         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
244                 "edit-config", configSnapshotHolder.toString());
245
246         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
247                 "commit", configSnapshotHolder.toString());
248
249         if (logger.isTraceEnabled()) {
250             StringBuilder response = new StringBuilder("editConfig response = {");
251             response.append(XmlUtil.toString(editResponseMessage));
252             response.append("}");
253             response.append("commit response = {");
254             response.append(XmlUtil.toString(commitResponseMessage));
255             response.append("}");
256             logger.trace("Last configuration loaded successfully");
257             logger.trace("Detailed message {}", response);
258             logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
259         }
260         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
261     }
262
263     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
264         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
265         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
266         if (netconfOperations.isEmpty()) {
267             throw new IllegalStateException("Possible code error: no config operations");
268         }
269         for (NetconfOperation netconfOperation : netconfOperations) {
270             HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
271             allOperations.put(handlingPriority, netconfOperation);
272         }
273         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
274         if (highestEntry.getKey().isCannotHandle()) {
275             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
276         }
277         return highestEntry.getValue();
278     }
279
280     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
281                                                      String operationNameForReporting, String configIdForReporting)
282             throws ConflictingVersionException, NetconfDocumentedException {
283
284         NetconfOperation operation = findOperation(request, operationService);
285         Document response;
286         try {
287             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
288         } catch (NetconfDocumentedException | RuntimeException e) {
289             if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
290                 throw (ConflictingVersionException) e.getCause();
291             }
292             throw new IllegalStateException("Failed to send " + operationNameForReporting +
293                     " for configuration " + configIdForReporting, e);
294         }
295         return NetconfUtil.checkIsMessageOk(response);
296     }
297
298     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
299     private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
300         String editConfigResourcePath = "/netconfOp/editConfig.xml";
301         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
302             checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
303
304             Document doc = XmlUtil.readXmlToDocument(stream);
305
306             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
307             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
308             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
309             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
310                 boolean deep = true;
311                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
312             }
313             editConfigElement.appendChild(configWrapper.getDomElement());
314             return new NetconfMessage(doc);
315         } catch (IOException | SAXException e) {
316             // error reading the xml file bundled into the jar
317             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
318         }
319     }
320
321     private static NetconfMessage getCommitMessage() {
322         String resource = "/netconfOp/commit.xml";
323         try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
324             checkNotNull(stream, "Unable to load resource " + resource);
325             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
326         } catch (SAXException | IOException e) {
327             // error reading the xml file bundled into the jar
328             throw new IllegalStateException("Error while opening local resource " + resource, e);
329         }
330     }
331
332     static class EditAndCommitResponse {
333         private final Document editResponse, commitResponse;
334
335         EditAndCommitResponse(Document editResponse, Document commitResponse) {
336             this.editResponse = editResponse;
337             this.commitResponse = commitResponse;
338         }
339
340         public Document getEditResponse() {
341             return editResponse;
342         }
343
344         public Document getCommitResponse() {
345             return commitResponse;
346         }
347
348         @Override
349         public String toString() {
350             return "EditAndCommitResponse{" +
351                     "editResponse=" + editResponse +
352                     ", commitResponse=" + commitResponse +
353                     '}';
354         }
355     }
356 }