Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.concurrent.Immutable;
31 import javax.management.MBeanServerConnection;
32 import org.opendaylight.controller.config.api.ConflictingVersionException;
33 import org.opendaylight.controller.config.persist.api.ConfigPusher;
34 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
35 import org.opendaylight.controller.config.persist.api.Persister;
36 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.controller.netconf.api.NetconfMessage;
38 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
39 import org.opendaylight.controller.netconf.mapping.api.Capability;
40 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
45 import org.opendaylight.controller.netconf.util.NetconfUtil;
46 import org.opendaylight.controller.netconf.util.xml.XmlElement;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51 import org.w3c.dom.Element;
52 import org.xml.sax.SAXException;
53
54 @Immutable
55 public class ConfigPusherImpl implements ConfigPusher {
56     private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
57
58     private final long maxWaitForCapabilitiesMillis;
59     private final long conflictingVersionTimeoutMillis;
60     private final NetconfOperationServiceFactory configNetconfConnector;
61     private static final int QUEUE_SIZE = 100;
62     private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
63
64     public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
65                         long conflictingVersionTimeoutMillis) {
66         this.configNetconfConnector = configNetconfConnector;
67         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
68         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
69     }
70
71     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
72         List<? extends ConfigSnapshotHolder> configs;
73         while(true) {
74             configs = queue.take();
75             try {
76                 internalPushConfigs(configs);
77                 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
78                 synchronized (autoCloseables) {
79                     autoCloseables.add(jmxNotificationHandler);
80                 }
81                 /*
82                  * We have completed initial configuration. At this point
83                  * it is good idea to perform garbage collection to prune
84                  * any garbage we have accumulated during startup.
85                  */
86                 logger.debug("Running post-initialization garbage collection...");
87                 System.gc();
88                 logger.debug("Post-initialization garbage collection completed.");
89                 logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
90             }
91             catch (NetconfDocumentedException e) {
92                 logger.error("Error pushing configs {}",configs);
93                 throw new IllegalStateException(e);
94             }
95         }
96     }
97
98     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
99         logger.debug("Requested to push configs {}", configs);
100         this.queue.put(configs);
101     }
102
103     private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
104         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
105         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
106         // start pushing snapshots:
107         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
108             if(configSnapshotHolder != null) {
109                 EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
110                 logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
111                 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
112             }
113         }
114         logger.debug("All configuration snapshots have been pushed successfully.");
115         return result;
116     }
117
118     /**
119      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
120      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
121      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
122      * {@link NetconfOperationService} after each use.
123      */
124     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
125         ConflictingVersionException lastException;
126         Stopwatch stopwatch = new Stopwatch().start();
127         do {
128             String idForReporting = configSnapshotHolder.toString();
129             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
130                     "Expected capabilities must not be null - %s, check %s", idForReporting,
131                     configSnapshotHolder.getClass().getName());
132             try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
133                 return pushConfig(configSnapshotHolder, operationService);
134             } catch (ConflictingVersionException e) {
135                 lastException = e;
136                 logger.debug("Conflicting version detected, will retry after timeout");
137                 sleep();
138             }
139         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
140         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
141                 lastException);
142     }
143
144     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
145         Stopwatch stopwatch = new Stopwatch().start();
146         NotEnoughCapabilitiesException lastException;
147         do {
148             try {
149                 return getOperationService(expectedCapabilities, idForReporting);
150             } catch (NotEnoughCapabilitiesException e) {
151                 logger.debug("Not enough capabilities: " + e.toString());
152                 lastException = e;
153                 sleep();
154             }
155         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
156         throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
157     }
158
159     private static class NotEnoughCapabilitiesException extends Exception {
160         private static final long serialVersionUID = 1L;
161
162         private NotEnoughCapabilitiesException(String message, Throwable cause) {
163             super(message, cause);
164         }
165
166         private NotEnoughCapabilitiesException(String message) {
167             super(message);
168         }
169     }
170
171     /**
172      * Get NetconfOperationService iif all required capabilities are present.
173      *
174      * @param expectedCapabilities that must be provided by configNetconfConnector
175      * @param idForReporting
176      * @return service if capabilities are present, otherwise absent value
177      */
178     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
179         NetconfOperationService serviceCandidate;
180         try {
181             serviceCandidate = configNetconfConnector.createService(idForReporting);
182         } catch(RuntimeException e) {
183             throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
184         }
185         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
186         if (notFoundDiff.isEmpty()) {
187             return serviceCandidate;
188         } else {
189             serviceCandidate.close();
190             logger.trace("Netconf server did not provide required capabilities for {} " +
191                     "Expected but not found: {}, all expected {}, current {}",
192                     idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
193             );
194             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
195         }
196     }
197
198     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
199         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
200             @Override
201             public String apply(@Nonnull final Capability input) {
202                 return input.getCapabilityUri();
203             }
204         });
205         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
206         allNotFound.removeAll(actual);
207         return allNotFound;
208     }
209
210
211
212     private void sleep() {
213         try {
214             Thread.sleep(100);
215         } catch (InterruptedException e) {
216             Thread.currentThread().interrupt();
217             throw new IllegalStateException(e);
218         }
219     }
220
221     /**
222      * Sends two RPCs to the netconf server: edit-config and commit.
223      *
224      * @param configSnapshotHolder
225      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
226      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
227      */
228     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
229             throws ConflictingVersionException, NetconfDocumentedException {
230
231         Element xmlToBePersisted;
232         try {
233             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
234         } catch (SAXException | IOException e) {
235             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
236         }
237         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
238         Stopwatch stopwatch = new Stopwatch().start();
239         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
240
241         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
242                 "edit-config", configSnapshotHolder.toString());
243
244         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
245                 "commit", configSnapshotHolder.toString());
246
247         if (logger.isTraceEnabled()) {
248             StringBuilder response = new StringBuilder("editConfig response = {");
249             response.append(XmlUtil.toString(editResponseMessage));
250             response.append("}");
251             response.append("commit response = {");
252             response.append(XmlUtil.toString(commitResponseMessage));
253             response.append("}");
254             logger.trace("Last configuration loaded successfully");
255             logger.trace("Detailed message {}", response);
256             logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
257         }
258         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
259     }
260
261     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
262         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
263         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
264         if (netconfOperations.isEmpty()) {
265             throw new IllegalStateException("Possible code error: no config operations");
266         }
267         for (NetconfOperation netconfOperation : netconfOperations) {
268             HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
269             allOperations.put(handlingPriority, netconfOperation);
270         }
271         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
272         if (highestEntry.getKey().isCannotHandle()) {
273             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
274         }
275         return highestEntry.getValue();
276     }
277
278     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
279                                                      String operationNameForReporting, String configIdForReporting)
280             throws ConflictingVersionException, NetconfDocumentedException {
281
282         NetconfOperation operation = findOperation(request, operationService);
283         Document response;
284         try {
285             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
286         } catch (NetconfDocumentedException | RuntimeException e) {
287             if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
288                 throw (ConflictingVersionException) e.getCause();
289             }
290             throw new IllegalStateException("Failed to send " + operationNameForReporting +
291                     " for configuration " + configIdForReporting, e);
292         }
293         return NetconfUtil.checkIsMessageOk(response);
294     }
295
296     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
297     private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
298         String editConfigResourcePath = "/netconfOp/editConfig.xml";
299         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
300             checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
301
302             Document doc = XmlUtil.readXmlToDocument(stream);
303
304             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
305             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
306             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
307             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
308                 boolean deep = true;
309                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
310             }
311             editConfigElement.appendChild(configWrapper.getDomElement());
312             return new NetconfMessage(doc);
313         } catch (IOException | SAXException e) {
314             // error reading the xml file bundled into the jar
315             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
316         }
317     }
318
319     private static NetconfMessage getCommitMessage() {
320         String resource = "/netconfOp/commit.xml";
321         try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
322             checkNotNull(stream, "Unable to load resource " + resource);
323             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
324         } catch (SAXException | IOException e) {
325             // error reading the xml file bundled into the jar
326             throw new IllegalStateException("Error while opening local resource " + resource, e);
327         }
328     }
329
330     static class EditAndCommitResponse {
331         private final Document editResponse, commitResponse;
332
333         EditAndCommitResponse(Document editResponse, Document commitResponse) {
334             this.editResponse = editResponse;
335             this.commitResponse = commitResponse;
336         }
337
338         public Document getEditResponse() {
339             return editResponse;
340         }
341
342         public Document getCommitResponse() {
343             return commitResponse;
344         }
345
346         @Override
347         public String toString() {
348             return "EditAndCommitResponse{" +
349                     "editResponse=" + editResponse +
350                     ", commitResponse=" + commitResponse +
351                     '}';
352         }
353     }
354 }