2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.persist.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map.Entry;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.concurrent.Immutable;
31 import javax.management.MBeanServerConnection;
32 import org.opendaylight.controller.config.api.ConflictingVersionException;
33 import org.opendaylight.controller.config.persist.api.ConfigPusher;
34 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
35 import org.opendaylight.controller.config.persist.api.Persister;
36 import org.opendaylight.controller.netconf.api.Capability;
37 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
38 import org.opendaylight.controller.netconf.api.NetconfMessage;
39 import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
40 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
41 import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
42 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
43 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
44 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
45 import org.opendaylight.controller.netconf.util.NetconfUtil;
46 import org.opendaylight.controller.netconf.util.xml.XmlElement;
47 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.w3c.dom.Document;
51 import org.w3c.dom.Element;
52 import org.xml.sax.SAXException;
55 public class ConfigPusherImpl implements ConfigPusher {
56 private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
58 private final long maxWaitForCapabilitiesMillis;
59 private final long conflictingVersionTimeoutMillis;
60 private final NetconfOperationServiceFactory configNetconfConnector;
61 private static final int QUEUE_SIZE = 100;
62 private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
/**
 * Creates a pusher that applies configuration snapshots through the given netconf connector.
 *
 * @param configNetconfConnector factory of netconf operation services, also queried for capabilities
 * @param maxWaitForCapabilitiesMillis time budget, in milliseconds, for obtaining a service that
 *                                     provides all capabilities a snapshot requires
 * @param conflictingVersionTimeoutMillis time budget, in milliseconds, for retrying a push that
 *                                        keeps failing with a conflicting version
 */
public ConfigPusherImpl(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
        long conflictingVersionTimeoutMillis) {
    this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
    this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
    this.configNetconfConnector = configNetconfConnector;
}
71 public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
72 List<? extends ConfigSnapshotHolder> configs;
74 configs = queue.take();
77 internalPushConfigs(configs);
78 ConfigPersisterNotificationHandler jmxNotificationHandler = new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator);
79 synchronized (autoCloseables) {
80 autoCloseables.add(jmxNotificationHandler);
83 LOG.debug("ConfigPusher has pushed configs {}", configs);
84 } catch (Exception e) {
85 LOG.debug("Failed to push some of configs: {}", configs, e);
92 public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
93 LOG.debug("Requested to push configs {}", configs);
94 this.queue.put(configs);
97 private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
98 LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
99 LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
100 // start pushing snapshots:
101 for (ConfigSnapshotHolder configSnapshotHolder : configs) {
102 if(configSnapshotHolder != null) {
103 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
104 EditAndCommitResponse editAndCommitResponseWithRetries = null;
106 editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
107 } catch (ConfigSnapshotFailureException e) {
108 LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
109 "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
110 onFailedConfigPush("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
111 } catch (Exception e) {
112 LOG.error("Failed to apply configuration snapshot: {}", configSnapshotHolder, e);
113 onFailedConfigPush("Failed to apply configuration snapshot " + configSnapshotHolder, e);
116 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
117 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
120 LOG.debug("All configuration snapshots have been pushed successfully.");
125 protected void onFailedConfigPush(String message, Exception cause) {
126 throw new IllegalStateException(message, cause);
130 * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
131 * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
132 * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
133 * {@link NetconfOperationService} after each use.
135 private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
136 ConflictingVersionException lastException;
137 Stopwatch stopwatch = Stopwatch.createUnstarted();
139 String idForReporting = configSnapshotHolder.toString();
140 SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
141 "Expected capabilities must not be null - %s, check %s", idForReporting,
142 configSnapshotHolder.getClass().getName());
143 try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
144 if(!stopwatch.isRunning()) {
147 return pushConfig(configSnapshotHolder, operationService);
148 } catch (ConflictingVersionException e) {
150 LOG.info("Conflicting version detected, will retry after timeout");
153 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
154 throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
158 private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
159 Stopwatch stopwatch = Stopwatch.createStarted();
160 ConfigPusherException lastException;
163 return getOperationService(expectedCapabilities, idForReporting);
164 } catch (ConfigPusherException e) {
165 LOG.debug("Not enough capabilities: {}", e.toString());
169 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
171 if(lastException instanceof NotEnoughCapabilitiesException) {
172 LOG.error("Unable to push configuration due to missing yang models." +
173 " Yang models that are missing, but required by the configuration: {}." +
174 " For each mentioned model check: " +
175 " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
176 " 2. the yang file is present in the system" +
177 " 3. the bundle with that yang file is present in the system and active" +
178 " 4. the yang parser did not fail while attempting to parse that model",
179 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
180 throw new IllegalStateException("Unable to push configuration due to missing yang models." +
181 " Required yang models that are missing: "
182 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
184 final String msg = "Unable to push configuration due to missing netconf service";
185 LOG.error(msg, lastException);
186 throw new IllegalStateException(msg, lastException);
190 private static class ConfigPusherException extends Exception {
192 public ConfigPusherException(final String message) {
196 public ConfigPusherException(final String message, final Throwable cause) {
197 super(message, cause);
201 private static class NotEnoughCapabilitiesException extends ConfigPusherException {
202 private static final long serialVersionUID = 1L;
203 private final Set<String> missingCaps;
205 private NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
207 this.missingCaps = missingCaps;
210 public Set<String> getMissingCaps() {
215 private static final class NetconfServiceNotAvailableException extends ConfigPusherException {
217 public NetconfServiceNotAvailableException(final String s, final RuntimeException e) {
222 private static final class ConfigSnapshotFailureException extends ConfigPusherException {
224 private final String configIdForReporting;
226 public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
227 super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
228 this.configIdForReporting = configIdForReporting;
231 public String getConfigIdForReporting() {
232 return configIdForReporting;
237 * Get NetconfOperationService iif all required capabilities are present.
239 * @param expectedCapabilities that must be provided by configNetconfConnector
240 * @param idForReporting
241 * @return service if capabilities are present, otherwise absent value
243 private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws ConfigPusherException {
244 NetconfOperationService serviceCandidate;
246 serviceCandidate = configNetconfConnector.createService(idForReporting);
247 } catch(RuntimeException e) {
248 throw new NetconfServiceNotAvailableException("Netconf service not stable for config pusher." +
249 " Cannot push any configuration", e);
251 Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, configNetconfConnector);
252 if (notFoundDiff.isEmpty()) {
253 return serviceCandidate;
255 serviceCandidate.close();
256 LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
257 "Expected but not found: {}, all expected {}, current {}",
258 notFoundDiff, expectedCapabilities, configNetconfConnector.getCapabilities()
260 throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff, notFoundDiff);
264 private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationServiceFactory serviceCandidate) {
265 Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
267 public String apply(@Nonnull final Capability input) {
268 return input.getCapabilityUri();
271 Set<String> allNotFound = new HashSet<>(expectedCapabilities);
272 allNotFound.removeAll(actual);
276 private void sleep() {
279 } catch (InterruptedException e) {
280 Thread.currentThread().interrupt();
281 throw new IllegalStateException(e);
286 * Sends two RPCs to the netconf server: edit-config and commit.
288 * @param configSnapshotHolder
289 * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
290 * @throws java.lang.RuntimeException if edit-config or commit fails otherwise
292 private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
293 throws ConflictingVersionException, ConfigSnapshotFailureException {
295 Element xmlToBePersisted;
297 xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
298 } catch (SAXException | IOException e) {
299 throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
301 LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
302 Stopwatch stopwatch = Stopwatch.createStarted();
303 NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
305 Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
306 "edit-config", configSnapshotHolder.toString());
308 Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
309 "commit", configSnapshotHolder.toString());
311 if (LOG.isTraceEnabled()) {
312 StringBuilder response = new StringBuilder("editConfig response = {");
313 response.append(XmlUtil.toString(editResponseMessage));
314 response.append("}");
315 response.append("commit response = {");
316 response.append(XmlUtil.toString(commitResponseMessage));
317 response.append("}");
318 LOG.trace("Last configuration loaded successfully");
319 LOG.trace("Detailed message {}", response);
320 LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
322 return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
325 private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) {
326 TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
327 Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
328 if (netconfOperations.isEmpty()) {
329 throw new IllegalStateException("Possible code error: no config operations");
331 for (NetconfOperation netconfOperation : netconfOperations) {
332 HandlingPriority handlingPriority = null;
334 handlingPriority = netconfOperation.canHandle(request.getDocument());
335 } catch (NetconfDocumentedException e) {
336 throw new IllegalStateException("Possible code error: canHandle threw exception", e);
338 allOperations.put(handlingPriority, netconfOperation);
340 Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
341 if (highestEntry.getKey().isCannotHandle()) {
342 throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
344 return highestEntry.getValue();
347 private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
348 String operationNameForReporting, String configIdForReporting)
349 throws ConflictingVersionException, ConfigSnapshotFailureException {
351 NetconfOperation operation = findOperation(request, operationService);
354 response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
355 return NetconfUtil.checkIsMessageOk(response);
356 } catch (NetconfDocumentedException e) {
357 if (e.getCause() instanceof ConflictingVersionException) {
358 throw (ConflictingVersionException) e.getCause();
360 throw new ConfigSnapshotFailureException(configIdForReporting, operationNameForReporting, e);
364 // load editConfig.xml template, populate /rpc/edit-config/config with parameter
365 private static NetconfMessage createEditConfigMessage(Element dataElement) {
366 String editConfigResourcePath = "/netconfOp/editConfig.xml";
367 try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
368 checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
370 Document doc = XmlUtil.readXmlToDocument(stream);
372 XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
373 XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
374 editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
375 for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
377 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
379 editConfigElement.appendChild(configWrapper.getDomElement());
380 return new NetconfMessage(doc);
381 } catch (IOException | SAXException | NetconfDocumentedException e) {
382 // error reading the xml file bundled into the jar
383 throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
387 private static NetconfMessage getCommitMessage() {
388 String resource = "/netconfOp/commit.xml";
389 try (InputStream stream = ConfigPusherImpl.class.getResourceAsStream(resource)) {
390 checkNotNull(stream, "Unable to load resource " + resource);
391 return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
392 } catch (SAXException | IOException e) {
393 // error reading the xml file bundled into the jar
394 throw new IllegalStateException("Error while opening local resource " + resource, e);
398 static class EditAndCommitResponse {
399 private final Document editResponse, commitResponse;
/**
 * @param editResponse response to the edit-config request
 * @param commitResponse response to the commit request
 */
EditAndCommitResponse(Document editResponse, Document commitResponse) {
    this.commitResponse = commitResponse;
    this.editResponse = editResponse;
}
406 public Document getEditResponse() {
410 public Document getCommitResponse() {
411 return commitResponse;
415 public String toString() {
416 return "EditAndCommitResponse{" +
417 "editResponse=" + editResponse +
418 ", commitResponse=" + commitResponse +