Merge "Add Enqueue validation check in FlowConfig"
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusher.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.config.api.ConflictingVersionException;
13 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
14 import org.opendaylight.controller.netconf.api.NetconfMessage;
15 import org.opendaylight.controller.netconf.client.NetconfClient;
16 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
17 import org.opendaylight.controller.netconf.util.NetconfUtil;
18 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
19 import org.opendaylight.controller.netconf.util.xml.XmlElement;
20 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
21 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import org.w3c.dom.Document;
25 import org.w3c.dom.Element;
26 import org.xml.sax.SAXException;
27
28 import javax.annotation.concurrent.Immutable;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Set;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39
40 @Immutable
41 public class ConfigPusher {
42     private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
43
44     private final ConfigPusherConfiguration configuration;
45
46     public ConfigPusher(ConfigPusherConfiguration configuration) {
47         this.configuration = configuration;
48     }
49
50     public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
51             List<ConfigSnapshotHolder> configs) throws InterruptedException {
52         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
53
54         // first just make sure we can connect to netconf, even if nothing is being pushed
55         {
56             NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet());
57             Util.closeClientAndDispatcher(netconfClient);
58         }
59         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> result = new LinkedHashMap<>();
60         // start pushing snapshots:
61         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
62             EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder);
63             logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
64             result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
65         }
66         logger.debug("All configuration snapshots have been pushed successfully.");
67         return result;
68     }
69
70     /**
71      * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal
72      * number of attempts is reached.
73      */
74     private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder)
75             throws InterruptedException {
76
77         ConflictingVersionException lastException = null;
78         int maxAttempts = configuration.netconfPushConfigAttempts;
79
80         for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
81             NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
82             logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
83             try {
84                 EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
85                 return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
86             } catch (ConflictingVersionException e) {
87                 logger.debug("Conflicting version detected, will retry after timeout");
88                 lastException = e;
89                 Thread.sleep(configuration.netconfPushConfigDelayMs);
90             } catch (RuntimeException e) {
91                 throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
92             } finally {
93                 Util.closeClientAndDispatcher(netconfClient);
94             }
95         }
96         throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder,
97                 lastException);
98     }
99
100     /**
101      * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
102      *                     If empty set is provided, will only make sure netconf client successfuly connected to the server.
103      * @return NetconfClient that has all required capabilities from server.
104      */
105     private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps) throws InterruptedException {
106
107         // TODO think about moving capability subset check to netconf client
108         // could be utilized by integration tests
109
110         final long pollingStartNanos = System.nanoTime();
111         final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
112         int attempt = 0;
113
114         NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
115                 configuration.netconfAddress.getAddress().getHostAddress(),
116                 Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
117
118         Set<String> latestCapabilities = null;
119         while (System.nanoTime() < deadlineNanos) {
120             attempt++;
121             NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
122                     configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
123             NetconfClient netconfClient;
124             try {
125                 netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
126             } catch (IllegalStateException e) {
127                 logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
128                 netconfClientDispatcher.close();
129                 Thread.sleep(configuration.connectionAttemptDelayMs);
130                 continue;
131             }
132             latestCapabilities = netconfClient.getCapabilities();
133             if (Util.isSubset(netconfClient, expectedCaps)) {
134                 logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
135                 logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
136                 return netconfClient;
137             }
138             Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
139             logger.debug("Netconf server did not provide required capabilities. Attempt {}. " +
140                     "Expected but not found: {}, all expected {}, current {}",
141                     attempt, allNotFound, expectedCaps, latestCapabilities);
142             Util.closeClientAndDispatcher(netconfClient);
143             Thread.sleep(configuration.connectionAttemptDelayMs);
144         }
145         if (latestCapabilities == null) {
146             logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
147             throw new RuntimeException("Could not connect to netconf server");
148         }
149         Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
150         logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
151                 allNotFound, expectedCaps, latestCapabilities);
152         throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
153     }
154
155     private static Set<String> computeNotFoundCapabilities(Set<String> expectedCaps, Set<String> latestCapabilities) {
156         Set<String> allNotFound = new HashSet<>(expectedCaps);
157         allNotFound.removeAll(latestCapabilities);
158         return allNotFound;
159     }
160
161
162     /**
163      * Sends two RPCs to the netconf server: edit-config and commit.
164      *
165      * @param configSnapshotHolder
166      * @param netconfClient
167      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
168      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
169      */
170     private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
171             throws ConflictingVersionException {
172
173         Element xmlToBePersisted;
174         try {
175             xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
176         } catch (SAXException | IOException e) {
177             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
178         }
179         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
180
181         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
182
183         // sending message to netconf
184         NetconfMessage editResponseMessage;
185         try {
186             editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient);
187         } catch (IOException e) {
188             throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e);
189         }
190
191         // commit
192         NetconfMessage commitResponseMessage;
193         try {
194             commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient);
195         } catch (IOException e) {
196             throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e);
197         }
198
199         if (logger.isTraceEnabled()) {
200             StringBuilder response = new StringBuilder("editConfig response = {");
201             response.append(XmlUtil.toString(editResponseMessage.getDocument()));
202             response.append("}");
203             response.append("commit response = {");
204             response.append(XmlUtil.toString(commitResponseMessage.getDocument()));
205             response.append("}");
206             logger.trace("Last configuration loaded successfully");
207             logger.trace("Detailed message {}", response);
208         }
209         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
210     }
211
212
213     private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
214             throws ConflictingVersionException, IOException {
215         try {
216             NetconfMessage netconfMessage = netconfClient.sendMessage(request,
217                     configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
218             NetconfUtil.checkIsMessageOk(netconfMessage);
219             return netconfMessage;
220         } catch(ConflictingVersionException e) {
221             logger.trace("conflicting version detected: {}", e.toString());
222             throw e;
223         } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
224             logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
225             throw new IOException("Failed to execute netconf transaction", e);
226         }
227     }
228
229
230     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
231     private static NetconfMessage createEditConfigMessage(Element dataElement) {
232         String editConfigResourcePath = "/netconfOp/editConfig.xml";
233         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
234             Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
235
236             Document doc = XmlUtil.readXmlToDocument(stream);
237
238             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
239             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
240             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
241             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
242                 boolean deep = true;
243                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
244             }
245             editConfigElement.appendChild(configWrapper.getDomElement());
246             return new NetconfMessage(doc);
247         } catch (IOException | SAXException e) {
248             // error reading the xml file bundled into the jar
249             throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e);
250         }
251     }
252
253     private static NetconfMessage getCommitMessage() {
254         String resource = "/netconfOp/commit.xml";
255         try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
256             Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
257             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
258         } catch (SAXException | IOException e) {
259             // error reading the xml file bundled into the jar
260             throw new RuntimeException("Error while opening local resource " + resource, e);
261         }
262     }
263
264     static class EditAndCommitResponse {
265         private final NetconfMessage editResponse, commitResponse;
266
267         EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) {
268             this.editResponse = editResponse;
269             this.commitResponse = commitResponse;
270         }
271
272         public NetconfMessage getEditResponse() {
273             return editResponse;
274         }
275
276         public NetconfMessage getCommitResponse() {
277             return commitResponse;
278         }
279
280         @Override
281         public String toString() {
282             return "EditAndCommitResponse{" +
283                     "editResponse=" + editResponse +
284                     ", commitResponse=" + commitResponse +
285                     '}';
286         }
287     }
288
289
290     static class EditAndCommitResponseWithRetries {
291         private final EditAndCommitResponse editAndCommitResponse;
292         private final int retries;
293
294         EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) {
295             this.editAndCommitResponse = editAndCommitResponse;
296             this.retries = retries;
297         }
298
299         public int getRetries() {
300             return retries;
301         }
302
303         public EditAndCommitResponse getEditAndCommitResponse() {
304             return editAndCommitResponse;
305         }
306
307         @Override
308         public String toString() {
309             return "EditAndCommitResponseWithRetries{" +
310                     "editAndCommitResponse=" + editAndCommitResponse +
311                     ", retries=" + retries +
312                     '}';
313         }
314     }
315
316 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.