Merge "Use equalsIgnoreCase when comparing subnet name to default subnet. Using equal...
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusher.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.persist.impl;
10
11 import io.netty.channel.EventLoopGroup;
12
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.net.InetSocketAddress;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Set;
20
21 import javax.annotation.concurrent.Immutable;
22
23 import org.opendaylight.controller.config.api.ConflictingVersionException;
24 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
25 import org.opendaylight.controller.netconf.api.NetconfMessage;
26 import org.opendaylight.controller.netconf.client.NetconfClient;
27 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
28 import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader;
29 import org.opendaylight.controller.netconf.util.xml.XmlElement;
30 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
31 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.w3c.dom.Document;
35 import org.w3c.dom.Element;
36 import org.xml.sax.SAXException;
37
38 import com.google.common.base.Optional;
39 import com.google.common.base.Preconditions;
40
41 @Immutable
42 public class ConfigPusher {
43     private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
44     private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
45     private static final int NETCONF_SEND_ATTEMPTS = 20;
46
47     private final InetSocketAddress address;
48     private final EventLoopGroup nettyThreadgroup;
49
50
51     public static final long DEFAULT_TIMEOUT = 120000L;// 120 seconds until netconf must be stable
52     private final long timeout;
53
54     public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadgroup) {
55         this(address, DEFAULT_TIMEOUT, nettyThreadgroup);
56
57     }
58
59     public ConfigPusher(InetSocketAddress address, long timeout, EventLoopGroup nettyThreadgroup) {
60         this.address = address;
61         this.timeout = timeout;
62
63         this.nettyThreadgroup = nettyThreadgroup;
64     }
65
66     public synchronized NetconfClient init(List<ConfigSnapshotHolder> configs) throws InterruptedException {
67         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
68         return pushAllConfigs(configs);
69     }
70
71     private synchronized NetconfClient pushAllConfigs(List<ConfigSnapshotHolder> configs) throws InterruptedException {
72         NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet(), Optional.<NetconfClient>absent());
73         for (ConfigSnapshotHolder configSnapshotHolder: configs){
74             netconfClient = pushSnapshotWithRetries(configSnapshotHolder, Optional.of(netconfClient));
75         }
76         return netconfClient;
77     }
78
79     private synchronized NetconfClient pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder,
80                                                                Optional<NetconfClient> oldClientForPossibleReuse)
81             throws InterruptedException {
82
83         ConflictingVersionException lastException = null;
84         int maxAttempts = 30;
85         for(int i = 0 ; i < maxAttempts; i++) {
86             NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities(), oldClientForPossibleReuse);
87             final String configSnapshot = configSnapshotHolder.getConfigSnapshot();
88             logger.trace("Pushing following xml to netconf {}", configSnapshot);
89             try {
90                 pushLastConfig(configSnapshotHolder, netconfClient);
91                 return netconfClient;
92             } catch(ConflictingVersionException e) {
93                 Util.closeClientAndDispatcher(netconfClient);
94                 lastException = e;
95                 Thread.sleep(1000);
96             } catch (SAXException | IOException e) {
97                 throw new IllegalStateException("Unable to load last config", e);
98             }
99         }
100         throw new IllegalStateException("Failed to push configuration, maximum attempt count has been reached: "
101                 + maxAttempts, lastException);
102     }
103
104     /**
105      * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
106      *                     If empty set is provided, will only make sure netconf client successfuly connected to the server.
107      * @param oldClientForPossibleReuse if present, try to get expected capabilities from it before closing it and retrying with
108      *                                  new client connection.
109      * @return NetconfClient that has all required capabilities from server.
110      */
111     private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps,
112                                                              Optional<NetconfClient> oldClientForPossibleReuse)
113             throws InterruptedException {
114
115         if (oldClientForPossibleReuse.isPresent()) {
116             NetconfClient oldClient = oldClientForPossibleReuse.get();
117             if (Util.isSubset(oldClient, expectedCaps)) {
118                 return oldClient;
119             } else {
120                 Util.closeClientAndDispatcher(oldClient);
121             }
122         }
123
124         // TODO think about moving capability subset check to netconf client
125         // could be utilized by integration tests
126
127         long pollingStart = System.currentTimeMillis();
128         int delay = 5000;
129
130         int attempt = 0;
131
132         long deadline = pollingStart + timeout;
133
134         String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(),
135                 Integer.toString(address.getPort()), "tcp", Optional.of("persister"));
136
137         Set<String> latestCapabilities = new HashSet<>();
138         while (System.currentTimeMillis() < deadline) {
139             attempt++;
140             NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadgroup,
141                     nettyThreadgroup, additionalHeader);
142             NetconfClient netconfClient;
143             try {
144                 netconfClient = new NetconfClient(this.toString(), address, delay, netconfClientDispatcher);
145             } catch (IllegalStateException e) {
146                 logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
147                 netconfClientDispatcher.close();
148                 Thread.sleep(delay);
149                 continue;
150             }
151             latestCapabilities = netconfClient.getCapabilities();
152             if (Util.isSubset(netconfClient, expectedCaps)) {
153                 logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
154                 logger.info("Session id received from netconf server: {}", netconfClient.getClientSession());
155                 return netconfClient;
156             }
157             logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities);
158             Util.closeClientAndDispatcher(netconfClient);
159             Thread.sleep(delay);
160         }
161         Set<String> allNotFound = new HashSet<>(expectedCaps);
162         allNotFound.removeAll(latestCapabilities);
163         logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
164                 allNotFound, expectedCaps, latestCapabilities);
165         throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
166     }
167
168
169     private synchronized void pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
170             throws ConflictingVersionException, IOException, SAXException {
171
172         Element xmlToBePersisted = XmlUtil.readXmlToElement(configSnapshotHolder.getConfigSnapshot());
173         logger.info("Pushing last configuration to netconf: {}", configSnapshotHolder);
174         StringBuilder response = new StringBuilder("editConfig response = {");
175
176
177         NetconfMessage message = createEditConfigMessage(xmlToBePersisted, "/netconfOp/editConfig.xml");
178
179         // sending message to netconf
180         NetconfMessage responseMessage = getResponse(message, netconfClient);
181
182         XmlElement element = XmlElement.fromDomDocument(responseMessage.getDocument());
183         Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
184         element = element.getOnlyChildElement();
185
186         Util.checkIsOk(element, responseMessage);
187         response.append(XmlUtil.toString(responseMessage.getDocument()));
188         response.append("}");
189         responseMessage = getResponse(getNetconfMessageFromResource("/netconfOp/commit.xml"), netconfClient);
190
191         element = XmlElement.fromDomDocument(responseMessage.getDocument());
192         Preconditions.checkState(element.getName().equals(XmlNetconfConstants.RPC_REPLY_KEY));
193         element = element.getOnlyChildElement();
194
195         Util.checkIsOk(element, responseMessage);
196         response.append("commit response = {");
197         response.append(XmlUtil.toString(responseMessage.getDocument()));
198         response.append("}");
199         logger.info("Last configuration loaded successfully");
200         logger.trace("Detailed message {}", response);
201     }
202
203     private static NetconfMessage getResponse(NetconfMessage request, NetconfClient netconfClient) {
204         try {
205             return netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
206         } catch(RuntimeException e) {
207             logger.error("Error while sending message {} to {}", request, netconfClient);
208             throw e;
209         }
210     }
211
212     private static NetconfMessage createEditConfigMessage(Element dataElement, String editConfigResourcename) {
213         try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcename)) {
214             Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcename);
215
216             Document doc = XmlUtil.readXmlToDocument(stream);
217
218             doc.getDocumentElement();
219             XmlElement editConfigElement = XmlElement.fromDomDocument(doc).getOnlyChildElement();
220             XmlElement configWrapper = editConfigElement.getOnlyChildElement(XmlNetconfConstants.CONFIG_KEY);
221             editConfigElement.getDomElement().removeChild(configWrapper.getDomElement());
222             for (XmlElement el : XmlElement.fromDomElement(dataElement).getChildElements()) {
223                 configWrapper.appendChild((Element) doc.importNode(el.getDomElement(), true));
224             }
225             editConfigElement.appendChild(configWrapper.getDomElement());
226             return new NetconfMessage(doc);
227         } catch (IOException | SAXException e) {
228             throw new RuntimeException("Unable to parse message from resources " + editConfigResourcename, e);
229         }
230     }
231
232     private static NetconfMessage getNetconfMessageFromResource(String resource) {
233         try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
234             Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
235             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
236         } catch (SAXException | IOException e) {
237             throw new RuntimeException("Unable to parse message from resources " + resource, e);
238         }
239     }
240 }