2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.persist.impl;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.config.api.ConflictingVersionException;
13 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
14 import org.opendaylight.controller.netconf.api.NetconfMessage;
15 import org.opendaylight.controller.netconf.client.NetconfClient;
16 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
17 import org.opendaylight.controller.netconf.util.NetconfUtil;
18 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
19 import org.opendaylight.controller.netconf.util.xml.XmlElement;
20 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
21 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import org.w3c.dom.Document;
25 import org.w3c.dom.Element;
26 import org.xml.sax.SAXException;
28 import javax.annotation.concurrent.Immutable;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.LinkedHashMap;
34 import java.util.List;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
41 public class ConfigPusher {
/** Logger shared by all instances of this class. */
private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);

/** Address, timeout and retry settings read by every connect and push operation. */
private final ConfigPusherConfiguration configuration;

/**
 * Creates a pusher that works with the given settings.
 *
 * @param configuration netconf address, timeouts and retry counts; must not be null
 * @throws NullPointerException if {@code configuration} is null
 */
public ConfigPusher(ConfigPusherConfiguration configuration) {
    // Fail fast here rather than with an unexplained NPE on the first connection attempt.
    this.configuration = Preconditions.checkNotNull(configuration, "configuration");
}
50 public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
51 List<ConfigSnapshotHolder> configs) throws InterruptedException {
52 logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
54 // first just make sure we can connect to netconf, even if nothing is being pushed
56 NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet());
57 Util.closeClientAndDispatcher(netconfClient);
59 LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> result = new LinkedHashMap<>();
60 // start pushing snapshots:
61 for (ConfigSnapshotHolder configSnapshotHolder : configs) {
62 EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder);
63 logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
64 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
66 logger.debug("All configuration snapshots have been pushed successfully.");
71 * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal
72 * number of attempts is reached.
74 private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder)
75 throws InterruptedException {
77 ConflictingVersionException lastException = null;
78 int maxAttempts = configuration.netconfPushConfigAttempts;
80 for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
81 NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
82 logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
84 EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
85 return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
86 } catch (ConflictingVersionException e) {
87 logger.debug("Conflicting version detected, will retry after timeout");
89 Thread.sleep(configuration.netconfPushConfigDelayMs);
90 } catch (RuntimeException e) {
91 throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
93 Util.closeClientAndDispatcher(netconfClient);
96 throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder,
101 * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
102 * If empty set is provided, will only make sure netconf client successfuly connected to the server.
103 * @return NetconfClient that has all required capabilities from server.
105 private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps) throws InterruptedException {
107 // TODO think about moving capability subset check to netconf client
108 // could be utilized by integration tests
110 final long pollingStartNanos = System.nanoTime();
111 final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
114 NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
115 configuration.netconfAddress.getAddress().getHostAddress(),
116 Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
118 Set<String> latestCapabilities = null;
119 while (System.nanoTime() < deadlineNanos) {
121 NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
122 configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
123 NetconfClient netconfClient;
125 netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
126 } catch (IllegalStateException e) {
127 logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
128 netconfClientDispatcher.close();
129 Thread.sleep(configuration.connectionAttemptDelayMs);
132 latestCapabilities = netconfClient.getCapabilities();
133 if (Util.isSubset(netconfClient, expectedCaps)) {
134 logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
135 logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
136 return netconfClient;
138 Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
139 logger.debug("Netconf server did not provide required capabilities. Attempt {}. " +
140 "Expected but not found: {}, all expected {}, current {}",
141 attempt, allNotFound, expectedCaps, latestCapabilities);
142 Util.closeClientAndDispatcher(netconfClient);
143 Thread.sleep(configuration.connectionAttemptDelayMs);
145 if (latestCapabilities == null) {
146 logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
147 throw new RuntimeException("Could not connect to netconf server");
149 Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
150 logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
151 allNotFound, expectedCaps, latestCapabilities);
152 throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
155 private static Set<String> computeNotFoundCapabilities(Set<String> expectedCaps, Set<String> latestCapabilities) {
156 Set<String> allNotFound = new HashSet<>(expectedCaps);
157 allNotFound.removeAll(latestCapabilities);
163 * Sends two RPCs to the netconf server: edit-config and commit.
165 * @param configSnapshotHolder
166 * @param netconfClient
167 * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
168 * @throws java.lang.RuntimeException if edit-config or commit fails otherwise
170 private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
171 throws ConflictingVersionException {
173 Element xmlToBePersisted;
175 xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
176 } catch (SAXException | IOException e) {
177 throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
179 logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
181 NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
183 // sending message to netconf
184 NetconfMessage editResponseMessage;
186 editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient);
187 } catch (IOException e) {
188 throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e);
192 NetconfMessage commitResponseMessage;
194 commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient);
195 } catch (IOException e) {
196 throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e);
199 if (logger.isTraceEnabled()) {
200 StringBuilder response = new StringBuilder("editConfig response = {");
201 response.append(XmlUtil.toString(editResponseMessage.getDocument()));
202 response.append("}");
203 response.append("commit response = {");
204 response.append(XmlUtil.toString(commitResponseMessage.getDocument()));
205 response.append("}");
206 logger.trace("Last configuration loaded successfully");
207 logger.trace("Detailed message {}", response);
209 return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
213 private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
214 throws ConflictingVersionException, IOException {
216 NetconfMessage netconfMessage = netconfClient.sendMessage(request,
217 configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
218 NetconfUtil.checkIsMessageOk(netconfMessage);
219 return netconfMessage;
220 } catch(ConflictingVersionException e) {
221 logger.trace("conflicting version detected: {}", e.toString());
223 } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
224 logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
225 throw new IOException("Failed to execute netconf transaction", e);
230 // load editConfig.xml template, populate /rpc/edit-config/config with parameter
231 private static NetconfMessage createEditConfigMessage(Element dataElement) {
232 String editConfigResourcePath = "/netconfOp/editConfig.xml";
233 try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
234 Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
236 Document doc = XmlUtil.readXmlToDocument(stream);
238 XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
239 XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
240 editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
241 for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
243 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), deep));
245 editConfigElement.appendChild(configWrapper.getDomElement());
246 return new NetconfMessage(doc);
247 } catch (IOException | SAXException e) {
248 // error reading the xml file bundled into the jar
249 throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e);
253 private static NetconfMessage getCommitMessage() {
254 String resource = "/netconfOp/commit.xml";
255 try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
256 Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
257 return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
258 } catch (SAXException | IOException e) {
259 // error reading the xml file bundled into the jar
260 throw new RuntimeException("Error while opening local resource " + resource, e);
264 static class EditAndCommitResponse {
265 private final NetconfMessage editResponse, commitResponse;
267 EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) {
268 this.editResponse = editResponse;
269 this.commitResponse = commitResponse;
272 public NetconfMessage getEditResponse() {
276 public NetconfMessage getCommitResponse() {
277 return commitResponse;
281 public String toString() {
282 return "EditAndCommitResponse{" +
283 "editResponse=" + editResponse +
284 ", commitResponse=" + commitResponse +
290 static class EditAndCommitResponseWithRetries {
291 private final EditAndCommitResponse editAndCommitResponse;
292 private final int retries;
294 EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) {
295 this.editAndCommitResponse = editAndCommitResponse;
296 this.retries = retries;
299 public int getRetries() {
303 public EditAndCommitResponse getEditAndCommitResponse() {
304 return editAndCommitResponse;
308 public String toString() {
309 return "EditAndCommitResponseWithRetries{" +
310 "editAndCommitResponse=" + editAndCommitResponse +
311 ", retries=" + retries +