Decouple config and netconf subsystems.
[controller.git] / opendaylight / config / config-persister-impl / src / main / java / org / opendaylight / controller / config / persist / impl / ConfigPusherImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.persist.impl;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import com.google.common.base.Function;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Collections2;
16 import java.io.IOException;
17 import java.util.Collection;
18 import java.util.Date;
19 import java.util.HashSet;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.SortedSet;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.Immutable;
30 import javax.management.MBeanServerConnection;
31 import org.opendaylight.controller.config.api.ConflictingVersionException;
32 import org.opendaylight.controller.config.api.ValidationException;
33 import org.opendaylight.controller.config.facade.xml.ConfigExecution;
34 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacade;
35 import org.opendaylight.controller.config.facade.xml.ConfigSubsystemFacadeFactory;
36 import org.opendaylight.controller.config.facade.xml.mapping.config.Config;
37 import org.opendaylight.controller.config.facade.xml.osgi.YangStoreService;
38 import org.opendaylight.controller.config.facade.xml.util.Util;
39 import org.opendaylight.controller.config.persist.api.ConfigPusher;
40 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
41 import org.opendaylight.controller.config.persist.api.Persister;
42 import org.opendaylight.controller.config.util.capability.Capability;
43 import org.opendaylight.controller.config.util.xml.DocumentedException;
44 import org.opendaylight.controller.config.util.xml.XmlUtil;
45 import org.opendaylight.yangtools.yang.model.api.Module;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.w3c.dom.Element;
49 import org.xml.sax.SAXException;
50
51 @Immutable
52 public class ConfigPusherImpl implements ConfigPusher {
53     private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
54
55     private static final Date NO_REVISION = new Date(0);
56     private static final int QUEUE_SIZE = 100;
57
58     private final long maxWaitForCapabilitiesMillis;
59     private final long conflictingVersionTimeoutMillis;
60     private BlockingQueue<List<? extends ConfigSnapshotHolder>> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
61
62     private ConfigSubsystemFacadeFactory facade;
63     private ConfigPersisterNotificationHandler jmxNotificationHandler;
64
/**
 * Creates a pusher that applies configuration snapshots through the given facade factory.
 *
 * @param facade factory used to create a facade for every push and to read current capabilities
 * @param maxWaitForCapabilitiesMillis maximum time, in milliseconds, to wait for expected capabilities
 * @param conflictingVersionTimeoutMillis maximum time, in milliseconds, to keep retrying a push that
 *        fails with a conflicting version
 */
public ConfigPusherImpl(final ConfigSubsystemFacadeFactory facade, final long maxWaitForCapabilitiesMillis,
        final long conflictingVersionTimeoutMillis) {
    this.facade = facade;
    this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
    this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
}
71
72     public void process(List<AutoCloseable> autoCloseables, MBeanServerConnection platformMBeanServer, Persister persisterAggregator) throws InterruptedException {
73         while(true) {
74             processSingle(autoCloseables, platformMBeanServer, persisterAggregator);
75         }
76     }
77
78     void processSingle(final List<AutoCloseable> autoCloseables, final MBeanServerConnection platformMBeanServer, final Persister persisterAggregator) throws InterruptedException {
79         final List<? extends ConfigSnapshotHolder> configs = queue.take();
80         try {
81             internalPushConfigs(configs);
82
83             // Do not register multiple notification handlers
84             if(jmxNotificationHandler == null) {
85                 jmxNotificationHandler =
86                         new ConfigPersisterNotificationHandler(platformMBeanServer, persisterAggregator, facade);
87                 synchronized (autoCloseables) {
88                     autoCloseables.add(jmxNotificationHandler);
89                 }
90             }
91
92             LOG.debug("ConfigPusher has pushed configs {}", configs);
93         } catch (DocumentedException e) {
94             LOG.error("Error pushing configs {}",configs);
95             throw new IllegalStateException(e);
96         }
97     }
98
99     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
100         LOG.debug("Requested to push configs {}", configs);
101         this.queue.put(configs);
102     }
103
104     private LinkedHashMap<? extends ConfigSnapshotHolder, Boolean> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs)
105             throws DocumentedException {
106         LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
107         LinkedHashMap<ConfigSnapshotHolder, Boolean> result = new LinkedHashMap<>();
108         // start pushing snapshots
109         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
110             if (configSnapshotHolder != null) {
111                 boolean pushResult = false;
112                 try {
113                     pushResult = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
114                 } catch (ConfigSnapshotFailureException e) {
115                     LOG.warn("Failed to apply configuration snapshot: {}. Config snapshot is not semantically correct and will be IGNORED. " +
116                             "for detailed information see enclosed exception.", e.getConfigIdForReporting(), e);
117                     throw new IllegalStateException("Failed to apply configuration snapshot " + e.getConfigIdForReporting(), e);
118                 }
119                 LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
120                 result.put(configSnapshotHolder, pushResult);
121             }
122         }
123         LOG.debug("All configuration snapshots have been pushed successfully.");
124         return result;
125     }
126
127     private synchronized boolean pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException {
128         ConflictingVersionException lastException;
129         Stopwatch stopwatch = Stopwatch.createUnstarted();
130         do {
131             //TODO wait untill all expected modules are in yangStoreService, do we even need to with yangStoreService instead on netconfOperationService?
132             String idForReporting = configSnapshotHolder.toString();
133             SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
134                     "Expected capabilities must not be null - %s, check %s", idForReporting,
135                     configSnapshotHolder.getClass().getName());
136
137             // wait max time for required capabilities to appear
138             waitForCapabilities(expectedCapabilities, idForReporting);
139             try {
140                 if(!stopwatch.isRunning()) {
141                     stopwatch.start();
142                 }
143                 return pushConfig(configSnapshotHolder);
144             } catch (ConflictingVersionException e) {
145                 lastException = e;
146                 LOG.info("Conflicting version detected, will retry after timeout");
147                 sleep();
148             }
149         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
150         throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
151                 lastException);
152     }
153
154     private void waitForCapabilities(Set<String> expectedCapabilities, String idForReporting) {
155         Stopwatch stopwatch = Stopwatch.createStarted();
156         ConfigPusherException lastException;
157         do {
158             try {
159                 final Set<Capability> currentCaps = facade.getCurrentCapabilities();
160                 final Set<String> notFoundCapabilities = computeNotFoundCapabilities(expectedCapabilities, currentCaps);
161                 if (notFoundCapabilities.isEmpty()) {
162                     return;
163                 } else {
164                     LOG.debug("Netconf server did not provide required capabilities for {} ", idForReporting,
165                             "Expected but not found: {}, all expected {}, current {}",
166                             notFoundCapabilities, expectedCapabilities, currentCaps
167                     );
168                     throw new NotEnoughCapabilitiesException(
169                             "Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundCapabilities, notFoundCapabilities);
170                 }
171             } catch (ConfigPusherException e) {
172                 LOG.debug("Not enough capabilities: {}", e.toString());
173                 lastException = e;
174                 sleep();
175             }
176         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
177
178         LOG.error("Unable to push configuration due to missing yang models." +
179                         " Yang models that are missing, but required by the configuration: {}." +
180                         " For each mentioned model check: " +
181                         " 1. that the mentioned yang model namespace/name/revision is identical to those in the yang model itself" +
182                         " 2. the yang file is present in the system" +
183                         " 3. the bundle with that yang file is present in the system and active" +
184                         " 4. the yang parser did not fail while attempting to parse that model",
185                 ((NotEnoughCapabilitiesException) lastException).getMissingCaps());
186         throw new IllegalStateException("Unable to push configuration due to missing yang models." +
187                 " Required yang models that are missing: "
188                 + ((NotEnoughCapabilitiesException) lastException).getMissingCaps(), lastException);
189     }
190
191     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, Set<Capability> currentCapabilities) {
192         Collection<String> actual = transformCapabilities(currentCapabilities);
193         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
194         allNotFound.removeAll(actual);
195         return allNotFound;
196     }
197
198     static Set<String> transformCapabilities(final Set<Capability> currentCapabilities) {
199         return new HashSet<>(Collections2.transform(currentCapabilities, new Function<Capability, String>() {
200             @Override
201             public String apply(@Nonnull final Capability input) {
202                 return input.getCapabilityUri();
203             }
204         }));
205     }
206
207     static class ConfigPusherException extends Exception {
208
209         public ConfigPusherException(final String message) {
210             super(message);
211         }
212
213         public ConfigPusherException(final String message, final Throwable cause) {
214             super(message, cause);
215         }
216     }
217
218     static class NotEnoughCapabilitiesException extends ConfigPusherException {
219         private static final long serialVersionUID = 1L;
220         private Set<String> missingCaps;
221
222         NotEnoughCapabilitiesException(String message, Set<String> missingCaps) {
223             super(message);
224             this.missingCaps = missingCaps;
225         }
226
227         public Set<String> getMissingCaps() {
228             return missingCaps;
229         }
230     }
231
232     private static final class ConfigSnapshotFailureException extends ConfigPusherException {
233
234         private final String configIdForReporting;
235
236         public ConfigSnapshotFailureException(final String configIdForReporting, final String operationNameForReporting, final Exception e) {
237             super(String.format("Failed to apply config snapshot: %s during phase: %s", configIdForReporting, operationNameForReporting), e);
238             this.configIdForReporting = configIdForReporting;
239         }
240
241         public String getConfigIdForReporting() {
242             return configIdForReporting;
243         }
244     }
245
246     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, YangStoreService yangStoreService) {
247
248         Collection<String> actual = Collections2.transform(yangStoreService.getModules(), new Function<Module, String>() {
249             @Nullable
250             @Override
251             public String apply(Module input) {
252                 final String withoutRevision = input.getNamespace().toString() + "?module=" + input.getName();
253                 return !input.getRevision().equals(NO_REVISION) ? withoutRevision + "&revision=" + Util.writeDate(input.getRevision()) : withoutRevision;
254             }
255         });
256
257         Set<String> allNotFound = new HashSet<>(expectedCapabilities);
258         allNotFound.removeAll(actual);
259         return allNotFound;
260     }
261
262     private void sleep() {
263         try {
264             Thread.sleep(100);
265         } catch (InterruptedException e) {
266             Thread.currentThread().interrupt();
267             throw new IllegalStateException(e);
268         }
269     }
270
271     private synchronized boolean pushConfig(ConfigSnapshotHolder configSnapshotHolder) throws ConfigSnapshotFailureException, ConflictingVersionException {
272         Element xmlToBePersisted;
273         try {
274             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
275         } catch (SAXException | IOException e) {
276             throw new IllegalStateException("Cannot parse " + configSnapshotHolder, e);
277         }
278         LOG.trace("Pushing last configuration to config mapping: {}", configSnapshotHolder);
279
280         Stopwatch stopwatch = Stopwatch.createStarted();
281         final ConfigSubsystemFacade currentFacade = this.facade.createFacade("config-push");
282         try {
283             ConfigExecution configExecution = createConfigExecution(xmlToBePersisted, currentFacade);
284             currentFacade.executeConfigExecution(configExecution);
285         } catch (ValidationException | DocumentedException e) {
286             LOG.trace("Validation for config: {} failed", configSnapshotHolder, e);
287             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "edit", e);
288         }
289
290         try {
291             currentFacade.commitSilentTransaction();
292         } catch (ValidationException | DocumentedException e) {
293             throw new ConfigSnapshotFailureException(configSnapshotHolder.toString(), "commit", e);
294         }
295
296         LOG.trace("Last configuration loaded successfully");
297         LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
298
299         return true;
300     }
301
302     private ConfigExecution createConfigExecution(Element xmlToBePersisted, final ConfigSubsystemFacade currentFacade) throws DocumentedException {
303         final Config configMapping = currentFacade.getConfigMapping();
304         return currentFacade.getConfigExecution(configMapping, xmlToBePersisted);
305     }
306
307 }