Resolve Bug:593. Persister should communicate via OSGi SR instead of TCP.
[controller.git] / opendaylight / netconf / config-persister-impl / src / main / java / org / opendaylight / controller / netconf / persist / impl / ConfigPusher.java
index ea2a46dba535f825d0baa7b5dfa58d4b6371c297..6dba9ac64e22e1e5d5f528aa6b8c830aa78d9f17 100644 (file)
@@ -8,14 +8,21 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Collections2;
 import org.opendaylight.controller.config.api.ConflictingVersionException;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.NetconfClient;
-import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
 import org.opendaylight.controller.netconf.util.NetconfUtil;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -28,38 +35,36 @@ import org.xml.sax.SAXException;
 import javax.annotation.concurrent.Immutable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 @Immutable
 public class ConfigPusher {
     private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
 
-    private final ConfigPusherConfiguration configuration;
+    private final long maxWaitForCapabilitiesMillis;
+    private final long conflictingVersionTimeoutMillis;
+    private final NetconfOperationServiceFactory configNetconfConnector;
 
-    public ConfigPusher(ConfigPusherConfiguration configuration) {
-        this.configuration = configuration;
+    public ConfigPusher(NetconfOperationServiceFactory configNetconfConnector, long maxWaitForCapabilitiesMillis,
+                        long conflictingVersionTimeoutMillis) {
+        this.configNetconfConnector = configNetconfConnector;
+        this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
+        this.conflictingVersionTimeoutMillis = conflictingVersionTimeoutMillis;
     }
 
-    public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
-            List<ConfigSnapshotHolder> configs) throws InterruptedException {
+    public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> pushConfigs(List<ConfigSnapshotHolder> configs) {
         logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
-
-        // first just make sure we can connect to netconf, even if nothing is being pushed
-        {
-            NetconfClient netconfClient = makeNetconfConnection(Collections.<String>emptySet());
-            Util.closeClientAndDispatcher(netconfClient);
-        }
-        LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> result = new LinkedHashMap<>();
+        LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
         // start pushing snapshots:
         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
-            EditAndCommitResponseWithRetries editAndCommitResponseWithRetries = pushSnapshotWithRetries(configSnapshotHolder);
+            EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
             logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
             result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
         }
@@ -68,106 +73,101 @@ public class ConfigPusher {
     }
 
     /**
-     * Checks for ConflictingVersionException and retries until optimistic lock succeeds or maximal
-     * number of attempts is reached.
+     * First calls {@link #getOperationServiceWithRetries(java.util.Set, String)} in order to wait until
+     * expected capabilities are present, then tries to push configuration. If {@link ConflictingVersionException}
+     * is caught, whole process is retried - new service instance need to be obtained from the factory. Closes
+     * {@link NetconfOperationService} after each use.
      */
-    private synchronized EditAndCommitResponseWithRetries pushSnapshotWithRetries(ConfigSnapshotHolder configSnapshotHolder)
-            throws InterruptedException {
-
-        ConflictingVersionException lastException = null;
-        int maxAttempts = configuration.netconfPushConfigAttempts;
-
-        for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
-            NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
-            logger.trace("Pushing following xml to netconf {}", configSnapshotHolder);
-            try {
-                EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient);
-                return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt);
+    private synchronized EditAndCommitResponse pushConfigWithConflictingVersionRetries(ConfigSnapshotHolder configSnapshotHolder) {
+        ConflictingVersionException lastException;
+        Stopwatch stopwatch = new Stopwatch().start();
+        do {
+            try (NetconfOperationService operationService = getOperationServiceWithRetries(configSnapshotHolder.getCapabilities(), configSnapshotHolder.toString())) {
+                return pushConfig(configSnapshotHolder, operationService);
             } catch (ConflictingVersionException e) {
-                logger.debug("Conflicting version detected, will retry after timeout");
                 lastException = e;
-                Thread.sleep(configuration.netconfPushConfigDelayMs);
-            } catch (RuntimeException e) {
-                throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
-            } finally {
-                Util.closeClientAndDispatcher(netconfClient);
+                logger.debug("Conflicting version detected, will retry after timeout");
+                sleep();
             }
-        }
-        throw new IllegalStateException("Maximum attempt count has been reached for pushing " + configSnapshotHolder,
+        } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
+        throw new IllegalStateException("Max wait for conflicting version stabilization timeout after " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms",
                 lastException);
     }
 
-    /**
-     * @param expectedCaps capabilities that server hello must contain. Will retry until all are found or throws RuntimeException.
-     *                     If empty set is provided, will only make sure netconf client successfuly connected to the server.
-     * @return NetconfClient that has all required capabilities from server.
-     */
-    private synchronized NetconfClient makeNetconfConnection(Set<String> expectedCaps) throws InterruptedException {
-
-        // TODO think about moving capability subset check to netconf client
-        // could be utilized by integration tests
-
-        final long pollingStartNanos = System.nanoTime();
-        final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
-        int attempt = 0;
-
-        NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
-                configuration.netconfAddress.getAddress().getHostAddress(),
-                Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
-
-        Set<String> latestCapabilities = null;
-        while (System.nanoTime() < deadlineNanos) {
-            attempt++;
-            NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
-                    configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
-            NetconfClient netconfClient;
+    private NetconfOperationService getOperationServiceWithRetries(Set<String> expectedCapabilities, String idForReporting) {
+        Stopwatch stopwatch = new Stopwatch().start();
+        NotEnoughCapabilitiesException lastException;
+        do {
             try {
-                netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
-            } catch (IllegalStateException e) {
-                logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
-                netconfClientDispatcher.close();
-                Thread.sleep(configuration.connectionAttemptDelayMs);
-                continue;
-            }
-            latestCapabilities = netconfClient.getCapabilities();
-            if (Util.isSubset(netconfClient, expectedCaps)) {
-                logger.debug("Hello from netconf stable with {} capabilities", latestCapabilities);
-                logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession());
-                return netconfClient;
+                return getOperationService(expectedCapabilities, idForReporting);
+            } catch (NotEnoughCapabilitiesException e) {
+                logger.debug("Not enough capabilities: " + e.toString());
+                lastException = e;
+                sleep();
             }
-            Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
-            logger.debug("Netconf server did not provide required capabilities. Attempt {}. " +
-                    "Expected but not found: {}, all expected {}, current {}",
-                    attempt, allNotFound, expectedCaps, latestCapabilities);
-            Util.closeClientAndDispatcher(netconfClient);
-            Thread.sleep(configuration.connectionAttemptDelayMs);
+        } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < maxWaitForCapabilitiesMillis);
+        throw new IllegalStateException("Max wait for capabilities reached." + lastException.getMessage(), lastException);
+    }
+
+    private static class NotEnoughCapabilitiesException extends Exception {
+        private NotEnoughCapabilitiesException(String message) {
+            super(message);
         }
-        if (latestCapabilities == null) {
-            logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
-            throw new RuntimeException("Could not connect to netconf server");
+    }
+
+    /**
+     * Get NetconfOperationService iif all required capabilities are present.
+     *
+     * @param expectedCapabilities that must be provided by configNetconfConnector
+     * @param idForReporting
+     * @return service if capabilities are present, otherwise absent value
+     */
+    private NetconfOperationService getOperationService(Set<String> expectedCapabilities, String idForReporting) throws NotEnoughCapabilitiesException {
+        NetconfOperationService serviceCandidate = configNetconfConnector.createService(idForReporting);
+        Set<String> notFoundDiff = computeNotFoundCapabilities(expectedCapabilities, serviceCandidate);
+        if (notFoundDiff.isEmpty()) {
+            return serviceCandidate;
+        } else {
+            serviceCandidate.close();
+            logger.debug("Netconf server did not provide required capabilities for {} " +
+                            "Expected but not found: {}, all expected {}, current {}",
+                    idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+            );
+            throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
         }
-        Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
-        logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}",
-                allNotFound, expectedCaps, latestCapabilities);
-        throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound);
     }
 
