2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.config.persist.impl;
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.collect.Collections2;
14 import java.io.IOException;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.HashSet;
18 import java.util.LinkedHashMap;
19 import java.util.List;
21 import java.util.SortedSet;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.concurrent.Immutable;
26 import javax.management.MBeanServerConnection;
27 import org.opendaylight.controller.config.api.ConflictingVersionException;
28 import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
29 import org.opendaylight.controller.config.api.ValidationException;
30 import org.opendaylight.controller.config.facade.xml.ConfigExecution;
31 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
32 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
33 import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
34 import org.opendaylight.controller.config.persist.api.ConfigPusher;
35 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
36 import org.opendaylight.controller.config.persist.api.Persister;
37 import org.opendaylight.controller.config.util.capability.Capability;
38 import org.opendaylight.controller.config.util.xml.DocumentedException;
39 import org.opendaylight.controller.config.util.xml.XmlUtil;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.w3c.dom.Element;
43 import org.xml.sax.SAXException;
46 public class ConfigPusherImpl implements ConfigPusher {
47 private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
49 private static final Date NO_REVISION = new Date(0);
50 private static final int QUEUE_SIZE = 100;
52 private final long maxWaitForCapabilitiesMillis;
53 private final long conflictingVersionTimeoutMillis;
54 private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
56 private final ConfigSubsystemFacadeFactory facade;
57 private ConfigPersisterNotificationHandler jmxNotificationHandler;
59 public ConfigPusherImpl(final ConfigSubsystemFacadeFactory facade, final long maxWaitForCapabilitiesMillis,
60 final long conflictingVersionTimeoutMillis) {
61 this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
62 this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
66 public void process(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
67 final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
68 while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
72 boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
73 final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
74 final List<? extends ConfigSnapshotHolder> configs = queue.take();
76 internalPushConfigs(configs);
78 // Do not register multiple notification handlers
79 if(jmxNotificationHandler == null) {
80 jmxNotificationHandler =
81 new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, facade);
82 synchronized (autoCloseables) {
83 autoCloseables.add(jmxNotificationHandler);
87 LOG.debug("ConfigPusher has pushed configs {}", configs);
88 } catch (final Exception e) {
89 // Exceptions are logged to error downstream
90 LOG.debug("Failed to push some of configs: {}", configs, e);
92 if(propagateExceptions) {
93 if(e instanceof RuntimeException) {
94 throw (RuntimeException)e;
96 throw new IllegalStateException(e);
107 public void pushConfigs(final List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
108 LOG.debug("Requested to push configs {}", configs);
109 this.queue.put(configs);
112 private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(final List<? extends ConfigSnapshotHolder> configs)
113 throws DocumentedException {
114 LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
115 LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
116 // start pushing snapshots
117 for (ConfigSnapshotHolder configSnapshotHolder : configs) {
118 if (configSnapshotHolder != null) {
119 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
120 boolean pushResult = false;
122 pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
123 } catch (final ConfigSnapshotFailureException e) {
124 LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
125 "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
126 throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
127 } catch (final Exception e) {
128 String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
130 throw new IllegalStateException(msg, e);
133 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
134 result.put(configSnapshotHolder, pushResult);
137 LOG.info("All configuration snapshots have been pushed successfully.");
141 private synchronized boolean pushConfigWithConflictingVersionRetries(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
142 ConflictingVersionException lastException;
143 Stopwatch stopwatch = Stopwatch.createUnstarted();
145 //TODO wait untill all expected modules are in yangStoreService, do we even need to with yangStoreService instead on netconfOperationService?
146 String idForReporting = configSnapshotHolder.toString();
147 SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
148 "Expected capabilities must not be null - %s, check %s", idForReporting,
149 configSnapshotHolder.getClass().getName());
151 // wait max time for required capabilities to appear
152 waitForCapabilities(expectedCapabilities, idForReporting);
154 if(!stopwatch.isRunning()) {
157 return pushConfig(configSnapshotHolder);
158 } catch (final ConflictingVersionException e) {
160 LOG.info("Conflicting version detected, will retry after timeout");
163 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
164 throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
168 private void waitForCapabilities(final Set<String> expectedCapabilities, final String idForReporting) {
169 Stopwatch stopwatch = Stopwatch.createStarted();
170 ConfigPusherException lastException;
173 final Set<Capability> currentCaps = facade.getCurrentCapabilities();
174 final Set<String> notFoundCapabilities = computeNotFoundCapabilities(expectedCapabilities, currentCaps);
175 if (notFoundCapabilities.isEmpty()) {
178 LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
179 "Expected but not found: {}, all expected {}, current {}",
180 notFoundCapabilities, expectedCapabilities, currentCaps
182 throw new NotEnoughCapabilitiesException(
183 "Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
185 } catch (final ConfigPusherException e) {
186 LOG.debug("Not enough capabilities: {}", e.toString());
190 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
192 LOG.error("Unable to push configuration due to missing yang models." +
193 " Yang models that are missing, but required by the configuration: {}." +
194 " For each mentioned model check: " +
195 " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
196 " 2. the yang file is present in the system" +
197 " 3. the bundle with that yang file is present in the system and active" +
198 " 4. the yang parser did not fail while attempting to parse that model",
199 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
200 throw new IllegalStateException("Unable to push configuration due to missing yang models." +
201 " Required yang models that are missing: "
202 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
205 private static Set<String> computeNotFoundCapabilities(final Set<String> expectedCapabilities, final Set<Capability> currentCapabilities) {
206 Collection<String> actual = transformCapabilities(currentCapabilities);
207 Set<String> allNotFound = new HashSet<>(expectedCapabilities);
208 allNotFound.removeAll(actual);
212 static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
213 return new HashSet<>(Collections2.transform(currentCapabilities, Capability::getCapabilityUri));
216 static class ConfigPusherException extends Exception {
218 public ConfigPusherException(final String message) {
222 public ConfigPusherException(final String message, final Throwable cause) {
223 super(message, cause);
227 static class NotEnoughCapabilitiesException extends ConfigPusherException {
228 private static final long serialVersionUID = 1L;
229 private final Set<String> missingCaps;
231 NotEnoughCapabilitiesException(final String message, final Set<String> missingCaps) {
233 this.missingCaps = missingCaps;
236 public Set<String> getMissingCaps() {
241 private static final class ConfigSnapshotFailureException extends ConfigPusherException {
243 private final String configIdForReporting;
245 public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
246 super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
247 this.configIdForReporting = configIdForReporting;
250 public String getConfigIdForReporting() {
251 return configIdForReporting;
255 private void sleep() {
258 } catch (final InterruptedException e) {
259 Thread.currentThread().interrupt();
260 throw new IllegalStateException(e);
264 private synchronized boolean pushConfig(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
265 Element xmlToBePersisted;
267 xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
268 } catch (SAXException | IOException e) {
269 throw new IllegalStateException("Cannot parse " + configSnapshotHolder, e);
271 LOG.trace("Pushing last configuration to config mapping: {}", configSnapshotHolder);
273 Stopwatch stopwatch = Stopwatch.createStarted();
274 final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
276 ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
277 executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
278 } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
279 LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
280 throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
284 currentFacade.commitSilentTransaction();
285 } catch (ValidationException | DocumentedException e) {
286 throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "commit", e);
289 LOG.trace("Last configuration loaded successfully");
290 LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
295 private void executeWithMissingModuleFactoryRetries(final ConfigSubsystemFacade facade, final ConfigExecution configExecution)
296 throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
297 Stopwatch stopwatch = Stopwatch.createStarted();
298 ModuleFactoryNotFoundException lastException = null;
301 facade.executeConfigExecution(configExecution);
303 } catch (final ModuleFactoryNotFoundException e) {
304 LOG.debug("{} - will retry after timeout", e.toString());
308 } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
313 private ConfigExecution createConfigExecution(final Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
314 final Config configMapping = currentFacade.getConfigMapping();
315 return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);