90f71ca51aa1fc338fdae21aaacdff0a665196ca
[controller.git] / opendaylight / config / config-persister-impl / src / main / java / org / opendaylight / controller / config / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12 import com.google.common.base.Function;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.collect.Collections2;
15 import java.io.IOException;
16 import java.util.Collection;
17 import java.util.Date;
18 import java.util.HashSet;
19 import java.util.LinkedHashMap;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.SortedSet;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import javax.annotation.concurrent.Immutable;
29 import javax.management.MBeanServerConnection;
30 import org.opendaylight.controller.config.api.ConflictingVersionException;
31 import org.opendaylight.controller.config.api.ModuleFactoryNotFoundException;
32 import org.opendaylight.controller.config.api.ValidationException;
33 import org.opendaylight.controller.config.facade.xml.ConfigExecution;
34 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
35 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
36 import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
37 import org.opendaylight.controller.config.facade.xml.osgi.YangStoreService;
38 import org.opendaylight.controller.config.facade.xml.util.Util;
39 import org.opendaylight.controller.config.persist.api.ConfigPusher;
40 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
41 import org.opendaylight.controller.config.persist.api.Persister;
42 import org.opendaylight.controller.config.util.capability.Capability;
43 import org.opendaylight.controller.config.util.xml.DocumentedException;
44 import org.opendaylight.controller.config.util.xml.XmlUtil;
45 import org.opendaylight.yangtools.yang.model.api.Module;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.w3c.dom.Element;
49 import org.xml.sax.SAXException;
50
51 @Immutable
52 public class ConfigPusherImpl implements ConfigPusher {
53     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
54
55     private static final Date NO_REVISION = new Date(0);
56     private static final int QUEUE_SIZE = 100;
57
58     private final long maxWaitForCapabilitiesMillis;
59     private final long conflictingVersionTimeoutMillis;
60     private final BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
61
62     private final ConfigSubsystemFacadeFactory facade;
63     private ConfigPersisterNotificationHandler jmxNotificationHandler;
64
65     public ConfigPusherImpl(ConfigSubsystemFacadeFactory facade, long maxWaitForCapabilitiesMillis,
66                         long conflictingVersionTimeoutMillis) {
67         this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
68         this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
69         this.facade = facade;
70     }
71
72     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer,
73             Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
74         while(processSingle(autoCloseables, platformMBeanServer, persisterAggregator, propagateExceptions)) {
75             ;
76         }
77     }
78
79     boolean processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer,
80             final Persister persisterAggregator, boolean propagateExceptions) throws InterruptedException {
81         final List<? extends ConfigSnapshotHolder> configs = queue.take();
82         try {
83             internalPushConfigs(configs);
84
85             // Do not register multiple notification handlers
86             if(jmxNotificationHandler == null) {
87                 jmxNotificationHandler =
88                         new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, facade);
89                 synchronized (autoCloseables) {
90                     autoCloseables.add(jmxNotificationHandler);
91                 }
92             }
93
94             LOG.debug("ConfigPusher has pushed configs {}", configs);
95         } catch (Exception e) {
96             // Exceptions are logged to error downstream
97             LOG.debug("Failed to push some of configs: {}", configs, e);
98
99             if(propagateExceptions) {
100                 if(e instanceof RuntimeException) {
101                     throw (RuntimeException)e;
102                 } else {
103                     throw new IllegalStateException(e);
104                 }
105             } else {
106                 return false;
107             }
108         }
109
110         return true;
111     }
112
113     @Override
114     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
115         LOG.debug("Requested to push configs {}", configs);
116         this.queue.put(configs);
117     }
118
119     private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs)
120             throws DocumentedException {
121         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
122         LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
123         // start pushing snapshots
124         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
125             if (configSnapshotHolder != null) {
126                 LOG.info("Pushing configuration snapshot {}", configSnapshotHolder);
127                 boolean pushResult = false;
128                 try {
129                     pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
130                 } catch (ConfigSnapshotFailureException e) {
131                     LOG.error("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
132                             "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
133                     throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
134                 }  catch (Exception e) {
135                     String msg = String.format("Failed to apply configuration snapshot: %s", configSnapshotHolder);
136                     LOG.error(msg, e);
137                     throw new IllegalStateException(msg, e);
138                 }
139
140                 LOG.info("Successfully pushed configuration snapshot {}", configSnapshotHolder);
141                 result.put(configSnapshotHolder, pushResult);
142             }
143         }
144         LOG.debug("All configuration snapshots have been pushed successfully.");
145         return result;
146     }
147
148     private synchronized boolean pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
149         ConflictingVersionException lastException;
150         Stopwatch stopwatch = Stopwatch.createUnstarted();
151         do {
152             //TODO wait untill all expected modules are in yangStoreService, do we even need to with yangStoreService instead on netconfOperationService?
153             String idForReporting = configSnapshotHolder.toString();
154             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
155                     "Expected capabilities must not be null - %s, check %s", idForReporting,
156                     configSnapshotHolder.getClass().getName());
157
158             // wait max time for required capabilities to appear
159             waitForCapabilities(expectedCapabilities, idForReporting);
160             try {
161                 if(!stopwatch.isRunning()) {
162                     stopwatch.start();
163                 }
164                 return pushConfig(configSnapshotHolder);
165             } catch (ConflictingVersionException e) {
166                 lastException = e;
167                 LOG.info("Conflicting version detected, will retry after timeout");
168                 sleep();
169             }
170         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
171         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
172                 lastException);
173     }
174
175     private void waitForCapabilities(Set<String> expectedCapabilities, String idForReporting) {
176         Stopwatch stopwatch = Stopwatch.createStarted();
177         ConfigPusherException lastException;
178         do {
179             try {
180                 final Set<Capability> currentCaps = facade.getCurrentCapabilities();
181                 final Set<String> notFoundCapabilities = computeNotFoundCapabilities(expectedCapabilities, currentCaps);
182                 if (notFoundCapabilities.isEmpty()) {
183                     return;
184                 } else {
185                     LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
186                             "Expected but not found: {}, all expected {}, current {}",
187                             notFoundCapabilities, expectedCapabilities, currentCaps
188                     );
189                     throw new NotEnoughCapabilitiesException(
190                             "Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
191                 }
192             } catch (ConfigPusherException e) {
193                 LOG.debug("Not enough capabilities: {}", e.toString());
194                 lastException = e;
195                 sleep();
196             }
197         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
198
199         LOG.error("Unable to push configuration due to missing yang models." +
200                         " Yang models that are missing, but required by the configuration: {}." +
201                         " For each mentioned model check: " +
202                         " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
203                         " 2. the yang file is present in the system" +
204                         " 3. the bundle with that yang file is present in the system and active" +
205                         " 4. the yang parser did not fail while attempting to parse that model",
206                 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
207         throw new IllegalStateException("Unable to push configuration due to missing yang models." +
208                 " Required yang models that are missing: "
209                 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
210     }
211
212     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, Set<Capability> currentCapabilities) {
213         Collection<String> actual = transformCapabilities(currentCapabilities);
214         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
215         allNotFound.removeAll(actual);
216         return allNotFound;
217     }
218
219     static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
220         return new HashSet<>(Collections2.transform(currentCapabilities, new Function<Capability, String>() {
221             @Override
222             public String apply(@Nonnull final Capability input) {
223                 return input.getCapabilityUri();
224             }
225         }));
226     }
227
228     static class ConfigPusherException extends Exception {
229
230         public ConfigPusherException(final String message) {
231             super(message);
232         }
233
234         public ConfigPusherException(final String message, final Throwable cause) {
235             super(message, cause);
236         }
237     }
238
239     static class NotEnoughCapabilitiesException extends ConfigPusherException {
240         private static final long serialVersionUID = 1L;
241         private final Set<String> missingCaps;
242
243         NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
244             super(message);
245             this.missingCaps = missingCaps;
246         }
247
248         public Set<String> getMissingCaps() {
249             return missingCaps;
250         }
251     }
252
253     private static final class ConfigSnapshotFailureException extends ConfigPusherException {
254
255         private final String configIdForReporting;
256
257         public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
258             super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
259             this.configIdForReporting = configIdForReporting;
260         }
261
262         public String getConfigIdForReporting() {
263             return configIdForReporting;
264         }
265     }
266
267     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, YangStoreService yangStoreService) {
268
269         Collection<String> actual = Collections2.transform(yangStoreService.getModules(), new Function<Module, String>() {
270             @Nullable
271             @Override
272             public String apply(Module input) {
273                 final String withoutRevision = input.getNamespace().toString() + "?module=" + input.getName();
274                 return !input.getRevision().equals(NO_REVISION) ? withoutRevision + "&revision=" + Util.writeDate(input.getRevision()) : withoutRevision;
275             }
276         });
277
278         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
279         allNotFound.removeAll(actual);
280         return allNotFound;
281     }
282
283     private void sleep() {
284         try {
285             Thread.sleep(100);
286         } catch (InterruptedException e) {
287             Thread.currentThread().interrupt();
288             throw new IllegalStateException(e);
289         }
290     }
291
292     private synchronized boolean pushConfig(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
293         Element xmlToBePersisted;
294         try {
295             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
296         } catch (SAXException | IOException e) {
297             throw new IllegalStateException("Cannot parse " + configSnapshotHolder, e);
298         }
299         LOG.trace("Pushing last configuration to config mapping: {}", configSnapshotHolder);
300
301         Stopwatch stopwatch = Stopwatch.createStarted();
302         final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
303         try {
304             ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
305             executeWithMissingModuleFactoryRetries(currentFacade, configExecution);
306         } catch (ValidationException | DocumentedException | ModuleFactoryNotFoundException e) {
307             LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
308             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
309         }
310
311         try {
312             currentFacade.commitSilentTransaction();
313         } catch (ValidationException | DocumentedException e) {
314             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "commit", e);
315         }
316
317         LOG.trace("Last configuration loaded successfully");
318         LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
319
320         return true;
321     }
322
323     private void executeWithMissingModuleFactoryRetries(ConfigSubsystemFacade facade, ConfigExecution configExecution)
324             throws DocumentedException, ValidationException, ModuleFactoryNotFoundException {
325         Stopwatch stopwatch = Stopwatch.createStarted();
326         ModuleFactoryNotFoundException lastException = null;
327         do {
328             try {
329                 facade.executeConfigExecution(configExecution);
330                 return;
331             } catch (ModuleFactoryNotFoundException e) {
332                 LOG.debug("{} - will retry after timeout", e.toString());
333                 lastException = e;
334                 sleep();
335             }
336         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
337
338         throw lastException;
339     }
340
341     private ConfigExecution createConfigExecution(Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
342         final Config configMapping = currentFacade.getConfigMapping();
343         return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);
344     }
345
346 }