package org.opendaylight.controller.netconf.persist.impl;
-import io.netty.channel.EventLoopGroup;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import javax.annotation.concurrent.Immutable;
-
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClient;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.NetconfUtil;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
- private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
- private static final int NETCONF_SEND_ATTEMPTS = 20;
-
- private final InetSocketAddress address;
- private final EventLoopGroup nettyThreadGroup;
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
- // Default timeout for netconf becoming stable
- public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2);
- public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
- private final int delayMillis = 5000;
private final long maxWaitForCapabilitiesMillis;
- private final long connectionTimeoutMillis;
+ private final long conflictingVersionTimeoutMillis;
+ private final NetconfOperationServiceFactory configNetconfConnector;
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) {
- this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
- }
-
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
- long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
- this.address = address;
- this.nettyThreadGroup = nettyThreadGroup;
+ public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+ long conflictingVersionTimeoutMillis) {
+ this.configNetconfConnector = configNetconfConnector;
this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
- this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
}
- public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
- List<ConfigSnapshotHolder> configs) throws InterruptedException {
+ public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
-
- // first just make sure we can connect to netconf, even if nothing is being pushed
- {
- NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet());
- Util.closeClientAndDispatcher(netconfClient);
- }
- LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> result = new LinkedHashMap<>();
+ LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
- EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder);
- logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
- result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
+ if(configSnapshotHolder != null) {
+ EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
+ logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+ result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
+ }
}
logger.debug("All configuration snapshots have been pushed successfully.");
return result;
}
/**
- * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal
- * number of attempts is reached.
+ * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
+ * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
+ * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
+ * {@link NetconfOperationService} after each use.
*/
- private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder)
- throws InterruptedException {
-
- ConflictingVersionException lastException = null;
- int maxAttempts = 30;
+ private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) throws NetconfDocumentedException {
+ ConflictingVersionException lastException;
+ Stopwatch stopwatch = new Stopwatch().start();
+ do {
+ String idForReporting = configSnapshotHolder.toString();
+ SortedSet<String> expectedCapabilities = checkNotNull(configSnapshotHolder.getCapabilities(),
+ "Expected capabilities must not be null - %s, check %s", idForReporting,
+ configSnapshotHolder.getClass().getName());
+ try (NetconfOperationService operationService = getOperationServiceWithRetries(expectedCapabilities, idForReporting)) {
+ return pushConfig(configSnapshotHolder, operationService);
+ } catch (ConflictingVersionException e) {
+ lastException = e;
+ logger.debug("Conflicting version detected, will retry after timeout");
+ sleep();
+ }
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
+ throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
+ lastException);
+ }
- for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
- NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
- logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
+ private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
+ Stopwatch stopwatch = new Stopwatch().start();
+ NotEnoughCapabilitiesException lastException;
+ do {
try {
- EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
- return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
- } catch (ConflictingVersionException e) {
+ return getOperationService(expectedCapabilities, idForReporting);
+ } catch (NotEnoughCapabilitiesException e) {
+ logger.debug("Not enough capabilities: " + e.toString());
lastException = e;
- Thread.sleep(1000);
- } catch (RuntimeException e) {
- throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
- } finally {
- Util.closeClientAndDispatcher(netconfClient);
+ sleep();
}
+ } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+ throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
+ }
+
+ private static class NotEnoughCapabilitiesException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ private NotEnoughCapabilitiesException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ private NotEnoughCapabilitiesException(String message) {
+ super(message);
}
- throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder,
- lastException);
}
/**
- * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
- * If empty set is provided, will only make sure netconf client successfuly connected to the server.
- * @return NetconfClient that has all required capabilities from server.
+ * Get NetconfOperationService iif all required capabilities are present.
+ *
+ * @param expectedCapabilities that must be provided by configNetconfConnector
+ * @param idForReporting
+ * @return service if capabilities are present, otherwise absent value
*/
- private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps) throws InterruptedException {
+ private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
+ NetconfOperationService serviceCandidate;
+ try {
+ serviceCandidate = configNetconfConnector.createService(idForReporting);
+ } catch(RuntimeException e) {
+ throw new NotEnoughCapabilitiesException("Netconf service not stable for " + idForReporting, e);
+ }
+ Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
+ if (notFoundDiff.isEmpty()) {
+ return serviceCandidate;
+ } else {
+ serviceCandidate.close();
+ logger.trace("Netconf server did not provide required capabilities for {} " +
+ "Expected but not found: {}, all expected {}, current {}",
+ idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+ );
+ throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
+ }
+ }
- // TODO think about moving capability subset check to netconf client
- // could be utilized by integration tests
+ private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
+ Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
+ @Override
+ public String apply(Capability input) {
+ return input.getCapabilityUri();
+ }
+ });
+ Set<String> allNotFound = new HashSet<>(expectedCapabilities);
+ allNotFound.removeAll(actual);
+ return allNotFound;
+ }
- final long pollingStartNanos = System.nanoTime();
- final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis);
- int attempt = 0;
- String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(),
- Integer.toString(address.getPort()), "tcp", Optional.of("persister"));
- Set<String> latestCapabilities = null;
- while (System.nanoTime() < deadlineNanos) {
- attempt++;
- NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup,
- nettyThreadGroup, additionalHeader, connectionTimeoutMillis);
- NetconfClient netconfClient;
- try {
- netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher);
- } catch (IllegalStateException e) {
- logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
- netconfClientDispatcher.close();
- Thread.sleep(delayMillis);
- continue;
- }
- latestCapabilities = netconfClient.getCapabilities();
- if (Util.isSubset(netconfClient, expectedCaps)) {
- logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
- logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
- return netconfClient;
- }
- logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities);
- Util.closeClientAndDispatcher(netconfClient);
- Thread.sleep(delayMillis);
- }
- if (latestCapabilities == null) {
- logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis);
- throw new RuntimeException("Could not connect to netconf server");
+ private void sleep() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
}
- Set<String> allNotFound = new HashSet<>(expectedCaps);
- allNotFound.removeAll(latestCapabilities);
- logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
- allNotFound, expectedCaps, latestCapabilities);
- throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
}
-
/**
* Sends two RPCs to the netconf server: edit-config and commit.
*
* @param configSnapshotHolder
- * @param netconfClient
* @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
* @throws java.lang.RuntimeException if edit-config or commit fails otherwise
*/
- private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
- throws ConflictingVersionException {
+ private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
+ throws ConflictingVersionException, NetconfDocumentedException {
Element xmlToBePersisted;
try {
throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
}
logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
-
+ Stopwatch stopwatch = new Stopwatch().start();
NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
- // sending message to netconf
- NetconfMessage editResponseMessage;
- try {
- editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient);
- } catch (IOException e) {
- throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e);
- }
+ Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
+ "edit-config", configSnapshotHolder.toString());
- // commit
- NetconfMessage commitResponseMessage;
- try {
- commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient);
- } catch (IOException e) {
- throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e);
- }
+ Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
+ "commit", configSnapshotHolder.toString());
if (logger.isTraceEnabled()) {
StringBuilder response = new StringBuilder("editConfig response = {");
- response.append(XmlUtil.toString(editResponseMessage.getDocument()));
+ response.append(XmlUtil.toString(editResponseMessage));
response.append("}");
response.append("commit response = {");
- response.append(XmlUtil.toString(commitResponseMessage.getDocument()));
+ response.append(XmlUtil.toString(commitResponseMessage));
response.append("}");
logger.trace("Last configuration loaded successfully");
logger.trace("Detailed message {}", response);
+ logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
}
+ private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) throws NetconfDocumentedException {
+ TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
+ Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
+ if (netconfOperations.isEmpty()) {
+ throw new IllegalStateException("Possible code error: no config operations");
+ }
+ for (NetconfOperation netconfOperation : netconfOperations) {
+ HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
+ allOperations.put(handlingPriority, netconfOperation);
+ }
+ Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
+ if (highestEntry.getKey().isCannotHandle()) {
+ throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
+ }
+ return highestEntry.getValue();
+ }
+
+ private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
+ String operationNameForReporting, String configIdForReporting)
+ throws ConflictingVersionException, NetconfDocumentedException {
- private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) throws IOException {
+ NetconfOperation operation = findOperation(request, operationService);
+ Document response;
try {
- NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
- NetconfUtil.checkIsMessageOk(netconfMessage);
- return netconfMessage;
- } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) {
- logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
- throw new IOException("Failed to execute netconf transaction", e);
+ response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ } catch (NetconfDocumentedException | RuntimeException e) {
+ if (e instanceof NetconfDocumentedException && e.getCause() instanceof ConflictingVersionException) {
+ throw (ConflictingVersionException) e.getCause();
+ }
+ throw new IllegalStateException("Failed to send " + operationNameForReporting +
+ " for configuration " + configIdForReporting, e);
}
+ return NetconfUtil.checkIsMessageOk(response);
}
// load editConfig.xml template, populate /rpc/edit-config/config with parameter
- private static NetconfMessage createEditConfigMessage(Element dataElement) {
+ private static NetconfMessage createEditConfigMessage(Element dataElement) throws NetconfDocumentedException {
String editConfigResourcePath = "/netconfOp/editConfig.xml";
try (InputStream stream = ConfigPersisterNotificationHandler.class.getResourceAsStream(editConfigResourcePath)) {
- Preconditions.checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
+ checkNotNull(stream, "Unable to load resource " + editConfigResourcePath);
Document doc = XmlUtil.readXmlToDocument(stream);
return new NetconfMessage(doc);
} catch (IOException | SAXException e) {
// error reading the xml file bundled into the jar
- throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e);
+ throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
}
}
private static NetconfMessage getCommitMessage() {
String resource = "/netconfOp/commit.xml";
try (InputStream stream = ConfigPusher.class.getResourceAsStream(resource)) {
- Preconditions.checkNotNull(stream, "Unable to load resource " + resource);
+ checkNotNull(stream, "Unable to load resource " + resource);
return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
} catch (SAXException | IOException e) {
// error reading the xml file bundled into the jar
- throw new RuntimeException("Error while opening local resource " + resource, e);
+ throw new IllegalStateException("Error while opening local resource " + resource, e);
}
}
static class EditAndCommitResponse {
- private final NetconfMessage editResponse, commitResponse;
+ private final Document editResponse, commitResponse;
- EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) {
+ EditAndCommitResponse(Document editResponse, Document commitResponse) {
this.editResponse = editResponse;
this.commitResponse = commitResponse;
}
- public NetconfMessage getEditResponse() {
+ public Document getEditResponse() {
return editResponse;
}
- public NetconfMessage getCommitResponse() {
+ public Document getCommitResponse() {
return commitResponse;
}
'}';
}
}
-
-
- static class EditAndCommitResponseWithRetries {
- private final EditAndCommitResponse editAndCommitResponse;
- private final int retries;
-
- EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) {
- this.editAndCommitResponse = editAndCommitResponse;
- this.retries = retries;
- }
-
- public int getRetries() {
- return retries;
- }
-
- public EditAndCommitResponse getEditAndCommitResponse() {
- return editAndCommitResponse;
- }
-
- @Override
- public String toString() {
- return "EditAndCommitResponseWithRetries{" +
- "editAndCommitResponse=" + editAndCommitResponse +
- ", retries=" + retries +
- '}';
- }
- }
}