private void scheduleKeepalive() {
Preconditions.checkState(currentDeviceRpc != null);
LOG.trace("{}: Scheduling next keepalive in {} {}", id, keepaliveDelaySeconds, TimeUnit.SECONDS);
- currentKeepalive = executor.schedule(new Keepalive(), keepaliveDelaySeconds, TimeUnit.SECONDS);
+ currentKeepalive = executor.schedule(new Keepalive(currentKeepalive), keepaliveDelaySeconds, TimeUnit.SECONDS);
}
@Override
*/
private class Keepalive implements Runnable, FutureCallback<DOMRpcResult> {
+ private final ScheduledFuture<?> previousKeepalive;
+
+ public Keepalive(final ScheduledFuture<?> previousKeepalive) {
+ this.previousKeepalive = previousKeepalive;
+ }
+
@Override
public void run() {
LOG.trace("{}: Invoking keepalive RPC", id);
try {
- Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ if(previousKeepalive != null && !previousKeepalive.isDone()) {
+ onFailure(new IllegalStateException("Previous keepalive timed out"));
+ } else {
+ Futures.addCallback(currentDeviceRpc.invokeRpc(PATH, KEEPALIVE_PAYLOAD), this);
+ }
} catch (NullPointerException e) {
LOG.debug("{}: Skipping keepalive while reconnecting", id);
// Empty catch block intentional
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Mock
private RemoteDeviceHandler<NetconfSessionPreferences> underlyingSalFacade;
- private static java.util.concurrent.ScheduledExecutorService executorService;
+ private ScheduledExecutorService executorServiceSpy;
@Mock
private NetconfDeviceCommunicator listener;
private DOMRpcService proxyRpc;
+ @Mock
+ private ScheduledFuture currentKeepalive;
+
@Before
public void setUp() throws Exception {
- executorService = Executors.newScheduledThreadPool(1);
MockitoAnnotations.initMocks(this);
doReturn("mockedRpc").when(deviceRpc).toString();
doNothing().when(underlyingSalFacade).onDeviceConnected(
any(SchemaContext.class), any(NetconfSessionPreferences.class), any(DOMRpcService.class));
+
+ ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+ executorServiceSpy = Mockito.spy(executorService);
+ doAnswer(new Answer<ScheduledFuture>() {
+ @Override
+ public ScheduledFuture answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ invocationOnMock.callRealMethod();
+ return currentKeepalive;
+ }
+ }).when(executorServiceSpy).schedule(Mockito.<Runnable> any(),
+ Mockito.anyLong(), Matchers.<TimeUnit> any());
+
+ Mockito.when(currentKeepalive.isDone()).thenReturn(true);
}
@After
public void tearDown() throws Exception {
- executorService.shutdownNow();
+ executorServiceSpy.shutdownNow();
}
@Test
final DOMRpcResult result = new DefaultDOMRpcResult(Builders.containerBuilder().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME)).build());
- doReturn(Futures.immediateCheckedFuture(result)).when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+ doReturn(Futures.immediateCheckedFuture(result))
+ .when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
+
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 1L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 1L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
// 1 failed that results in disconnect, 3 total with previous fail
verify(listener, timeout(15000).times(3)).disconnect();
+
+
+ Mockito.when(currentKeepalive.isDone()).thenReturn(false);
+ keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
+ // 1 failed that results in disconnect, 4 total with previous fail
+ verify(listener, timeout(15000).times(4)).disconnect();
}
@Test
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
final KeepaliveSalFacade keepaliveSalFacade =
- new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorService, 100L);
+ new KeepaliveSalFacade(REMOTE_DEVICE_ID, underlyingSalFacade, executorServiceSpy, 100L);
keepaliveSalFacade.setListener(listener);
keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);