LOG.info("{}: Concurrent rpc limit is smaller than 1, no limit will be enforced.", remoteDeviceId);
}
- return new NetconfConnectorDTO(
- userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
- new UserPreferences(userCapabilities.get(),
- Objects.isNull(node.getYangModuleCapabilities())
- ? false : node.getYangModuleCapabilities().isOverride(),
- Objects.isNull(node.getNonModuleCapabilities())
- ? false : node.getNonModuleCapabilities().isOverride()), rpcMessageLimit)
- : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+ NetconfDeviceCommunicator netconfDeviceCommunicator =
+ userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+ new UserPreferences(userCapabilities.get(),
+ Objects.isNull(node.getYangModuleCapabilities())
+ ? false : node.getYangModuleCapabilities().isOverride(),
+ Objects.isNull(node.getNonModuleCapabilities())
+ ? false : node.getNonModuleCapabilities().isOverride()), rpcMessageLimit)
+ : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+
+ if (salFacade instanceof KeepaliveSalFacade) {
+ ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+ }
+ return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
}
private static Optional<NetconfSessionPreferences> getUserCapabilities(final NetconfNode node) {
LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
}
- return new NetconfConnectorDTO(userCapabilities.isPresent()
- ? new NetconfDeviceCommunicator(remoteDeviceId, device, userCapabilities.get(), rpcMessageLimit)
- : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+ NetconfDeviceCommunicator netconfDeviceCommunicator =
+ userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+ userCapabilities.get(), rpcMessageLimit)
+ : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+
+ if (salFacade instanceof KeepaliveSalFacade) {
+ ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+ }
+ return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade);
}
protected NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
private volatile NetconfDeviceCommunicator listener;
private volatile ScheduledFuture<?> currentKeepalive;
private volatile DOMRpcService currentDeviceRpc;
+ private final AtomicBoolean lastKeepAliveSucceeded = new AtomicBoolean(false);
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
if (currentKeepalive != null) {
currentKeepalive.cancel(false);
}
- scheduleKeepalive();
+ scheduleKeepalives();
}
/**
salFacade.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc1);
LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- scheduleKeepalive();
+ scheduleKeepalives();
}
- private void scheduleKeepalive() {
+ private void scheduleKeepalives() {
+ lastKeepAliveSucceeded.set(true);
Preconditions.checkState(currentDeviceRpc != null);
- LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ LOG.trace("{}: Scheduling keepalives every {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
+ currentKeepalive = executor.scheduleWithFixedDelay(new Keepalive(),
+ keepaliveDelaySeconds, keepaliveDelaySeconds, TimeUnit.SECONDS);
}
@Override
*/
private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
- private final ScheduledFuture<?> previousKeepalive;
-
- Keepalive(final ScheduledFuture<?> previousKeepalive) {
- this.previousKeepalive = previousKeepalive;
- }
-
@Override
public void run() {
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- if (previousKeepalive != null && !previousKeepalive.isDone()) {
+ boolean lastJobSucceeded = lastKeepAliveSucceeded.getAndSet(false);
+ if (!lastJobSucceeded) {
onFailure(new IllegalStateException("Previous keepalive timed out"));
} else {
Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this,
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
if (result != null && result.getResult() != null) {
- LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
- scheduleKeepalive();
- } else if (result != null && !result.getErrors().isEmpty()) {
+ lastKeepAliveSucceeded.set(true);
+ } else if (result != null && result.getErrors() != null) {
LOG.warn("{}: Keepalive RPC failed with error: {}", id, result.getErrors());
- scheduleKeepalive();
+ lastKeepAliveSucceeded.set(true);
} else {
LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
reconnect();