8d85a35bc79ad3f77eb516c14cb23b2003974c13
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusher.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Collection;
14 import java.util.HashSet;
15 import java.util.LinkedHashMap;
16 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.Set;
19 import java.util.TreeMap;
20 import java.util.concurrent.TimeUnit;
21
22 import javax.annotation.concurrent.Immutable;
23
24 import org.opendaylight.controller.config.api.ConflictingVersionException;
25 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
26 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
27 import org.opendaylight.controller.netconf.api.NetconfMessage;
28 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
29 import org.opendaylight.controller.netconf.mapping.api.Capability;
30 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
31 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
32 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
33 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
34 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
35 import org.opendaylight.controller.netconf.util.NetconfUtil;
36 import org.opendaylight.controller.netconf.util.xml.XmlElement;
37 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.w3c.dom.Document;
41 import org.w3c.dom.Element;
42 import org.xml.sax.SAXException;
43
44 import com.google.common.base.Function;
45 import com.google.common.base.Preconditions;
46 import com.google.common.base.Stopwatch;
47 import com.google.common.collect.Collections2;
48
49 @Immutable
50 public class ConfigPusher {
51     private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
52
53     private final long maxWaitForCapabilitiesMillis;
54     private final long conflictingVersionTimeoutMillis;
55     private final NetconfOperationServiceFactory configNetconfConnector;
56
57     public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
58                         long conflictingVersionTimeoutMillis) {
59         this.configNetconfConnector = configNetconfConnector;
60         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
61         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
62     }
63
64     public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
65         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
66         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
67         // start pushing snapshots:
68         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
69             EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
70             logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
71             result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
72         }
73         logger.debug("All configuration snapshots have been pushed successfully.");
74         return result;
75     }
76
77     /**
78      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
79      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
80      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
81      * {@link NetconfOperationService} after each use.
82      */
83     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
84         ConflictingVersionException lastException;
85         Stopwatch stopwatch = new Stopwatch().start();
86         do {
87             try (NetconfOperationService operationService = getOperationServiceWithRetries(configSnapshotHolder.getCapabilities(), configSnapshotHolder.toString())) {
88                 return pushConfig(configSnapshotHolder, operationService);
89             } catch (ConflictingVersionException e) {
90                 lastException = e;
91                 logger.debug("Conflicting version detected, will retry after timeout");
92                 sleep();
93             }
94         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
95         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
96                 lastException);
97     }
98
99     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
100         Stopwatch stopwatch = new Stopwatch().start();
101         NotEnoughCapabilitiesException lastException;
102         do {
103             try {
104                 return getOperationService(expectedCapabilities, idForReporting);
105             } catch (NotEnoughCapabilitiesException e) {
106                 logger.debug("Not enough capabilities: " + e.toString());
107                 lastException = e;
108                 sleep();
109             }
110         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
111         throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
112     }
113
114     private static class NotEnoughCapabilitiesException extends Exception {
115         private static final long serialVersionUID = 1L;
116
117         private NotEnoughCapabilitiesException(String message, Throwable cause) {
118             super(message, cause);
119         }
120
121         private NotEnoughCapabilitiesException(String message) {
122             super(message);
123         }
124     }
125
126     /**
127      * Get NetconfOperationService iif all required capabilities are present.
128      *
129      * @param expectedCapabilities that must be provided by configNetconfConnector
130      * @param idForReporting
131      * @return service if capabilities are present, otherwise absent value
132      */
133     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
134         NetconfOperationService serviceCandidate;
135         try {
136             serviceCandidate = configNetconfConnector.createService(idForReporting);
137         } catch(RuntimeException e) {
138             throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
139         }
140         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
141         if (notFoundDiff.isEmpty()) {
142             return serviceCandidate;
143         } else {
144             serviceCandidate.close();
145             logger.trace("Netconf server did not provide required capabilities for {} " +
146                     "Expected but not found: {}, all expected {}, current {}",
147                     idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
148             );
149             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
150         }
151     }
152
153     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
154         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
155             @Override
156             public String apply(Capability input) {
157                 return input.getCapabilityUri();
158             }
159         });
160         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
161         allNotFound.removeAll(actual);
162         return allNotFound;
163     }
164
165
166
167     private void sleep() {
168         try {
169             Thread.sleep(100);
170         } catch (InterruptedException e) {
171             Thread.currentThread().interrupt();
172             throw new IllegalStateException(e);
173         }
174     }
175
176     /**
177      * Sends two RPCs to the netconf server: edit-config and commit.
178      *
179      * @param configSnapshotHolder
180      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
181      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
182      */
183     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
184             throws ConflictingVersionException, NetconfDocumentedException {
185
186         Element xmlToBePersisted;
187         try {
188             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
189         } catch (SAXException | IOException e) {
190             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
191         }
192         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
193         Stopwatch stopwatch = new Stopwatch().start();
194         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
195
196         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
197                 "edit-config", configSnapshotHolder.toString());
198
199         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
200                 "commit", configSnapshotHolder.toString());
201
202         if (logger.isTraceEnabled()) {
203             StringBuilder response = new StringBuilder("editConfig response = {");
204             response.append(XmlUtil.toString(editResponseMessage));
205             response.append("}");
206             response.append("commit response = {");
207             response.append(XmlUtil.toString(commitResponseMessage));
208             response.append("}");
209             logger.trace("Last configuration loaded successfully");
210             logger.trace("Detailed message {}", response);
211             logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
212         }
213         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
214     }
215
216     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
217         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
218         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
219         if (netconfOperations.isEmpty()) {
220             throw new IllegalStateException("Possible code error: no config operations");
221         }
222         for (NetconfOperation netconfOperation : netconfOperations) {
223             HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
224             allOperations.put(handlingPriority, netconfOperation);
225         }
226         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
227         if (highestEntry.getKey().isCannotHandle()) {
228             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
229         }
230         return highestEntry.getValue();
231     }
232
233     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
234                                                      String operationNameForReporting, String configIdForReporting)
235             throws ConflictingVersionException, NetconfDocumentedException {
236
237         NetconfOperation operation = findOperation(request, operationService);
238         Document response;
239         try {
240             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
241         } catch (NetconfDocumentedException | RuntimeException e) {
242             if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
243                 throw (ConflictingVersionException) e.getCause();
244             }
245             throw new IllegalStateException("Failed to send " + operationNameForReporting +
246                     " for configuration " + configIdForReporting, e);
247         }
248         return NetconfUtil.checkIsMessageOk(response);
249     }
250
251     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
252     private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
253         String editConfigResourcePath = "/netconfOp/editConfig.xml";
254         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
255             Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
256
257             Document doc = XmlUtil.readXmlToDocument(stream);
258
259             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
260             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
261             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
262             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
263                 boolean deep = true;
264                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
265             }
266             editConfigElement.appendChild(configWrapper.getDomElement());
267             return new NetconfMessage(doc);
268         } catch (IOException | SAXException e) {
269             // error reading the xml file bundled into the jar
270             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
271         }
272     }
273
274     private static NetconfMessage getCommitMessage() {
275         String resource = "/netconfOp/commit.xml";
276         try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
277             Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
278             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
279         } catch (SAXException | IOException e) {
280             // error reading the xml file bundled into the jar
281             throw new IllegalStateException("Error while opening local resource " + resource, e);
282         }
283     }
284
285     static class EditAndCommitResponse {
286         private final Document editResponse, commitResponse;
287
288         EditAndCommitResponse(Document editResponse, Document commitResponse) {
289             this.editResponse = editResponse;
290             this.commitResponse = commitResponse;
291         }
292
293         public Document getEditResponse() {
294             return editResponse;
295         }
296
297         public Document getCommitResponse() {
298             return commitResponse;
299         }
300
301         @Override
302         public String toString() {
303             return "EditAndCommitResponse{" +
304                     "editResponse=" + editResponse +
305                     ", commitResponse=" + commitResponse +
306                     '}';
307         }
308     }
309 }