Bug 4560: Improve config system logging for debuggability
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.concurrent.Immutable;
31 import javax.management.MBeanServerConnection;
32 import org.opendaylight.controller.config.api.ConflictingVersionException;
33 import org.opendaylight.controller.config.persist.api.ConfigPusher;
34 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
35 import org.opendaylight.controller.config.persist.api.Persister;
36 import org.opendaylight.controller.netconf.api.Capability;
37 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
38 import org.opendaylight.controller.netconf.api.NetconfMessage;
39 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
40 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
45 import org.opendaylight.controller.netconf.util.NetconfUtil;
46 import org.opendaylight.controller.netconf.util.xml.XmlElement;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51 import org.w3c.dom.Element;
52 import org.xml.sax.SAXException;
53
54 @Immutable
55 public class ConfigPusherImpl implements ConfigPusher {
56     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
57
58     private final long maxWaitForCapabilitiesMillis;
59     private final long conflictingVersionTimeoutMillis;
60     private final NetconfOperationServiceFactory configNetconfConnector;
61     private static final int QUEUE_SIZE = 100;
62     private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
63
64     public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
65                         long conflictingVersionTimeoutMillis) {
66         this.configNetconfConnector = configNetconfConnector;
67         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
68         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
69     }
70
71     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
72         List<? extends ConfigSnapshotHolder> configs;
73         while(true) {
74             configs = queue.take();
75
76             try {
77                 internalPushConfigs(configs);
78                 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
79                 synchronized (autoCloseables) {
80                     autoCloseables.add(jmxNotificationHandler);
81                 }
82
83                 LOG.debug("ConfigPusher has pushed configs {}", configs);
84             } catch (Exception e) {
85                 LOG.debug("Failed to push some of configs: {}", configs, e);
86                 break;
87             }
88         }
89     }
90
91     @Override
92     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
93         LOG.debug("Requested to push configs {}", configs);
94         this.queue.put(configs);
95     }
96
97     private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
98         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
99         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
100         // start pushing snapshots:
101         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
102             if(configSnapshotHolder != null) {
103                 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
104                 EditAndCommitResponse editAndCommitResponseWithRetries = null;
105                 try {
106                     editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
107                 } catch (ConfigSnapshotFailureException e) {
108                     LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
109                             "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
110                     onFailedConfigPush("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
111                 } catch (Exception e) {
112                     LOG.error("Failed to apply configuration snapshot: {}", configSnapshotHolder, e);
113                     onFailedConfigPush("Failed to apply configuration snapshot " + configSnapshotHolder, e);
114                 }
115
116                 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
117                 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
118             }
119         }
120         LOG.debug("All configuration snapshots have been pushed successfully.");
121         return result;
122     }
123
124     @VisibleForTesting
125     protected void onFailedConfigPush(String message, Exception cause) {
126         throw new IllegalStateException(message, cause);
127     }
128
129     /**
130      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
131      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
132      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
133      * {@link NetconfOperationService} after each use.
134      */
135     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
136         ConflictingVersionException lastException;
137         Stopwatch stopwatch = Stopwatch.createUnstarted();
138         do {
139             String idForReporting = configSnapshotHolder.toString();
140             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
141                     "Expected capabilities must not be null - %s, check %s", idForReporting,
142                     configSnapshotHolder.getClass().getName());
143             try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
144                 if(!stopwatch.isRunning()) {
145                     stopwatch.start();
146                 }
147                 return pushConfig(configSnapshotHolder, operationService);
148             } catch (ConflictingVersionException e) {
149                 lastException = e;
150                 LOG.info("Conflicting version detected, will retry after timeout");
151                 sleep();
152             }
153         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
154         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
155                 lastException);
156     }
157
158     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
159         Stopwatch stopwatch = Stopwatch.createStarted();
160         ConfigPusherException lastException;
161         do {
162             try {
163                 return getOperationService(expectedCapabilities, idForReporting);
164             } catch (ConfigPusherException e) {
165                 LOG.debug("Not enough capabilities: {}", e.toString());
166                 lastException = e;
167                 sleep();
168             }
169         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
170
171         if(lastException instanceof NotEnoughCapabilitiesException) {
172             LOG.error("Unable to push configuration due to missing yang models." +
173                             " Yang models that are missing, but required by the configuration: {}." +
174                             " For each mentioned model check: " +
175                             " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
176                             " 2. the yang file is present in the system" +
177                             " 3. the bundle with that yang file is present in the system and active" +
178                             " 4. the yang parser did not fail while attempting to parse that model",
179                     ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
180             throw new IllegalStateException("Unable to push configuration due to missing yang models." +
181                     " Required yang models that are missing: "
182                     + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
183         } else {
184             final String msg = "Unable to push configuration due to missing netconf service";
185             LOG.error(msg, lastException);
186             throw new IllegalStateException(msg, lastException);
187         }
188     }
189
190     private static class ConfigPusherException extends Exception {
191
192         public ConfigPusherException(final String message) {
193             super(message);
194         }
195
196         public ConfigPusherException(final String message, final Throwable cause) {
197             super(message, cause);
198         }
199     }
200
201     private static class NotEnoughCapabilitiesException extends ConfigPusherException {
202         private static final long serialVersionUID = 1L;
203         private final Set<String> missingCaps;
204
205         private NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
206             super(message);
207             this.missingCaps = missingCaps;
208         }
209
210         public Set<String> getMissingCaps() {
211             return missingCaps;
212         }
213     }
214
215     private static final class NetconfServiceNotAvailableException extends ConfigPusherException {
216
217         public NetconfServiceNotAvailableException(final String s, final RuntimeException e) {
218             super(s, e);
219         }
220     }
221
222     private static final class ConfigSnapshotFailureException extends ConfigPusherException {
223
224         private final String configIdForReporting;
225
226         public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
227             super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
228             this.configIdForReporting = configIdForReporting;
229         }
230
231         public String getConfigIdForReporting() {
232             return configIdForReporting;
233         }
234     }
235
236     /**
237      * Get NetconfOperationService iif all required capabilities are present.
238      *
239      * @param expectedCapabilities that must be provided by configNetconfConnector
240      * @param idForReporting
241      * @return service if capabilities are present, otherwise absent value
242      */
243     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws ConfigPusherException {
244         NetconfOperationService serviceCandidate;
245         try {
246             serviceCandidate = configNetconfConnector.createService(idForReporting);
247         } catch(RuntimeException e) {
248             throw new NetconfServiceNotAvailableException("Netconf service not stable for config pusher." +
249                     " Cannot push any configuration", e);
250         }
251         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, configNetconfConnector);
252         if (notFoundDiff.isEmpty()) {
253             return serviceCandidate;
254         } else {
255             serviceCandidate.close();
256             LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
257                     "Expected but not found: {}, all expected {}, current {}",
258                      notFoundDiff, expectedCapabilities, configNetconfConnector.getCapabilities()
259             );
260             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff, notFoundDiff);
261         }
262     }
263
264     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationServiceFactory serviceCandidate) {
265         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
266             @Override
267             public String apply(@Nonnull final Capability input) {
268                 return input.getCapabilityUri();
269             }
270         });
271         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
272         allNotFound.removeAll(actual);
273         return allNotFound;
274     }
275
276     private void sleep() {
277         try {
278             Thread.sleep(100);
279         } catch (InterruptedException e) {
280             Thread.currentThread().interrupt();
281             throw new IllegalStateException(e);
282         }
283     }
284
285     /**
286      * Sends two RPCs to the netconf server: edit-config and commit.
287      *
288      * @param configSnapshotHolder
289      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
290      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
291      */
292     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
293             throws ConflictingVersionException, ConfigSnapshotFailureException {
294
295         Element xmlToBePersisted;
296         try {
297             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
298         } catch (SAXException | IOException e) {
299             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
300         }
301         LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
302         Stopwatch stopwatch = Stopwatch.createStarted();
303         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
304
305         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
306                 "edit-config", configSnapshotHolder.toString());
307
308         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
309                 "commit", configSnapshotHolder.toString());
310
311         if (LOG.isTraceEnabled()) {
312             StringBuilder response = new StringBuilder("editConfig response = {");
313             response.append(XmlUtil.toString(editResponseMessage));
314             response.append("}");
315             response.append("commit response = {");
316             response.append(XmlUtil.toString(commitResponseMessage));
317             response.append("}");
318             LOG.trace("Last configuration loaded successfully");
319             LOG.trace("Detailed message {}", response);
320             LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
321         }
322         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
323     }
324
325     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) {
326         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
327         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
328         if (netconfOperations.isEmpty()) {
329             throw new IllegalStateException("Possible code error: no config operations");
330         }
331         for (NetconfOperation netconfOperation : netconfOperations) {
332             HandlingPriority handlingPriority = null;
333             try {
334                 handlingPriority = netconfOperation.canHandle(request.getDocument());
335             } catch (NetconfDocumentedException e) {
336                 throw new IllegalStateException("Possible code error: canHandle threw exception", e);
337             }
338             allOperations.put(handlingPriority, netconfOperation);
339         }
340         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
341         if (highestEntry.getKey().isCannotHandle()) {
342             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
343         }
344         return highestEntry.getValue();
345     }
346
347     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
348                                                      String operationNameForReporting, String configIdForReporting)
349             throws ConflictingVersionException, ConfigSnapshotFailureException {
350
351         NetconfOperation operation = findOperation(request, operationService);
352         Document response;
353         try {
354             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
355             return NetconfUtil.checkIsMessageOk(response);
356         } catch (NetconfDocumentedException e) {
357             if (e.getCause() instanceof ConflictingVersionException) {
358                 throw (ConflictingVersionException) e.getCause();
359             }
360             throw new ConfigSnapshotFailureException(configIdForReporting, operationNameForReporting, e);
361         }
362     }
363
364     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
365     private static NetconfMessage createEditConfigMessage(Element dataElement) {
366         String editConfigResourcePath = "/netconfOp/editConfig.xml";
367         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
368             checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
369
370             Document doc = XmlUtil.readXmlToDocument(stream);
371
372             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
373             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
374             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
375             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
376                 boolean deep = true;
377                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
378             }
379             editConfigElement.appendChild(configWrapper.getDomElement());
380             return new NetconfMessage(doc);
381         } catch (IOException | SAXException | NetconfDocumentedException e) {
382             // error reading the xml file bundled into the jar
383             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
384         }
385     }
386
387     private static NetconfMessage getCommitMessage() {
388         String resource = "/netconfOp/commit.xml";
389         try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
390             checkNotNull(stream, "Unable to load resource " + resource);
391             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
392         } catch (SAXException | IOException e) {
393             // error reading the xml file bundled into the jar
394             throw new IllegalStateException("Error while opening local resource " + resource, e);
395         }
396     }
397
398     static class EditAndCommitResponse {
399         private final Document editResponse, commitResponse;
400
401         EditAndCommitResponse(Document editResponse, Document commitResponse) {
402             this.editResponse = editResponse;
403             this.commitResponse = commitResponse;
404         }
405
406         public Document getEditResponse() {
407             return editResponse;
408         }
409
410         public Document getCommitResponse() {
411             return commitResponse;
412         }
413
414         @Override
415         public String toString() {
416             return "EditAndCommitResponse{" +
417                     "editResponse=" + editResponse +
418                     ", commitResponse=" + commitResponse +
419                     '}';
420         }
421     }
422 }