package org.opendaylight.controller.netconf.persist.impl;
import com.google.common.base.Preconditions;
-import io.netty.channel.EventLoopGroup;
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import javax.annotation.concurrent.Immutable;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@Immutable
public class ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
- private static final int NETCONF_SEND_ATTEMPT_MS_DELAY = 1000;
- private static final int NETCONF_SEND_ATTEMPTS = 20;
-
- private final InetSocketAddress address;
- private final EventLoopGroup nettyThreadGroup;
-
- // Default timeout for netconf becoming stable
- public static final long DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS = TimeUnit.MINUTES.toMillis(2);
- public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
- private final int delayMillis = 5000;
- private final long maxWaitForCapabilitiesMillis;
- private final long connectionTimeoutMillis;
-
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup) {
- this(address, nettyThreadGroup, DEFAULT_MAX_WAIT_FOR_CAPABILITIES_MILLIS, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
- }
+ private static final Logger logger = LoggerFactory.getLogger(ConfigPusher.class);
+
+ private final ConfigPusherConfiguration configuration;
- public ConfigPusher(InetSocketAddress address, EventLoopGroup nettyThreadGroup,
- long maxWaitForCapabilitiesMillis, long connectionTimeoutMillis) {
- this.address = address;
- this.nettyThreadGroup = nettyThreadGroup;
- this.maxWaitForCapabilitiesMillis = maxWaitForCapabilitiesMillis;
- this.connectionTimeoutMillis = connectionTimeoutMillis;
+ public ConfigPusher(ConfigPusherConfiguration configuration) {
+ this.configuration = configuration;
}
public synchronized LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponseWithRetries> pushConfigs(
throws InterruptedException {
ConflictingVersionException lastException = null;
- int maxAttempts = 30;
+ int maxAttempts = configuration.netconfPushConfigAttempts;
for (int retryAttempt = 1; retryAttempt <= maxAttempts; retryAttempt++) {
NetconfClient netconfClient = makeNetconfConnection(configSnapshotHolder.getCapabilities());
} catch (ConflictingVersionException e) {
logger.debug("Conflicting version detected, will retry after timeout");
lastException = e;
- Thread.sleep(1000);
+ Thread.sleep(configuration.netconfPushConfigDelayMs);
} catch (RuntimeException e) {
throw new IllegalStateException("Unable to load " + configSnapshotHolder, e);
} finally {
// could be utilized by integration tests
final long pollingStartNanos = System.nanoTime();
- final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(maxWaitForCapabilitiesMillis);
+ final long deadlineNanos = pollingStartNanos + TimeUnit.MILLISECONDS.toNanos(configuration.netconfCapabilitiesWaitTimeoutMs);
int attempt = 0;
- NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown", address.getAddress().getHostAddress(),
- Integer.toString(address.getPort()), "tcp", "persister");
+ NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("unknown",
+ configuration.netconfAddress.getAddress().getHostAddress(),
+ Integer.toString(configuration.netconfAddress.getPort()), "tcp", "persister");
Set<String> latestCapabilities = null;
while (System.nanoTime() < deadlineNanos) {
attempt++;
- NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(nettyThreadGroup,
- nettyThreadGroup, additionalHeader, connectionTimeoutMillis);
+ NetconfClientDispatcher netconfClientDispatcher = new NetconfClientDispatcher(configuration.eventLoopGroup,
+ configuration.eventLoopGroup, additionalHeader, configuration.connectionAttemptTimeoutMs);
NetconfClient netconfClient;
try {
- netconfClient = new NetconfClient(this.toString(), address, delayMillis, netconfClientDispatcher);
+ netconfClient = new NetconfClient(this.toString(), configuration.netconfAddress, configuration.connectionAttemptDelayMs, netconfClientDispatcher);
} catch (IllegalStateException e) {
- logger.debug("Netconf {} was not initialized or is not stable, attempt {}", address, attempt, e);
+ logger.debug("Netconf {} was not initialized or is not stable, attempt {}", configuration.netconfAddress, attempt, e);
netconfClientDispatcher.close();
- Thread.sleep(delayMillis);
+ Thread.sleep(configuration.connectionAttemptDelayMs);
continue;
}
latestCapabilities = netconfClient.getCapabilities();
"Expected but not found: {}, all expected {}, current {}",
attempt, allNotFound, expectedCaps, latestCapabilities);
Util.closeClientAndDispatcher(netconfClient);
- Thread.sleep(delayMillis);
+ Thread.sleep(configuration.connectionAttemptDelayMs);
}
if (latestCapabilities == null) {
- logger.error("Could not connect to the server in {} ms", maxWaitForCapabilitiesMillis);
+ logger.error("Could not connect to the server in {} ms", configuration.netconfCapabilitiesWaitTimeoutMs);
throw new RuntimeException("Could not connect to netconf server");
}
Set<String> allNotFound = computeNotFoundCapabilities(expectedCaps, latestCapabilities);
}
- private static NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
+ private NetconfMessage sendRequestGetResponseCheckIsOK(NetconfMessage request, NetconfClient netconfClient)
throws ConflictingVersionException, IOException {
try {
- NetconfMessage netconfMessage = netconfClient.sendMessage(request, NETCONF_SEND_ATTEMPTS, NETCONF_SEND_ATTEMPT_MS_DELAY);
+ NetconfMessage netconfMessage = netconfClient.sendMessage(request,
+ configuration.netconfSendMessageMaxAttempts, configuration.netconfSendMessageDelayMs);
NetconfUtil.checkIsMessageOk(netconfMessage);
return netconfMessage;
- }catch(ConflictingVersionException e) {
+ } catch(ConflictingVersionException e) {
logger.trace("conflicting version detected: {}", e.toString());
throw e;
} catch (RuntimeException | ExecutionException | InterruptedException | TimeoutException e) { // TODO: change NetconfClient#sendMessage to throw checked exceptions
'}';
}
}
+
}