Merge "Resolve Bug:593. Persister should communicate via OSGi SR instead of TCP."
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusher.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.collect.Collections2;
15 import org.opendaylight.controller.config.api.ConflictingVersionException;
16 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
17 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
18 import org.opendaylight.controller.netconf.api.NetconfMessage;
19 import org.opendaylight.controller.netconf.mapping.api.Capability;
20 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
21 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
22 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
23 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
24 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
25 import org.opendaylight.controller.netconf.util.NetconfUtil;
26 import org.opendaylight.controller.netconf.util.xml.XmlElement;
27 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
28 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.w3c.dom.Document;
32 import org.w3c.dom.Element;
33 import org.xml.sax.SAXException;
34
35 import javax.annotation.concurrent.Immutable;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.util.Collection;
39 import java.util.HashSet;
40 import java.util.LinkedHashMap;
41 import java.util.List;
42 import java.util.Map.Entry;
43 import java.util.Set;
44 import java.util.TreeMap;
45 import java.util.concurrent.TimeUnit;
46
47 @Immutable
48 public class ConfigPusher {
49     private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
50
51     private final long maxWaitForCapabilitiesMillis;
52     private final long conflictingVersionTimeoutMillis;
53     private final NetconfOperationServiceFactory configNetconfConnector;
54
55     public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
56                         long conflictingVersionTimeoutMillis) {
57         this.configNetconfConnector = configNetconfConnector;
58         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
59         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
60     }
61
62     public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) {
63         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
64         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
65         // start pushing snapshots:
66         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
67             EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
68             logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
69             result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
70         }
71         logger.debug("All configuration snapshots have been pushed successfully.");
72         return result;
73     }
74
75     /**
76      * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
77      * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
78      * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
79      * {@link NetconfOperationService} after each use.
80      */
81     private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) {
82         ConflictingVersionException lastException;
83         Stopwatch stopwatch = new Stopwatch().start();
84         do {
85             try (NetconfOperationService operationService = getOperationServiceWithRetries(configSnapshotHolder.getCapabilities(), configSnapshotHolder.toString())) {
86                 return pushConfig(configSnapshotHolder, operationService);
87             } catch (ConflictingVersionException e) {
88                 lastException = e;
89                 logger.debug("Conflicting version detected, will retry after timeout");
90                 sleep();
91             }
92         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
93         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
94                 lastException);
95     }
96
97     private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
98         Stopwatch stopwatch = new Stopwatch().start();
99         NotEnoughCapabilitiesException lastException;
100         do {
101             try {
102                 return getOperationService(expectedCapabilities, idForReporting);
103             } catch (NotEnoughCapabilitiesException e) {
104                 logger.debug("Not enough capabilities: " + e.toString());
105                 lastException = e;
106                 sleep();
107             }
108         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
109         throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
110     }
111
112     private static class NotEnoughCapabilitiesException extends Exception {
113         private NotEnoughCapabilitiesException(String message) {
114             super(message);
115         }
116     }
117
118     /**
119      * Get NetconfOperationService iif all required capabilities are present.
120      *
121      * @param expectedCapabilities that must be provided by configNetconfConnector
122      * @param idForReporting
123      * @return service if capabilities are present, otherwise absent value
124      */
125     private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
126         NetconfOperationService serviceCandidate = configNetconfConnector.createService(idForReporting);
127         Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
128         if (notFoundDiff.isEmpty()) {
129             return serviceCandidate;
130         } else {
131             serviceCandidate.close();
132             logger.debug("Netconf server did not provide required capabilities for {} " +
133                             "Expected but not found: {}, all expected {}, current {}",
134                     idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
135             );
136             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
137         }
138     }
139
140     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
141         Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
142             @Override
143             public String apply(Capability input) {
144                 return input.getCapabilityUri();
145             }
146         });
147         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
148         allNotFound.removeAll(actual);
149         return allNotFound;
150     }
151
152
153
154     private void sleep() {
155         try {
156             Thread.sleep(100);
157         } catch (InterruptedException e) {
158             Thread.currentThread().interrupt();
159             throw new IllegalStateException(e);
160         }
161     }
162
163     /**
164      * Sends two RPCs to the netconf server: edit-config and commit.
165      *
166      * @param configSnapshotHolder
167      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
168      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
169      */
170     private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
171             throws ConflictingVersionException {
172
173         Element xmlToBePersisted;
174         try {
175             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
176         } catch (SAXException | IOException e) {
177             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
178         }
179         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
180         Stopwatch stopwatch = new Stopwatch().start();
181         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
182
183         Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
184                 "edit-config", configSnapshotHolder.toString());
185
186         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
187                 "commit", configSnapshotHolder.toString());
188
189         if (logger.isTraceEnabled()) {
190             StringBuilder response = new StringBuilder("editConfig response = {");
191             response.append(XmlUtil.toString(editResponseMessage));
192             response.append("}");
193             response.append("commit response = {");
194             response.append(XmlUtil.toString(commitResponseMessage));
195             response.append("}");
196             logger.trace("Last configuration loaded successfully");
197             logger.trace("Detailed message {}", response);
198             logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
199         }
200         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
201     }
202
203     private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) {
204         TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
205         Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
206         if (netconfOperations.isEmpty()) {
207             throw new IllegalStateException("Possible code error: no config operations");
208         }
209         for (NetconfOperation netconfOperation : netconfOperations) {
210             HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
211             allOperations.put(handlingPriority, netconfOperation);
212         }
213         Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
214         if (highestEntry.getKey().isCannotHandle()) {
215             throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
216         }
217         return highestEntry.getValue();
218     }
219
220     private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
221                                                      String operationNameForReporting, String configIdForReporting)
222             throws ConflictingVersionException {
223
224         NetconfOperation operation = findOperation(request, operationService);
225         Document response;
226         try {
227             response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
228         } catch (NetconfDocumentedException | RuntimeException e) {
229             throw new IllegalStateException("Failed to send " + operationNameForReporting +
230                     " for configuration " + configIdForReporting, e);
231         }
232         try {
233             return NetconfUtil.checkIsMessageOk(response);
234         } catch (ConflictingVersionException e) {
235             logger.trace("conflicting version detected: {} while committing {}", e.toString(), configIdForReporting);
236             throw e;
237         }
238     }
239
240     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
241     private static NetconfMessage createEditConfigMessage(Element dataElement) {
242         String editConfigResourcePath = "/netconfOp/editConfig.xml";
243         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
244             Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
245
246             Document doc = XmlUtil.readXmlToDocument(stream);
247
248             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
249             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
250             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
251             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
252                 boolean deep = true;
253                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
254             }
255             editConfigElement.appendChild(configWrapper.getDomElement());
256             return new NetconfMessage(doc);
257         } catch (IOException | SAXException e) {
258             // error reading the xml file bundled into the jar
259             throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
260         }
261     }
262
263     private static NetconfMessage getCommitMessage() {
264         String resource = "/netconfOp/commit.xml";
265         try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
266             Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
267             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
268         } catch (SAXException | IOException e) {
269             // error reading the xml file bundled into the jar
270             throw new IllegalStateException("Error while opening local resource " + resource, e);
271         }
272     }
273
274     static class EditAndCommitResponse {
275         private final Document editResponse, commitResponse;
276
277         EditAndCommitResponse(Document editResponse, Document commitResponse) {
278             this.editResponse = editResponse;
279             this.commitResponse = commitResponse;
280         }
281
282         public Document getEditResponse() {
283             return editResponse;
284         }
285
286         public Document getCommitResponse() {
287             return commitResponse;
288         }
289
290         @Override
291         public String toString() {
292             return "EditAndCommitResponse{" +
293                     "editResponse=" + editResponse +
294                     ", commitResponse=" + commitResponse +
295                     '}';
296         }
297     }
298 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.