X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fconfig-persister-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fpersist%2Fimpl%2FConfigPusher.java;h=ea2a46dba535f825d0baa7b5dfa58d4b6371c297;hb=1537b1d90705c2068f5e3652b6190d96777e8f80;hp=01d872d89cad71156b792b37dc2e78f1749f393a;hpb=acd7a3cdef79a6b1fdcf0f68d3beab7422f4e9c7;p=controller.git diff --git a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java index 01d872d89c..ea2a46dba5 100644 --- a/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java +++ b/opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusher.java @@ -8,16 +8,14 @@ package org.opendaylight.controller.netconf.persist.impl; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import io.netty.channel.EventLoopGroup; import org.opendaylight.controller.config.api.ConflictingVersionException; import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.client.NetconfClient; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; import org.opendaylight.controller.netconf.util.NetconfUtil; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageAdditionalHeader; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -30,40 +28,23 @@ import org.xml.sax.SAXException; import javax.annotation.concurrent.Immutable; import java.io.IOException; import java.io.InputStream; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Immutable public class ConfigPusher { - private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class); - private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000; - private static final int NETCONF_SEND_ATTEMPTS = 20; - - private final InetSocketAddress address; - private final EventLoopGroup nettyThreadGroup; - - // Default timeout for netconf becoming stable - public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2); - public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000; - private final int delayMillis = 5000; - private final long maxWaitForCapabilitiesMillis; - private final long connectionTimeoutMillis; - - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) { - this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS); - } + private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class); + + private final ConfigPusherConfiguration configuration; - public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup, - long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) { - this.address = address; - this.nettyThreadGroup = nettyThreadGroup; - this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis; - this.connectionTimeoutMillis = connectionTimeoutMillis; + public ConfigPusher(ConfigPusherConfiguration configuration) { + this.configuration = configuration; } public synchronized LinkedHashMap pushConfigs( @@ -94,7 +75,7 @@ public class ConfigPusher { throws InterruptedException { ConflictingVersionException lastException = null; - int maxAttempts = 30; + int maxAttempts = configuration.netconfPushConfigAttempts; for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) { NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities()); @@ -103,8 +84,9 @@ public class ConfigPusher { EditAndCommitResponse editAndCommitResponse = pushLastConfig(configSnapshotHolder, netconfClient); return new EditAndCommitResponseWithRetries(editAndCommitResponse, retryAttempt); } catch (ConflictingVersionException e) { + logger.debug("Conflicting version detected, will retry after timeout"); lastException = e; - Thread.sleep(1000); + Thread.sleep(configuration.netconfPushConfigDelayMs); } catch (RuntimeException e) { throw new IllegalStateException("Unable to load " + configSnapshotHolder, e); } finally { @@ -126,24 +108,25 @@ public class ConfigPusher { // could be utilized by integration tests final long pollingStartNanos = System.nanoTime(); - final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis); + final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs); int attempt = 0; - String additionalHeader = NetconfMessageAdditionalHeader.toString("unknown", address.getAddress().getHostAddress(), - Integer.toString(address.getPort()), "tcp", Optional.of("persister")); + NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown", + configuration.netconfAddress.getAddress().getHostAddress(), + Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister"); Set latestCapabilities = null; while (System.nanoTime() < deadlineNanos) { attempt++; - NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup, - nettyThreadGroup, additionalHeader, connectionTimeoutMillis); + NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup, + configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs); NetconfClient netconfClient; try { - netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher); + netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher); } catch (IllegalStateException e) { - logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e); + logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e); netconfClientDispatcher.close(); - Thread.sleep(delayMillis); + Thread.sleep(configuration.connectionAttemptDelayMs); continue; } latestCapabilities = netconfClient.getCapabilities(); @@ -152,21 +135,29 @@ public class ConfigPusher { logger.trace("Session id received from netconf server: {}", netconfClient.getClientSession()); return netconfClient; } - logger.debug("Polling hello from netconf, attempt {}, capabilities {}", attempt, latestCapabilities); + Set allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities); + logger.debug("Netconf server did not provide required capabilities. Attempt {}. " + + "Expected but not found: {}, all expected {}, current {}", + attempt, allNotFound, expectedCaps, latestCapabilities); Util.closeClientAndDispatcher(netconfClient); - Thread.sleep(delayMillis); + Thread.sleep(configuration.connectionAttemptDelayMs); } if (latestCapabilities == null) { - logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis); + logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs); throw new RuntimeException("Could not connect to netconf server"); } - Set allNotFound = new HashSet<>(expectedCaps); - allNotFound.removeAll(latestCapabilities); + Set allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities); logger.error("Netconf server did not provide required capabilities. Expected but not found: {}, all expected {}, current {}", allNotFound, expectedCaps, latestCapabilities); throw new RuntimeException("Netconf server did not provide required capabilities. Expected but not found:" + allNotFound); } + private static Set computeNotFoundCapabilities(Set expectedCaps, Set latestCapabilities) { + Set allNotFound = new HashSet<>(expectedCaps); + allNotFound.removeAll(latestCapabilities); + return allNotFound; + } + /** * Sends two RPCs to the netconf server: edit-config and commit. @@ -219,12 +210,17 @@ public class ConfigPusher { } - private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) throws IOException { + private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient) + throws ConflictingVersionException, IOException { try { - NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY); + NetconfMessage netconfMessage = netconfClient.sendMessage(request, + configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs); NetconfUtil.checkIsMessageOk(netconfMessage); return netconfMessage; - } catch (RuntimeException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions + } catch(ConflictingVersionException e) { + logger.trace("conflicting version detected: {}", e.toString()); + throw e; + } catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions logger.debug("Error while executing netconf transaction {} to {}", request, netconfClient, e); throw new IOException("Failed to execute netconf transaction", e); } @@ -316,4 +312,5 @@ public class ConfigPusher { '}'; } } -} \ No newline at end of file + +}