config-persister-impl: final parameters
[controller.git] / opendaylight / config / config-persister-impl / src / main / java / org / opendaylight / controller / config / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.base.Function;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.collect.Collections2;
15 import java.io.IOException;
16 import java.util.Collection;
17 import java.util.Date;
18 import java.util.HashSet;
19 import java.util.LinkedHashMap;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.SortedSet;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import javax.annotation.concurrent.Immutable;
28 import javax.management.MBeanServerConnection;
29 import org.opendaylight.controller.config.api.ConflictingVersionException;
30 import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
31 import org.opendaylight.controller.config.api.ValidationException;
32 import org.opendaylight.controller.config.facade.xml.ConfigExecution;
33 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
34 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
35 import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
36 import org.opendaylight.controller.config.persist.api.ConfigPusher;
37 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
38 import org.opendaylight.controller.config.persist.api.Persister;
39 import org.opendaylight.controller.config.util.capability.Capability;
40 import org.opendaylight.controller.config.util.xml.DocumentedException;
41 import org.opendaylight.controller.config.util.xml.XmlUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.w3c.dom.Element;
45 import org.xml.sax.SAXException;
46
47 @Immutable
48 public class ConfigPusherImpl implements ConfigPusher {
49     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
50
51     private static final Date NO_REVISION = new Date(0);
52     private static final int QUEUE_SIZE = 100;
53
54     private final long maxWaitForCapabilitiesMillis;
55     private final long conflictingVersionTimeoutMillis;
56     private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
57
58     private final ConfigSubsystemFacadeFactory facade;
59     private ConfigPersisterNotificationHandler jmxNotificationHandler;
60
61     public ConfigPusherImpl(final ConfigSubsystemFacadeFactory facade, final long maxWaitForCapabilitiesMillis,
62                         final long conflictingVersionTimeoutMillis) {
63         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
64         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
65         this.facade = facade;
66     }
67
68     public void process(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
69             final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
70         while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
71         }
72     }
73
74     boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
75             final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
76         final List<? extends ConfigSnapshotHolder> configs = queue.take();
77         try {
78             internalPushConfigs(configs);
79
80             // Do not register multiple notification handlers
81             if(jmxNotificationHandler == null) {
82                 jmxNotificationHandler =
83                         new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, facade);
84                 synchronized (autoCloseables) {
85                     autoCloseables.add(jmxNotificationHandler);
86                 }
87             }
88
89             LOG.debug("ConfigPusher has pushed configs {}", configs);
90         } catch (final Exception e) {
91             // Exceptions are logged to error downstream
92             LOG.debug("Failed to push some of configs: {}", configs, e);
93
94             if(propagateExceptions) {
95                 if(e instanceof RuntimeException) {
96                     throw (RuntimeException)e;
97                 } else {
98                     throw new IllegalStateException(e);
99                 }
100             } else {
101                 return false;
102             }
103         }
104
105         return true;
106     }
107
108     @Override
109     public void pushConfigs(final List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
110         LOG.debug("Requested to push configs {}", configs);
111         this.queue.put(configs);
112     }
113
114     private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(final List<? extends ConfigSnapshotHolder> configs)
115             throws DocumentedException {
116         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
117         LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
118         // start pushing snapshots
119         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
120             if (configSnapshotHolder != null) {
121                 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
122                 boolean pushResult = false;
123                 try {
124                     pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
125                 } catch (final ConfigSnapshotFailureException e) {
126                     LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
127                             "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
128                     throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
129                 }  catch (final Exception e) {
130                     String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
131                     LOG.error(msg, e);
132                     throw new IllegalStateException(msg, e);
133                 }
134
135                 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
136                 result.put(configSnapshotHolder, pushResult);
137             }
138         }
139         LOG.debug("All configuration snapshots have been pushed successfully.");
140         return result;
141     }
142
143     private synchronized boolean pushConfigWithConflictingVersionRetries(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
144         ConflictingVersionException lastException;
145         Stopwatch stopwatch = Stopwatch.createUnstarted();
146         do {
147             //TODO wait untill all expected modules are in yangStoreService, do we even need to with yangStoreService instead on netconfOperationService?
148             String idForReporting = configSnapshotHolder.toString();
149             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
150                     "Expected capabilities must not be null - %s, check %s", idForReporting,
151                     configSnapshotHolder.getClass().getName());
152
153             // wait max time for required capabilities to appear
154             waitForCapabilities(expectedCapabilities, idForReporting);
155             try {
156                 if(!stopwatch.isRunning()) {
157                     stopwatch.start();
158                 }
159                 return pushConfig(configSnapshotHolder);
160             } catch (final ConflictingVersionException e) {
161                 lastException = e;
162                 LOG.info("Conflicting version detected, will retry after timeout");
163                 sleep();
164             }
165         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
166         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
167                 lastException);
168     }
169
170     private void waitForCapabilities(final Set<String> expectedCapabilities, final String idForReporting) {
171         Stopwatch stopwatch = Stopwatch.createStarted();
172         ConfigPusherException lastException;
173         do {
174             try {
175                 final Set<Capability> currentCaps = facade.getCurrentCapabilities();
176                 final Set<String> notFoundCapabilities = computeNotFoundCapabilities(expectedCapabilities, currentCaps);
177                 if (notFoundCapabilities.isEmpty()) {
178                     return;
179                 } else {
180                     LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
181                             "Expected but not found: {}, all expected {}, current {}",
182                             notFoundCapabilities, expectedCapabilities, currentCaps
183                     );
184                     throw new NotEnoughCapabilitiesException(
185                             "Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
186                 }
187             } catch (final ConfigPusherException e) {
188                 LOG.debug("Not enough capabilities: {}", e.toString());
189                 lastException = e;
190                 sleep();
191             }
192         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
193
194         LOG.error("Unable to push configuration due to missing yang models." +
195                         " Yang models that are missing, but required by the configuration: {}." +
196                         " For each mentioned model check: " +
197                         " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
198                         " 2. the yang file is present in the system" +
199                         " 3. the bundle with that yang file is present in the system and active" +
200                         " 4. the yang parser did not fail while attempting to parse that model",
201                 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
202         throw new IllegalStateException("Unable to push configuration due to missing yang models." +
203                 " Required yang models that are missing: "
204                 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
205     }
206
207     private static Set<String> computeNotFoundCapabilities(final Set<String> expectedCapabilities, final Set<Capability> currentCapabilities) {
208         Collection<String> actual = transformCapabilities(currentCapabilities);
209         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
210         allNotFound.removeAll(actual);
211         return allNotFound;
212     }
213
214     static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
215         return new HashSet<>(Collections2.transform(currentCapabilities, new Function<Capability, String>() {
216             @Override
217             public String apply(@Nonnull final Capability input) {
218                 return input.getCapabilityUri();
219             }
220         }));
221     }
222
223     static class ConfigPusherException extends Exception {
224
225         public ConfigPusherException(final String message) {
226             super(message);
227         }
228
229         public ConfigPusherException(final String message, final Throwable cause) {
230             super(message, cause);
231         }
232     }
233
234     static class NotEnoughCapabilitiesException extends ConfigPusherException {
235         private static final long serialVersionUID = 1L;
236         private final Set<String> missingCaps;
237
238         NotEnoughCapabilitiesException(final String message, final Set<String> missingCaps) {
239             super(message);
240             this.missingCaps = missingCaps;
241         }
242
243         public Set<String> getMissingCaps() {
244             return missingCaps;
245         }
246     }
247
248     private static final class ConfigSnapshotFailureException extends ConfigPusherException {
249
250         private final String configIdForReporting;
251
252         public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
253             super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
254             this.configIdForReporting = configIdForReporting;
255         }
256
257         public String getConfigIdForReporting() {
258             return configIdForReporting;
259         }
260     }
261
262     private void sleep() {
263         try {
264             Thread.sleep(100);
265         } catch (final InterruptedException e) {
266             Thread.currentThread().interrupt();
267             throw new IllegalStateException(e);
268         }
269     }
270
271     private synchronized boolean pushConfig(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
272         Element xmlToBePersisted;
273         try {
274             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
275         } catch (SAXException | IOException e) {
276             throw new IllegalStateException("Cannot parse " + configSnapshotHolder, e);
277         }
278         LOG.trace("Pushing last configuration to config mapping: {}", configSnapshotHolder);
279
280         Stopwatch stopwatch = Stopwatch.createStarted();
281         final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
282         try {
283             ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
284             executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
285         } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
286             LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
287             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
288         }
289
290         try {
291             currentFacade.commitSilentTransaction();
292         } catch (ValidationException | DocumentedException e) {
293             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "commit", e);
294         }
295
296         LOG.trace("Last configuration loaded successfully");
297         LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
298
299         return true;
300     }
301
302     private void executeWithMissingModuleFactoryRetries(final ConfigSubsystemFacade facade, final ConfigExecution configExecution)
303             throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
304         Stopwatch stopwatch = Stopwatch.createStarted();
305         ModuleFactoryNotFoundException lastException = null;
306         do {
307             try {
308                 facade.executeConfigExecution(configExecution);
309                 return;
310             } catch (final ModuleFactoryNotFoundException e) {
311                 LOG.debug("{} - will retry after timeout", e.toString());
312                 lastException = e;
313                 sleep();
314             }
315         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
316
317         throw lastException;
318     }
319
320     private ConfigExecution createConfigExecution(final Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
321         final Config configMapping = currentFacade.getConfigMapping();
322         return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);
323     }
324
325 }