2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.persist.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map.Entry;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.concurrent.Immutable;
31 import javax.management.MBeanServerConnection;
32 import org.opendaylight.controller.config.api.ConflictingVersionException;
33 import org.opendaylight.controller.config.persist.api.ConfigPusher;
34 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
35 import org.opendaylight.controller.config.persist.api.Persister;
36 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
37 import org.opendaylight.controller.netconf.api.NetconfMessage;
38 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
39 import org.opendaylight.controller.netconf.mapping.api.Capability;
40 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
45 import org.opendaylight.controller.netconf.util.NetconfUtil;
46 import org.opendaylight.controller.netconf.util.xml.XmlElement;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51 import org.w3c.dom.Element;
52 import org.xml.sax.SAXException;
55 public class ConfigPusherImpl implements ConfigPusher {
56 private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
58 private final long maxWaitForCapabilitiesMillis;
59 private final long conflictingVersionTimeoutMillis;
60 private final NetconfOperationServiceFactory configNetconfConnector;
61 private static final int QUEUE_SIZE = 100;
62 private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<List<? extends ConfigSnapshotHolder>>(QUEUE_SIZE);
64 public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
65 long conflictingVersionTimeoutMillis) {
66 this.configNetconfConnector = configNetconfConnector;
67 this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
68 this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
71 public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
72 List<? extends ConfigSnapshotHolder> configs;
74 configs = queue.take();
76 internalPushConfigs(configs);
77 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
78 synchronized (autoCloseables) {
79 autoCloseables.add(jmxNotificationHandler);
82 * We have completed initial configuration. At this point
83 * it is good idea to perform garbage collection to prune
84 * any garbage we have accumulated during startup.
86 LOG.debug("Running post-initialization garbage collection...");
88 LOG.debug("Post-initialization garbage collection completed.");
89 LOG.debug("ConfigPusher has pushed configs {}, gc completed", configs);
91 catch (NetconfDocumentedException e) {
92 LOG.error("Error pushing configs {}",configs);
93 throw new IllegalStateException(e);
98 public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
99 LOG.debug("Requested to push configs {}", configs);
100 this.queue.put(configs);
103 private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
104 LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
105 LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
106 // start pushing snapshots:
107 for (ConfigSnapshotHolder configSnapshotHolder : configs) {
108 if(configSnapshotHolder != null) {
109 EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
110 LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
111 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
114 LOG.debug("All configuration snapshots have been pushed successfully.");
119 * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
120 * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
121 * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
122 * {@link NetconfOperationService} after each use.
124 private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
125 ConflictingVersionException lastException;
126 Stopwatch stopwatch = new Stopwatch().start();
128 String idForReporting = configSnapshotHolder.toString();
129 SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
130 "Expected capabilities must not be null - %s, check %s", idForReporting,
131 configSnapshotHolder.getClass().getName());
132 try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
133 return pushConfig(configSnapshotHolder, operationService);
134 } catch (ConflictingVersionException e) {
136 LOG.debug("Conflicting version detected, will retry after timeout");
139 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
140 throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
144 private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
145 Stopwatch stopwatch = new Stopwatch().start();
146 NotEnoughCapabilitiesException lastException;
149 return getOperationService(expectedCapabilities, idForReporting);
150 } catch (NotEnoughCapabilitiesException e) {
151 LOG.debug("Not enough capabilities: {}", e.toString());
155 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
156 throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
159 private static class NotEnoughCapabilitiesException extends Exception {
160 private static final long serialVersionUID = 1L;
162 private NotEnoughCapabilitiesException(String message, Throwable cause) {
163 super(message, cause);
166 private NotEnoughCapabilitiesException(String message) {
172 * Get NetconfOperationService iif all required capabilities are present.
174 * @param expectedCapabilities that must be provided by configNetconfConnector
175 * @param idForReporting
176 * @return service if capabilities are present, otherwise absent value
178 private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
179 NetconfOperationService serviceCandidate;
181 serviceCandidate = configNetconfConnector.createService(idForReporting);
182 } catch(RuntimeException e) {
183 throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
185 Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
186 if (notFoundDiff.isEmpty()) {
187 return serviceCandidate;
189 serviceCandidate.close();
190 LOG.trace("Netconf server did not provide required capabilities for {} ", idForReporting,
191 "Expected but not found: {}, all expected {}, current {}",
192 notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
194 throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
198 private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
199 Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
201 public String apply(@Nonnull final Capability input) {
202 return input.getCapabilityUri();
205 Set<String> allNotFound = new HashSet<>(expectedCapabilities);
206 allNotFound.removeAll(actual);
212 private void sleep() {
215 } catch (InterruptedException e) {
216 Thread.currentThread().interrupt();
217 throw new IllegalStateException(e);
222 * Sends two RPCs to the netconf server: edit-config and commit.
224 * @param configSnapshotHolder
225 * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
226 * @throws java.lang.RuntimeException if edit-config or commit fails otherwise
228 private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
229 throws ConflictingVersionException, NetconfDocumentedException {
231 Element xmlToBePersisted;
233 xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
234 } catch (SAXException | IOException e) {
235 throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
237 LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
238 Stopwatch stopwatch = new Stopwatch().start();
239 NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
241 Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
242 "edit-config", configSnapshotHolder.toString());
244 Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
245 "commit", configSnapshotHolder.toString());
247 if (LOG.isTraceEnabled()) {
248 StringBuilder response = new StringBuilder("editConfig response = {");
249 response.append(XmlUtil.toString(editResponseMessage));
250 response.append("}");
251 response.append("commit response = {");
252 response.append(XmlUtil.toString(commitResponseMessage));
253 response.append("}");
254 LOG.trace("Last configuration loaded successfully");
255 LOG.trace("Detailed message {}", response);
256 LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
258 return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
261 private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
262 TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
263 Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
264 if (netconfOperations.isEmpty()) {
265 throw new IllegalStateException("Possible code error: no config operations");
267 for (NetconfOperation netconfOperation : netconfOperations) {
268 HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
269 allOperations.put(handlingPriority, netconfOperation);
271 Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
272 if (highestEntry.getKey().isCannotHandle()) {
273 throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
275 return highestEntry.getValue();
278 private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
279 String operationNameForReporting, String configIdForReporting)
280 throws ConflictingVersionException, NetconfDocumentedException {
282 NetconfOperation operation = findOperation(request, operationService);
285 response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
286 } catch (NetconfDocumentedException | RuntimeException e) {
287 if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
288 throw (ConflictingVersionException) e.getCause();
290 throw new IllegalStateException("Failed to send " + operationNameForReporting +
291 " for configuration " + configIdForReporting, e);
293 return NetconfUtil.checkIsMessageOk(response);
296 // load editConfig.xml template, populate /rpc/edit-config/config with parameter
297 private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
298 String editConfigResourcePath = "/netconfOp/editConfig.xml";
299 try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
300 checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
302 Document doc = XmlUtil.readXmlToDocument(stream);
304 XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
305 XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
306 editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
307 for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
309 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
311 editConfigElement.appendChild(configWrapper.getDomElement());
312 return new NetconfMessage(doc);
313 } catch (IOException | SAXException e) {
314 // error reading the xml file bundled into the jar
315 throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
319 private static NetconfMessage getCommitMessage() {
320 String resource = "/netconfOp/commit.xml";
321 try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
322 checkNotNull(stream, "Unable to load resource " + resource);
323 return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
324 } catch (SAXException | IOException e) {
325 // error reading the xml file bundled into the jar
326 throw new IllegalStateException("Error while opening local resource " + resource, e);
330 static class EditAndCommitResponse {
331 private final Document editResponse, commitResponse;
333 EditAndCommitResponse(Document editResponse, Document commitResponse) {
334 this.editResponse = editResponse;
335 this.commitResponse = commitResponse;
338 public Document getEditResponse() {
342 public Document getCommitResponse() {
343 return commitResponse;
347 public String toString() {
348 return "EditAndCommitResponse{" +
349 "editResponse=" + editResponse +
350 ", commitResponse=" + commitResponse +