0e179ad7d58c92e019c9a8c98c6d1bd8d8563d78
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.concurrent.Immutable;
31 import javax.management.MBeanServerConnection;
32 import org.opendaylight.controller.config.api.ConflictingVersionException;
33 import org.opendaylight.controller.config.persist.api.ConfigPusher;
34 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
35 import org.opendaylight.controller.config.persist.api.Persister;
36 import org.opendaylight.controller.netconf.api.Capability;
37 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
38 import org.opendaylight.controller.netconf.api.NetconfMessage;
39 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
40 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
45 import org.opendaylight.controller.netconf.util.NetconfUtil;
46 import org.opendaylight.controller.netconf.util.xml.XmlElement;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51 import org.w3c.dom.Element;
52 import org.xml.sax.SAXException;
53
54 @Immutable
55 public class ConfigPusherImpl implements ConfigPusher {
56     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
57
58     private final long maxWaitForCapabilitiesMillis;
59     private final long conflictingVersionTimeoutMillis;
60     private final NetconfOperationServiceFactory configNetconfConnector;
61     private static final int QUEUE_SIZE = 100;
62     private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
63
64     public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
65                         long conflictingVersionTimeoutMillis) {
66         this.configNetconfConnector = configNetconfConnector;
67         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
68         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
69     }
70
71     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
72         List<? extends ConfigSnapshotHolder> configs;
73         while(true) {
74             configs = queue.take();
75             try {
76                 internalPushConfigs(configs);
77                 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
78                 synchronized (autoCloseables) {
79                     autoCloseables.add(jmxNotificationHandler);
80                 }
81
82                 LOG.debug("ConfigPusher has pushed configs {}", configs);
83             } catch (NetconfDocumentedException e) {
84                 LOG.error("Error pushing configs {}",configs);
85                 throw new IllegalStateException(e);
86             }
87         }
88     }
89
90     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
91         LOG.debug("Requested to push configs {}", configs);
92         this.queue.put(configs);
93     }
94
95     private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
96         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
97         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
98         // start pushing snapshots:
99         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
100             if(configSnapshotHolder != null) {
101                 EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
102                 LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
103                 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
104             }
105         }
106         LOG.debug("All configuration snapshots have been pushed successfully.");
107         return result;
108     }
109
110     /**
111      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
112      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
113      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
114      * {@link NetconfOperationService} after each use.
115      */
116     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
117         ConflictingVersionException lastException;
118         Stopwatch stopwatch = Stopwatch.createUnstarted();
119         do {
120             String idForReporting = configSnapshotHolder.toString();
121             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
122                     "Expected capabilities must not be null - %s, check %s", idForReporting,
123                     configSnapshotHolder.getClass().getName());
124             try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
125                 if(!stopwatch.isRunning()) {
126                     stopwatch.start();
127                 }
128                 return pushConfig(configSnapshotHolder, operationService);
129             } catch (ConflictingVersionException e) {
130                 lastException = e;
131                 LOG.info("Conflicting version detected, will retry after timeout");
132                 sleep();
133             }
134         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
135         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
136                 lastException);
137     }
138
139     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
140         Stopwatch stopwatch = Stopwatch.createStarted();
141         ConfigPusherException lastException;
142         do {
143             try {
144                 return getOperationService(expectedCapabilities, idForReporting);
145             } catch (ConfigPusherException e) {
146                 LOG.debug("Not enough capabilities: {}", e.toString());
147                 lastException = e;
148                 sleep();
149             }
150         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
151
152         if(lastException instanceof NotEnoughCapabilitiesException) {
153             LOG.error("Unable to push configuration due to missing yang models." +
154                             " Yang models that are missing, but required by the configuration: {}." +
155                             " For each mentioned model check: " +
156                             " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
157                             " 2. the yang file is present in the system" +
158                             " 3. the bundle with that yang file is present in the system and active" +
159                             " 4. the yang parser did not fail while attempting to parse that model",
160                     ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
161             throw new IllegalStateException("Unable to push configuration due to missing yang models." +
162                     " Required yang models that are missing: "
163                     + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
164         } else {
165             final String msg = "Unable to push configuration due to missing netconf service";
166             LOG.error(msg, lastException);
167             throw new IllegalStateException(msg, lastException);
168         }
169     }
170
171     private static class ConfigPusherException extends Exception {
172
173         public ConfigPusherException(final String message) {
174             super(message);
175         }
176
177         public ConfigPusherException(final String message, final Throwable cause) {
178             super(message, cause);
179         }
180     }
181
182     private static class NotEnoughCapabilitiesException extends ConfigPusherException {
183         private static final long serialVersionUID = 1L;
184         private Set<String> missingCaps;
185
186         private NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
187             super(message);
188             this.missingCaps = missingCaps;
189         }
190
191         public Set<String> getMissingCaps() {
192             return missingCaps;
193         }
194     }
195
196     private static final class NetconfServiceNotAvailableException extends ConfigPusherException {
197
198         public NetconfServiceNotAvailableException(final String s, final RuntimeException e) {
199             super(s, e);
200         }
201     }
202
203     /**
204      * Get NetconfOperationService iif all required capabilities are present.
205      *
206      * @param expectedCapabilities that must be provided by configNetconfConnector
207      * @param idForReporting
208      * @return service if capabilities are present, otherwise absent value
209      */
210     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws ConfigPusherException {
211         NetconfOperationService serviceCandidate;
212         try {
213             serviceCandidate = configNetconfConnector.createService(idForReporting);
214         } catch(RuntimeException e) {
215             throw new NetconfServiceNotAvailableException("Netconf service not stable for config pusher." +
216                     " Cannot push any configuration", e);
217         }
218         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, configNetconfConnector);
219         if (notFoundDiff.isEmpty()) {
220             return serviceCandidate;
221         } else {
222             serviceCandidate.close();
223             LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
224                     "Expected but not found: {}, all expected {}, current {}",
225                      notFoundDiff, expectedCapabilities, configNetconfConnector.getCapabilities()
226             );
227             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff, notFoundDiff);
228         }
229     }
230
231     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationServiceFactory serviceCandidate) {
232         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
233             @Override
234             public String apply(@Nonnull final Capability input) {
235                 return input.getCapabilityUri();
236             }
237         });
238         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
239         allNotFound.removeAll(actual);
240         return allNotFound;
241     }
242
243     private void sleep() {
244         try {
245             Thread.sleep(100);
246         } catch (InterruptedException e) {
247             Thread.currentThread().interrupt();
248             throw new IllegalStateException(e);
249         }
250     }
251
252     /**
253      * Sends two RPCs to the netconf server: edit-config and commit.
254      *
255      * @param configSnapshotHolder
256      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
257      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
258      */
259     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
260             throws ConflictingVersionException, NetconfDocumentedException {
261
262         Element xmlToBePersisted;
263         try {
264             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
265         } catch (SAXException | IOException e) {
266             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
267         }
268         LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
269         Stopwatch stopwatch = Stopwatch.createStarted();
270         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
271
272         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
273                 "edit-config", configSnapshotHolder.toString());
274
275         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
276                 "commit", configSnapshotHolder.toString());
277
278         if (LOG.isTraceEnabled()) {
279             StringBuilder response = new StringBuilder("editConfig response = {");
280             response.append(XmlUtil.toString(editResponseMessage));
281             response.append("}");
282             response.append("commit response = {");
283             response.append(XmlUtil.toString(commitResponseMessage));
284             response.append("}");
285             LOG.trace("Last configuration loaded successfully");
286             LOG.trace("Detailed message {}", response);
287             LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
288         }
289         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
290     }
291
292     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
293         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
294         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
295         if (netconfOperations.isEmpty()) {
296             throw new IllegalStateException("Possible code error: no config operations");
297         }
298         for (NetconfOperation netconfOperation : netconfOperations) {
299             HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
300             allOperations.put(handlingPriority, netconfOperation);
301         }
302         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
303         if (highestEntry.getKey().isCannotHandle()) {
304             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
305         }
306         return highestEntry.getValue();
307     }
308
309     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
310                                                      String operationNameForReporting, String configIdForReporting)
311             throws ConflictingVersionException, NetconfDocumentedException {
312
313         NetconfOperation operation = findOperation(request, operationService);
314         Document response;
315         try {
316             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
317         } catch (NetconfDocumentedException | RuntimeException e) {
318             if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
319                 throw (ConflictingVersionException) e.getCause();
320             }
321             throw new IllegalStateException("Failed to send " + operationNameForReporting +
322                     " for configuration " + configIdForReporting, e);
323         }
324         return NetconfUtil.checkIsMessageOk(response);
325     }
326
327     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
328     private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
329         String editConfigResourcePath = "/netconfOp/editConfig.xml";
330         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
331             checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
332
333             Document doc = XmlUtil.readXmlToDocument(stream);
334
335             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
336             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
337             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
338             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
339                 boolean deep = true;
340                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
341             }
342             editConfigElement.appendChild(configWrapper.getDomElement());
343             return new NetconfMessage(doc);
344         } catch (IOException | SAXException e) {
345             // error reading the xml file bundled into the jar
346             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
347         }
348     }
349
350     private static NetconfMessage getCommitMessage() {
351         String resource = "/netconfOp/commit.xml";
352         try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
353             checkNotNull(stream, "Unable to load resource " + resource);
354             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
355         } catch (SAXException | IOException e) {
356             // error reading the xml file bundled into the jar
357             throw new IllegalStateException("Error while opening local resource " + resource, e);
358         }
359     }
360
361     static class EditAndCommitResponse {
362         private final Document editResponse, commitResponse;
363
364         EditAndCommitResponse(Document editResponse, Document commitResponse) {
365             this.editResponse = editResponse;
366             this.commitResponse = commitResponse;
367         }
368
369         public Document getEditResponse() {
370             return editResponse;
371         }
372
373         public Document getCommitResponse() {
374             return commitResponse;
375         }
376
377         @Override
378         public String toString() {
379             return "EditAndCommitResponse{" +
380                     "editResponse=" + editResponse +
381                     ", commitResponse=" + commitResponse +
382                     '}';
383         }
384     }
385 }