-    private static Set<String> computeNotFoundCapabilities(Set<String> expectedCaps, Set<String> latestCapabilities) {
-        Set<String> allNotFound = new HashSet<>(expectedCaps);
-        allNotFound.removeAll(latestCapabilities);
+    private static Set<String> computeNotFoundCapabilities(Set<String> expectedCapabilities, NetconfOperationService serviceCandidate) {
+        Collection<String> actual = Collections2.transform(serviceCandidate.getCapabilities(), new Function<Capability, String>() {
+            @Override
+            public String apply(Capability input) {
+                return input.getCapabilityUri();
+            }
+        });
+        Set<String> allNotFound = new HashSet<>(expectedCapabilities);
+        allNotFound.removeAll(actual);
         return allNotFound;
     }
 
 
+
+    private void sleep() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        }
+    }
+
     /**
      * Sends two RPCs to the netconf server: edit-config and commit.
      *
      * @param configSnapshotHolder
-     * @param netconfClient
      * @throws ConflictingVersionException if commit fails on optimistic lock failure inside of config-manager
      * @throws java.lang.RuntimeException  if edit-config or commit fails otherwise
      */
-    private synchronized EditAndCommitResponse pushLastConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfClient netconfClient)
+    private synchronized EditAndCommitResponse pushConfig(ConfigSnapshotHolder configSnapshotHolder, NetconfOperationService operationService)
             throws ConflictingVersionException {
 
         Element xmlToBePersisted;
@@ -177,56 +177,66 @@ public class ConfigPusher {
             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
         }
         logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
-
+        Stopwatch stopwatch = new Stopwatch().start();
         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
 
-        // sending message to netconf
-        NetconfMessage editResponseMessage;
-        try {
-            editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, netconfClient);
-        } catch (IOException e) {
-            throw new IllegalStateException("Edit-config failed on " + configSnapshotHolder, e);
-        }
+        Document editResponseMessage = sendRequestGetResponseCheckIsOK(editConfigMessage, operationService,
+                "edit-config", configSnapshotHolder.toString());
 
