new ServiceTracker<>(context, NetconfNotificationCollector.class, new ServiceTrackerCustomizer<NetconfNotificationCollector, NetconfNotificationCollector>() {
@Override
public NetconfNotificationCollector addingService(ServiceReference<NetconfNotificationCollector> reference) {
- Preconditions.checkState(listenerReg != null, "Notification collector service was already added");
+ Preconditions.checkState(listenerReg == null, "Notification collector service was already added");
listenerReg = context.getService(reference).registerBaseNotificationPublisher();
monitoringService.setNotificationPublisher(listenerReg);
return null;
@Override
public void onSuccess(final DOMRpcResult result) {
- LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
- scheduleKeepalive();
+ if (result != null && result.getResult() != null) {
+ LOG.debug("{}: Keepalive RPC successful with response: {}", id, result.getResult());
+ scheduleKeepalive();
+ } else {
+ LOG.warn("{} Keepalive RPC returned null with response: {}. Reconnecting netconf session", id, result);
+ reconnect();
+ }
}
@Override
final DOMRpcResult result = new DefaultDOMRpcResult(Builders.containerBuilder().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(NetconfMessageTransformUtil.NETCONF_RUNNING_QNAME)).build());
- final DOMRpcResult resultFail = new DefaultDOMRpcResult(mock(RpcError.class));
+ RpcError error = mock(RpcError.class);
+ doReturn("Failure").when(error).toString();
+
+ final DOMRpcResult resultFailWithResultAndError = new DefaultDOMRpcResult(mock(NormalizedNode.class), error);
doReturn(Futures.immediateCheckedFuture(result))
- .doReturn(Futures.immediateCheckedFuture(resultFail))
+ .doReturn(Futures.immediateCheckedFuture(resultFailWithResultAndError))
.doReturn(Futures.immediateFailedCheckedFuture(new IllegalStateException("illegal-state")))
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
// Reconnect with same keepalive responses
doReturn(Futures.immediateCheckedFuture(result))
- .doReturn(Futures.immediateCheckedFuture(resultFail))
+ .doReturn(Futures.immediateCheckedFuture(resultFailWithResultAndError))
.doReturn(Futures.immediateFailedCheckedFuture(new IllegalStateException("illegal-state")))
.when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
verify(listener, timeout(15000).times(2)).disconnect();
// 6 attempts now total
verify(deviceRpc, times(3 * 2)).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+
+ final DOMRpcResult resultFailwithError = new DefaultDOMRpcResult(error);
+
+ doReturn(Futures.immediateCheckedFuture(resultFailwithError))
+ .when(deviceRpc).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
+
+ keepaliveSalFacade.onDeviceConnected(null, null, deviceRpc);
+
+ // 1 failed that results in disconnect, 3 total with previous fail
+ verify(listener, timeout(15000).times(3)).disconnect();
}
@Test
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.annotation.Arg;
import net.sourceforge.argparse4j.inf.ArgumentParser;
@Arg(dest = "throttle")
public int throttle;
+ @Arg(dest = "auth")
+ public ArrayList<String> auth;
+
static ArgumentParser getParser() {
final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf stress client");
"with mutltiple threads this gets divided among all threads")
.dest("throttle");
+ parser.addArgument("--auth")
+ .nargs(2)
+ .help("Username and password for HTTP basic authentication in order username password.")
+ .dest("auth");
+
return parser;
}
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
+import com.ning.http.client.Realm;
import com.ning.http.client.Request;
import java.util.ArrayList;
import java.util.concurrent.Callable;
.build());
this.payloads = new ArrayList<>();
for (DestToPayload payload : payloads) {
- this.payloads.add(asyncHttpClient.preparePost(payload.getDestination())
+ AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination())
.addHeader("content-type", "application/json")
.addHeader("Accept", "application/xml")
.setBody(payload.getPayload())
- .setRequestTimeout(Integer.MAX_VALUE)
- .build());
+ .setRequestTimeout(Integer.MAX_VALUE);
+
+ if(params.auth != null) {
+ requestBuilder.setRealm(new Realm.RealmBuilder()
+ .setScheme(Realm.AuthScheme.BASIC)
+ .setPrincipal(params.auth.get(0))
+ .setPassword(params.auth.get(1))
+ .setUsePreemptiveAuth(true)
+ .build());
+ }
+
+ this.payloads.add(requestBuilder.build());
}
executionStrategy = getExecutionStrategy();
}