@Override
public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
+ return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
}
@Override
import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.yangtools.yang.common.QName;
private static final class Request {
final UncancellableFuture<RpcResult<CompositeNode>> future;
final NetconfMessage request;
+ final QName rpc;
- private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request) {
+ private Request(UncancellableFuture<RpcResult<CompositeNode>> future, NetconfMessage request, final QName rpc) {
this.future = future;
this.request = request;
+ this.rpc = rpc;
}
}
return;
}
- r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
- Collections.<RpcError>emptyList()));
+ r.future.set(NetconfMapping.toRpcResult(message, r.rpc, device.getSchemaContext()));
} else {
LOG.warn("Ignoring unsolicited message", message);
}
}
- synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message, final QName rpc) {
if (session == null) {
LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
});
}
- final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+ final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message, rpc);
requests.add(req);
session.sendMessage(req.request).addListener(new FutureListener<Void>() {
public void operationComplete(final Future<Void> future) throws Exception {
if (!future.isSuccess()) {
// We expect that a session down will occur at this point
- LOG.debug("Failed to send request {}", req.request, future.cause());
+ LOG.debug("Failed to send request {}", XmlUtil.toString(req.request.getDocument()), future.cause());
req.future.setException(future.cause());
} else {
LOG.trace("Finished sending request {}", req.request);
rawRpc = it.toInstance();
// sys(xmlData)
} else {
- RpcDefinition rpcSchema = Iterables.find(context.get().getOperations(), new Predicate<RpcDefinition>() {
- @Override
- public boolean apply(final RpcDefinition input) {
- return rpc == input.getQName();
- }
- });
rawRpc = (CompositeNode) toCompositeNode(message.getDocument());
}
else {
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class NetconfRemoteSchemaSourceProvider implements SchemaSourceProvider<String> {
private final NetconfDevice device;
+ private final Logger logger;
+
public NetconfRemoteSchemaSourceProvider(NetconfDevice device) {
this.device = Preconditions.checkNotNull(device);
+ logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + device.getName());
}
@Override
request.addLeaf("version", revision.get());
}
- device.logger.trace("Loading YANG schema source for {}:{}", moduleName, revision);
+ logger.trace("Loading YANG schema source for {}:{}", moduleName, revision);
try {
RpcResult<CompositeNode> schemaReply = device.invokeRpc(GET_SCHEMA_QNAME, request.toInstance()).get();
if (schemaReply.isSuccessful()) {
return Optional.of(schemaBody);
}
}
- device.logger.warn("YANG shcema was not successfully retrieved.");
+ logger.warn("YANG shcema was not successfully retrieved.");
} catch (InterruptedException | ExecutionException e) {
- device.logger.warn("YANG shcema was not successfully retrieved.", e);
+ logger.warn("YANG shcema was not successfully retrieved.", e);
}
return Optional.absent();
}
public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
public static final java.util.Set<String> CLIENT_CAPABILITIES = Sets.newHashSet(
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_1,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
private static final String START_EXI_MESSAGE_ID = "default-start-exi";
package org.opendaylight.controller.netconf.impl;
-import com.google.common.collect.Sets;
-import io.netty.channel.Channel;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Promise;
+import static org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider.NetconfOperationProviderUtil.getNetconfSessionIdForReporting;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiator;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Set;
+import com.google.common.collect.Sets;
-import static org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider.NetconfOperationProviderUtil.getNetconfSessionIdForReporting;
+import io.netty.channel.Channel;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfHelloMessage, NetconfServerSession, NetconfServerSessionListener> {
- private static final Set<String> DEFAULT_CAPABILITIES = Sets.newHashSet(
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0,
- XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0);
+ private static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0
+ // FIXME, Chunk framing causes ConcurrentClientsTest to fail, investigate
+// XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
+ // FIXME, EXI causing issues with sal-netconf-connector, investigate
+// XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+ );
private final Timer timer;
private final SessionMonitoringService monitoringService;
private static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiatorFactory.class);
+ // TODO too many params, refactor
public NetconfServerSessionNegotiatorFactory(Timer timer, NetconfOperationProvider netconfOperationProvider,
SessionIdProvider idProvider, long connectionTimeoutMillis,
- DefaultCommitNotificationProducer commitNot, SessionMonitoringService monitoringService) {
+ DefaultCommitNotificationProducer commitNot,
+ SessionMonitoringService monitoringService) {
this.timer = timer;
this.netconfOperationProvider = netconfOperationProvider;
this.idProvider = idProvider;
}
private NetconfHelloMessage createHelloMessage(long sessionId, CapabilityProvider capabilityProvider) throws NetconfDocumentedException {
- return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), DEFAULT_CAPABILITIES), sessionId);
+ return NetconfHelloMessage.createServerHello(Sets.union(capabilityProvider.getCapabilities(), DEFAULT_BASE_CAPABILITIES), sessionId);
}
}
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.test.TestingNetconfClient;
import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.Capability;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-import java.io.DataOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
public class ConcurrentClientsTest {
SessionIdProvider idProvider = new SessionIdProvider();
hashedWheelTimer = new HashedWheelTimer();
+
NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
hashedWheelTimer, factoriesListener, idProvider, 5000, commitNot, createMockedMonitoringService());
commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
-
-
NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
@Override
public HandlingPriority canHandle(Document message) {
- return HandlingPriority.getHandlingPriority(Integer.MAX_VALUE);
+ return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
+ HandlingPriority.CANNOT_HANDLE :
+ HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
}
@Override
commitNot.close();
}
- @Test
+ @Test(timeout = 30 * 1000)
public void multipleClients() throws Exception {
List<TestingThread> threads = new ArrayList<>();
}
}
- @Test
+ @Test(timeout = 30 * 1000)
public void synchronizationTest() throws Exception {
new BlockingThread("foo").run2();
}
- @Test
+ @Test(timeout = 30 * 1000)
public void multipleBlockingClients() throws Exception {
List<BlockingThread> threads = new ArrayList<>();
for (int i = 0; i < CONCURRENCY; i++) {
@Override
public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
final ChannelFuture future = channel.writeAndFlush(netconfMessage);
- if (delayedEncoder !=null) {
+ if (delayedEncoder != null) {
replaceMessageEncoder(delayedEncoder);
delayedEncoder = null;
}
// TODO duplicate
public static final String RFC4741_TARGET_NAMESPACE = "urn:ietf:params:xml:ns:netconf:base:1.0";
public static final String URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0 = "urn:ietf:params:xml:ns:netconf:base:1.0";
- public static final String URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_1 = "urn:ietf:params:xml:ns:netconf:base:1.1";
+// public static final String URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_1 = "urn:ietf:params:xml:ns:netconf:base:1.1";
+ public static final String URN_IETF_PARAMS_NETCONF_BASE_1_0 = "urn:ietf:params:netconf:base:1.0";
+ public static final String URN_IETF_PARAMS_NETCONF_BASE_1_1 = "urn:ietf:params:netconf:base:1.1";
public static final String URN_IETF_PARAMS_XML_NS_NETCONF_EXI_1_0 = "urn:ietf:params:xml:ns:netconf:exi:1.0";
public static final String URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0 = "urn:ietf:params:netconf:capability:exi:1.0";