-        // commit
-        NetconfMessage commitResponseMessage;
-        try {
-            commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), netconfClient);
-        } catch (IOException e) {
-            throw new IllegalStateException("Edit commit succeeded, but commit failed on " + configSnapshotHolder, e);
-        }
+        Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
+                "commit", configSnapshotHolder.toString());
 
         if (logger.isTraceEnabled()) {
             StringBuilder response = new StringBuilder("editConfig response = {");
-            response.append(XmlUtil.toString(editResponseMessage.getDocument()));
+            response.append(XmlUtil.toString(editResponseMessage));
             response.append("}");
             response.append("commit response = {");
-            response.append(XmlUtil.toString(commitResponseMessage.getDocument()));
+            response.append(XmlUtil.toString(commitResponseMessage));
             response.append("}");
             logger.trace("Last configuration loaded successfully");
             logger.trace("Detailed message {}", response);
+            logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
         }
         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
     }
 
+    private NetconfOperation findOperation(NetconfMessage request, NetconfOperationService operationService) {
+        TreeMap<HandlingPriority, NetconfOperation> allOperations = new TreeMap<>();
+        Set<NetconfOperation> netconfOperations = operationService.getNetconfOperations();
+        if (netconfOperations.isEmpty()) {
+            throw new IllegalStateException("Possible code error: no config operations");
+        }
+        for (NetconfOperation netconfOperation : netconfOperations) {
+            HandlingPriority handlingPriority = netconfOperation.canHandle(request.getDocument());
+            allOperations.put(handlingPriority, netconfOperation);
+        }
+        Entry<HandlingPriority, NetconfOperation> highestEntry = allOperations.lastEntry();
+        if (highestEntry.getKey().isCannotHandle()) {
+            throw new IllegalStateException("Possible code error: operation with highest priority is CANNOT_HANDLE");
+        }
+        return highestEntry.getValue();
+    }
+
+    private Document sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfOperationService operationService,
+                                                     String operationNameForReporting, String configIdForReporting)
+            throws ConflictingVersionException {
 
-    private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
-            throws ConflictingVersionException, IOException {
+        NetconfOperation operation = findOperation(request, operationService);
+        Document response;
         try {
-            NetconfMessage netconfMessage = netconfClient.sendMessage(request,
-                    configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
-            NetconfUtil.checkIsMessageOk(netconfMessage);
-            return netconfMessage;
-        } catch(ConflictingVersionException e) {
-            logger.trace("conflicting version detected: {}", e.toString());
+            response = operation.handle(request.getDocument(), NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+        } catch (NetconfDocumentedException | RuntimeException e) {
+            throw new IllegalStateException("Failed to send " + operationNameForReporting +
+                    " for configuration " + configIdForReporting, e);
+        }
+        try {
+            return NetconfUtil.checkIsMessageOk(response);
+        } catch (ConflictingVersionException e) {
+            logger.trace("conflicting version detected: {} while committing {}", e.toString(), configIdForReporting);
             throw e;
-        } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
-            logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e);
-            throw new IOException("Failed to execute netconf transaction", e);
         }
     }
 
-
     // load editConfig.xml template, populate /rpc/edit-config/config with parameter
     private static NetconfMessage createEditConfigMessage(Element dataElement) {
         String editConfigResourcePath = "/netconfOp/editConfig.xml";
@@ -246,7 +256,7 @@ public class ConfigPusher {
             return new NetconfMessage(doc);
         } catch (IOException | SAXException e) {
             // error reading the xml file bundled into the jar
-            throw new RuntimeException("Error while opening local resource " + editConfigResourcePath, e);
+            throw new IllegalStateException("Error while opening local resource " + editConfigResourcePath, e);
         }
     }
 
@@ -257,23 +267,23 @@ public class ConfigPusher {
             return new NetconfMessage(XmlUtil.readXmlToDocument(stream));
         } catch (SAXException | IOException e) {
             // error reading the xml file bundled into the jar
-            throw new RuntimeException("Error while opening local resource " + resource, e);
+            throw new IllegalStateException("Error while opening local resource " + resource, e);
         }
     }
 
     static class EditAndCommitResponse {
-        private final NetconfMessage editResponse, commitResponse;
+        private final Document editResponse, commitResponse;
 
-        EditAndCommitResponse(NetconfMessage editResponse, NetconfMessage commitResponse) {
+        EditAndCommitResponse(Document editResponse, Document commitResponse) {
             this.editResponse = editResponse;
             this.commitResponse = commitResponse;
         }
 
-        public NetconfMessage getEditResponse() {
+        public Document getEditResponse() {
             return editResponse;
         }
 
-        public NetconfMessage getCommitResponse() {
+        public Document getCommitResponse() {
             return commitResponse;
         }
 
@@ -285,32 +295,4 @@ public class ConfigPusher {
                     '}';
         }
     }
-
-
-    static class EditAndCommitResponseWithRetries {
-        private final EditAndCommitResponse editAndCommitResponse;
-        private final int retries;
-
-        EditAndCommitResponseWithRetries(EditAndCommitResponse editAndCommitResponse, int retries) {
-            this.editAndCommitResponse = editAndCommitResponse;
-            this.retries = retries;
-        }
-
-        public int getRetries() {
-            return retries;
-        }
-
-        public EditAndCommitResponse getEditAndCommitResponse() {
-            return editAndCommitResponse;
-        }
-
-        @Override
-        public String toString() {
-            return "EditAndCommitResponseWithRetries{" +
-                    "editAndCommitResponse=" + editAndCommitResponse +
-                    ", retries=" + retries +
-                    '}';
-        }
-    }
-
 }