config-persister-impl: use lambdas
[controller.git] / opendaylight / config / config-persister-impl / src / main / java / org / opendaylight / controller / config / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.collect.Collections2;
14 import java.io.IOException;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.HashSet;
18 import java.util.LinkedHashMap;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SortedSet;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.concurrent.Immutable;
26 import javax.management.MBeanServerConnection;
27 import org.opendaylight.controller.config.api.ConflictingVersionException;
28 import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
29 import org.opendaylight.controller.config.api.ValidationException;
30 import org.opendaylight.controller.config.facade.xml.ConfigExecution;
31 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
32 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
33 import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
34 import org.opendaylight.controller.config.persist.api.ConfigPusher;
35 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
36 import org.opendaylight.controller.config.persist.api.Persister;
37 import org.opendaylight.controller.config.util.capability.Capability;
38 import org.opendaylight.controller.config.util.xml.DocumentedException;
39 import org.opendaylight.controller.config.util.xml.XmlUtil;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.w3c.dom.Element;
43 import org.xml.sax.SAXException;
44
45 @Immutable
46 public class ConfigPusherImpl implements ConfigPusher {
47     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
48
49     private static final Date NO_REVISION = new Date(0);
50     private static final int QUEUE_SIZE = 100;
51
52     private final long maxWaitForCapabilitiesMillis;
53     private final long conflictingVersionTimeoutMillis;
54     private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
55
56     private final ConfigSubsystemFacadeFactory facade;
57     private ConfigPersisterNotificationHandler jmxNotificationHandler;
58
59     public ConfigPusherImpl(final ConfigSubsystemFacadeFactory facade, final long maxWaitForCapabilitiesMillis,
60                         final long conflictingVersionTimeoutMillis) {
61         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
62         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
63         this.facade = facade;
64     }
65
66     public void process(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
67             final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
68         while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
69         }
70     }
71
72     boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
73             final Persister persisterAggregator, final boolean propagateExceptions) throws InterruptedException {
74         final List<? extends ConfigSnapshotHolder> configs = queue.take();
75         try {
76             internalPushConfigs(configs);
77
78             // Do not register multiple notification handlers
79             if(jmxNotificationHandler == null) {
80                 jmxNotificationHandler =
81                         new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, facade);
82                 synchronized (autoCloseables) {
83                     autoCloseables.add(jmxNotificationHandler);
84                 }
85             }
86
87             LOG.debug("ConfigPusher has pushed configs {}", configs);
88         } catch (final Exception e) {
89             // Exceptions are logged to error downstream
90             LOG.debug("Failed to push some of configs: {}", configs, e);
91
92             if(propagateExceptions) {
93                 if(e instanceof RuntimeException) {
94                     throw (RuntimeException)e;
95                 } else {
96                     throw new IllegalStateException(e);
97                 }
98             } else {
99                 return false;
100             }
101         }
102
103         return true;
104     }
105
106     @Override
107     public void pushConfigs(final List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
108         LOG.debug("Requested to push configs {}", configs);
109         this.queue.put(configs);
110     }
111
112     private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(final List<? extends ConfigSnapshotHolder> configs)
113             throws DocumentedException {
114         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
115         LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
116         // start pushing snapshots
117         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
118             if (configSnapshotHolder != null) {
119                 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
120                 boolean pushResult = false;
121                 try {
122                     pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
123                 } catch (final ConfigSnapshotFailureException e) {
124                     LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
125                             "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
126                     throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
127                 }  catch (final Exception e) {
128                     String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
129                     LOG.error(msg, e);
130                     throw new IllegalStateException(msg, e);
131                 }
132
133                 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
134                 result.put(configSnapshotHolder, pushResult);
135             }
136         }
137         LOG.debug("All configuration snapshots have been pushed successfully.");
138         return result;
139     }
140
141     private synchronized boolean pushConfigWithConflictingVersionRetries(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
142         ConflictingVersionException lastException;
143         Stopwatch stopwatch = Stopwatch.createUnstarted();
144         do {
145             //TODO wait untill all expected modules are in yangStoreService, do we even need to with yangStoreService instead on netconfOperationService?
146             String idForReporting = configSnapshotHolder.toString();
147             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
148                     "Expected capabilities must not be null - %s, check %s", idForReporting,
149                     configSnapshotHolder.getClass().getName());
150
151             // wait max time for required capabilities to appear
152             waitForCapabilities(expectedCapabilities, idForReporting);
153             try {
154                 if(!stopwatch.isRunning()) {
155                     stopwatch.start();
156                 }
157                 return pushConfig(configSnapshotHolder);
158             } catch (final ConflictingVersionException e) {
159                 lastException = e;
160                 LOG.info("Conflicting version detected, will retry after timeout");
161                 sleep();
162             }
163         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
164         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
165                 lastException);
166     }
167
168     private void waitForCapabilities(final Set<String> expectedCapabilities, final String idForReporting) {
169         Stopwatch stopwatch = Stopwatch.createStarted();
170         ConfigPusherException lastException;
171         do {
172             try {
173                 final Set<Capability> currentCaps = facade.getCurrentCapabilities();
174                 final Set<String> notFoundCapabilities = computeNotFoundCapabilities(expectedCapabilities, currentCaps);
175                 if (notFoundCapabilities.isEmpty()) {
176                     return;
177                 } else {
178                     LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
179                             "Expected but not found: {}, all expected {}, current {}",
180                             notFoundCapabilities, expectedCapabilities, currentCaps
181                     );
182                     throw new NotEnoughCapabilitiesException(
183                             "Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
184                 }
185             } catch (final ConfigPusherException e) {
186                 LOG.debug("Not enough capabilities: {}", e.toString());
187                 lastException = e;
188                 sleep();
189             }
190         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
191
192         LOG.error("Unable to push configuration due to missing yang models." +
193                         " Yang models that are missing, but required by the configuration: {}." +
194                         " For each mentioned model check: " +
195                         " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
196                         " 2. the yang file is present in the system" +
197                         " 3. the bundle with that yang file is present in the system and active" +
198                         " 4. the yang parser did not fail while attempting to parse that model",
199                 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
200         throw new IllegalStateException("Unable to push configuration due to missing yang models." +
201                 " Required yang models that are missing: "
202                 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
203     }
204
205     private static Set<String> computeNotFoundCapabilities(final Set<String> expectedCapabilities, final Set<Capability> currentCapabilities) {
206         Collection<String> actual = transformCapabilities(currentCapabilities);
207         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
208         allNotFound.removeAll(actual);
209         return allNotFound;
210     }
211
212     static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
213         return new HashSet<>(Collections2.transform(currentCapabilities, Capability::getCapabilityUri));
214     }
215
216     static class ConfigPusherException extends Exception {
217
218         public ConfigPusherException(final String message) {
219             super(message);
220         }
221
222         public ConfigPusherException(final String message, final Throwable cause) {
223             super(message, cause);
224         }
225     }
226
227     static class NotEnoughCapabilitiesException extends ConfigPusherException {
228         private static final long serialVersionUID = 1L;
229         private final Set<String> missingCaps;
230
231         NotEnoughCapabilitiesException(final String message, final Set<String> missingCaps) {
232             super(message);
233             this.missingCaps = missingCaps;
234         }
235
236         public Set<String> getMissingCaps() {
237             return missingCaps;
238         }
239     }
240
241     private static final class ConfigSnapshotFailureException extends ConfigPusherException {
242
243         private final String configIdForReporting;
244
245         public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
246             super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
247             this.configIdForReporting = configIdForReporting;
248         }
249
250         public String getConfigIdForReporting() {
251             return configIdForReporting;
252         }
253     }
254
255     private void sleep() {
256         try {
257             Thread.sleep(100);
258         } catch (final InterruptedException e) {
259             Thread.currentThread().interrupt();
260             throw new IllegalStateException(e);
261         }
262     }
263
264     private synchronized boolean pushConfig(final ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
265         Element xmlToBePersisted;
266         try {
267             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
268         } catch (SAXException | IOException e) {
269             throw new IllegalStateException("Cannot parse " + configSnapshotHolder, e);
270         }
271         LOG.trace("Pushing last configuration to config mapping: {}", configSnapshotHolder);
272
273         Stopwatch stopwatch = Stopwatch.createStarted();
274         final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
275         try {
276             ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
277             executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
278         } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
279             LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
280             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
281         }
282
283         try {
284             currentFacade.commitSilentTransaction();
285         } catch (ValidationException | DocumentedException e) {
286             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "commit", e);
287         }
288
289         LOG.trace("Last configuration loaded successfully");
290         LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
291
292         return true;
293     }
294
295     private void executeWithMissingModuleFactoryRetries(final ConfigSubsystemFacade facade, final ConfigExecution configExecution)
296             throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
297         Stopwatch stopwatch = Stopwatch.createStarted();
298         ModuleFactoryNotFoundException lastException = null;
299         do {
300             try {
301                 facade.executeConfigExecution(configExecution);
302                 return;
303             } catch (final ModuleFactoryNotFoundException e) {
304                 LOG.debug("{} - will retry after timeout", e.toString());
305                 lastException = e;
306                 sleep();
307             }
308         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
309
310         throw lastException;
311     }
312
313     private ConfigExecution createConfigExecution(final Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
314         final Config configMapping = currentFacade.getConfigMapping();
315         return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);
316     }
317
318 }