package org.opendaylight.controller.netconf.client;
-import io.netty.channel.Channel;
+import java.util.Collection;
+
import org.opendaylight.controller.netconf.util.AbstractNetconfSession;
import org.opendaylight.controller.netconf.util.handler.NetconfEXICodec;
import org.opendaylight.controller.netconf.util.handler.NetconfEXIToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import io.netty.channel.Channel;
public final class NetconfClientSession extends AbstractNetconfSession<NetconfClientSession, NetconfClientSessionListener> {
return capabilities;
}
-
@Override
protected NetconfClientSession thisInstance() {
return this;
package org.opendaylight.controller.netconf.client;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Promise;
+import java.util.Collection;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+
import org.opendaylight.controller.netconf.api.NetconfClientSessionPreferences;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
+import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
+import io.netty.channel.Channel;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
public class NetconfClientSessionNegotiator extends
AbstractNetconfSessionNegotiator<NetconfClientSessionPreferences, NetconfClientSession, NetconfClientSessionListener>
@Override
protected void handleMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
- NetconfClientSession session = super.getSessionForHelloMessage(netconfMessage);
-
- if (shouldUseExi(netconfMessage.getDocument())){
- logger.debug("Netconf session: {} should use exi.", session);
- tryToStartExi(session);
+ final NetconfClientSession session = getSessionForHelloMessage(netconfMessage);
+ replaceHelloMessageInboundHandler(session);
+
+ // If exi should be used, try to initiate exi communication
+ // Call negotiationSuccessFul after exi negotiation is finished successfully or not
+ if (shouldUseExi(netconfMessage)) {
+ logger.debug("Netconf session {} should use exi.", session);
+ NetconfStartExiMessage startExiMessage = (NetconfStartExiMessage) sessionPreferences.getStartExiMessage();
+ tryToInitiateExi(session, startExiMessage);
+ // Exi is not supported, release session immediately
} else {
logger.debug("Netconf session {} isn't capable using exi.", session);
negotiationSuccessful(session);
}
}
- private boolean shouldUseExi(Document doc) {
- return containsExi10Capability(doc)
+ /**
+ * Initiates exi communication by sending start-exi message and waiting for positive/negative response.
+ *
+ * @param startExiMessage
+ */
+ void tryToInitiateExi(final NetconfClientSession session, final NetconfStartExiMessage startExiMessage) {
+ session.sendMessage(startExiMessage).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture f) {
+ if (!f.isSuccess()) {
+ logger.warn("Failed to send start-exi message {} on session {}", startExiMessage, this, f.cause());
+ } else {
+ logger.trace("Start-exi message {} sent to socket on session {}", startExiMessage, this);
+ channel.pipeline().addAfter(
+ AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, ExiConfirmationInboundHandler.EXI_CONFIRMED_HANDLER,
+ new ExiConfirmationInboundHandler(session, startExiMessage));
+ }
+ }
+ });
+ }
+
+ private boolean shouldUseExi(NetconfHelloMessage helloMsg) {
+ return containsExi10Capability(helloMsg.getDocument())
&& containsExi10Capability(sessionPreferences.getHelloMessage().getDocument());
}
return false;
}
- private void tryToStartExi(final NetconfClientSession session) {
- final NetconfMessage startExi = sessionPreferences.getStartExiMessage();
- session.sendMessage(startExi).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(final ChannelFuture f) {
- if (!f.isSuccess()) {
- logger.warn("Failed to send start-exi message {} on session {}", startExi, session, f.cause());
- } else {
- logger.trace("Start-exi message {} sent to socket on session {}", startExi, session);
- NetconfClientSessionNegotiator.this.channel.pipeline().addAfter(
- AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, ExiConfirmationInboundHandler.EXI_CONFIRMED_HANDLER,
- new ExiConfirmationInboundHandler(session));
- }
- }
- });
- }
-
private long extractSessionId(Document doc) {
final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
String textContent = sessionIdNode.getTextContent();
}
@Override
- protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException {
- return new NetconfClientSession(sessionListener, channel, extractSessionId(message.getDocument()),
- NetconfMessageUtil.extractCapabilitiesFromHello(message.getDocument()));
+ protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel,
+ NetconfHelloMessage message) throws NetconfDocumentedException {
+ long sessionId = extractSessionId(message.getDocument());
+ Collection<String> capabilities = NetconfMessageUtil.extractCapabilitiesFromHello(message.getDocument());
+ return new NetconfClientSession(sessionListener, channel, sessionId, capabilities);
}
/**
private static final String EXI_CONFIRMED_HANDLER = "exiConfirmedHandler";
private final NetconfClientSession session;
+ private NetconfStartExiMessage startExiMessage;
- ExiConfirmationInboundHandler(NetconfClientSession session) {
+ ExiConfirmationInboundHandler(NetconfClientSession session, final NetconfStartExiMessage startExiMessage) {
this.session = session;
+ this.startExiMessage = startExiMessage;
}
@Override
if (NetconfMessageUtil.isOKMessage(netconfMessage)) {
logger.trace("Positive response on start-exi call received on session {}", session);
try {
- session.startExiCommunication(sessionPreferences.getStartExiMessage());
+ session.startExiCommunication(startExiMessage);
} catch (RuntimeException e) {
// Unable to add exi, continue without exi
logger.warn("Unable to start exi communication, Communication will continue without exi on session {}", session, e);
}
- // Error response
+ // Error response
} else if(NetconfMessageUtil.isErrorMessage(netconfMessage)) {
logger.warn(
"Error response to start-exi message {}, Communication will continue without exi on session {}",
XmlUtil.toString(netconfMessage.getDocument()), session);
- // Unexpected response to start-exi, throwing message away, continue without exi
+ // Unexpected response to start-exi, throwing message away, continue without exi
} else {
logger.warn(
"Unexpected response to start-exi message, should be ok, was {}, " +
negotiationSuccessful(session);
}
}
+
}
throw new IllegalStateException(e);
}
- NetconfClientSessionPreferences proposal = new NetconfClientSessionPreferences(helloMessage,startExiMessage);
+ NetconfClientSessionPreferences proposal = new NetconfClientSessionPreferences(helloMessage, startExiMessage);
return new NetconfClientSessionNegotiator(proposal, promise, channel, timer,
sessionListenerFactory.getSessionListener(),connectionTimeoutMillis);
}
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Optional;
-import io.netty.channel.Channel;
-import io.netty.util.Timer;
-import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
+import com.google.common.base.Optional;
+
+import io.netty.channel.Channel;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.Promise;
public class NetconfServerSessionNegotiator extends
AbstractNetconfSessionNegotiator<NetconfServerSessionPreferences, NetconfServerSession, NetconfServerSessionListener> {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
+ @Override
+ protected void handleMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
+ NetconfServerSession session = getSessionForHelloMessage(netconfMessage);
+ replaceHelloMessageInboundHandler(session);
+ // Negotiation successful after all non hello messages were processed
+ negotiationSuccessful(session);
+ }
+
@Override
protected NetconfServerSession getSession(NetconfServerSessionListener sessionListener, Channel channel, NetconfHelloMessage message) {
Optional<NetconfHelloMessageAdditionalHeader> additionalHeader = message.getAdditionalHeader();
public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfHelloMessage, NetconfServerSession, NetconfServerSessionListener> {
+ // TODO make this configurable
private static final Set<String> DEFAULT_BASE_CAPABILITIES = ImmutableSet.of(
- XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0
- // FIXME, Chunk framing causes ConcurrentClientsTest to fail, investigate
-// XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
- // FIXME, EXI causing issues with sal-netconf-connector, investigate
-// XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1,
+ XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_CAPABILITY_EXI_1_0
);
private final Timer timer;
package org.opendaylight.controller.netconf.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import com.google.common.base.Preconditions;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageUtil;
import org.opendaylight.controller.netconf.util.messages.NetconfStartExiMessage;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
public class ConcurrentClientsTest {
- private static final int CONCURRENCY = 16;
+ private static final int CONCURRENCY = 64;
+ public static final int NETTY_THREADS = 4;
+
private EventLoopGroup nettyGroup;
private NetconfClientDispatcher netconfClientDispatcher;
static final Logger logger = LoggerFactory.getLogger(ConcurrentClientsTest.class);
private DefaultCommitNotificationProducer commitNot;
- private NetconfServerDispatcher dispatch;
-
-
HashedWheelTimer hashedWheelTimer;
+ private TestingNetconfOperation testingNetconfOperation;
public static SessionMonitoringService createMockedMonitoringService() {
SessionMonitoringService monitoring = mock(SessionMonitoringService.class);
return monitoring;
}
+ // TODO refactor and test with different configurations
+
@Before
public void setUp() throws Exception {
- nettyGroup = new NioEventLoopGroup();
+ nettyGroup = new NioEventLoopGroup(NETTY_THREADS);
NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
- factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
+ testingNetconfOperation = new TestingNetconfOperation();
+ factoriesListener.onAddNetconfOperationServiceFactory(mockOpF(testingNetconfOperation));
SessionIdProvider idProvider = new SessionIdProvider();
hashedWheelTimer = new HashedWheelTimer();
commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory);
- dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
+ final NetconfServerDispatcher dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
ChannelFuture s = dispatch.createServer(netconfAddress);
s.await();
nettyGroup.shutdownGracefully();
}
- private NetconfOperationServiceFactory mockOpF() {
- return new NetconfOperationServiceFactory() {
- @Override
- public NetconfOperationService createService(String netconfSessionIdForReporting) {
- return new NetconfOperationService() {
- @Override
- public Set<Capability> getCapabilities() {
- return Collections.emptySet();
- }
-
- @Override
- public Set<NetconfOperation> getNetconfOperations() {
- return Sets.<NetconfOperation> newHashSet(new NetconfOperation() {
- @Override
- public HandlingPriority canHandle(Document message) {
- return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
- HandlingPriority.CANNOT_HANDLE :
- HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
- }
-
- @Override
- public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
- try {
- return XmlUtil.readXmlToDocument("<test/>");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- @Override
- public void close() {
- }
- };
- }
- };
+ private NetconfOperationServiceFactory mockOpF(final NetconfOperation... operations) {
+ return new TestingOperationServiceFactory(operations);
}
@After
fail("Client thread " + thread + " failed: " + exception.getMessage());
}
}
+
+ assertEquals(CONCURRENCY, testingNetconfOperation.getMessageCount());
}
+ /**
+ * Cannot handle CHUNK, make server configurable
+ */
+ @Ignore
@Test(timeout = 30 * 1000)
public void synchronizationTest() throws Exception {
new BlockingThread("foo").run2();
}
+ /**
+ * Cannot handle CHUNK, make server configurable
+ */
+ @Ignore
@Test(timeout = 30 * 1000)
public void multipleBlockingClients() throws Exception {
List<BlockingThread> threads = new ArrayList<>();
}
}
+ private static class TestingNetconfOperation implements NetconfOperation {
+
+ private final AtomicLong counter = new AtomicLong();
+
+ @Override
+ public HandlingPriority canHandle(Document message) {
+ return XmlUtil.toString(message).contains(NetconfStartExiMessage.START_EXI) ?
+ HandlingPriority.CANNOT_HANDLE :
+ HandlingPriority.HANDLE_WITH_MAX_PRIORITY;
+ }
+
+ @Override
+ public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
+ try {
+ logger.info("Handling netconf message from test {}", XmlUtil.toString(requestMessage));
+ counter.getAndIncrement();
+ return XmlUtil.readXmlToDocument("<test/>");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getMessageCount() {
+ return counter.get();
+ }
+ }
+
+ private static class TestingOperationServiceFactory implements NetconfOperationServiceFactory {
+ private final NetconfOperation[] operations;
+
+ public TestingOperationServiceFactory(final NetconfOperation... operations) {
+ this.operations = operations;
+ }
+
+ @Override
+ public NetconfOperationService createService(String netconfSessionIdForReporting) {
+ return new NetconfOperationService() {
+ @Override
+ public Set<Capability> getCapabilities() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<NetconfOperation> getNetconfOperations() {
+ return Sets.<NetconfOperation> newHashSet(operations);
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ }
+
class BlockingThread extends Thread {
private Optional<Exception> thrownException;
.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
NetconfMessage result = netconfClient.sendRequest(getMessage).get();
logger.info("Client with sessionid {} got result {}", sessionId, result);
+
+ Preconditions.checkState(NetconfMessageUtil.isErrorMessage(result) == false,
+ "Received error response: " + XmlUtil.toString(result.getDocument()) +
+ " to request: " + XmlUtil.toString(getMessage.getDocument()));
+
netconfClient.close();
logger.info("Client with session id {} ended", sessionId);
thrownException = Optional.absent();
</properties>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>sal-binding-it</artifactId>
+ <exclusions>
+ <!-- FIXME see IdentityRefNetconfTest -->
+ <!-- Pax-url-aether contains guava classes e.g. ImmutableSet that clashes with guava and causes tests to fail-->
+ <exclusion>
+ <groupId>org.ops4j.pax.url</groupId>
+ <artifactId>pax-url-aether</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>commons.logback_settings</artifactId>
<artifactId>netty-config-api</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>sal-binding-it</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-test</artifactId>
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
+* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
package org.opendaylight.controller.netconf.it.pax;
import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import javax.inject.Inject;
import javax.xml.parsers.ParserConfigurationException;
import org.junit.Assert;
public static final int CLIENT_CONNECTION_TIMEOUT_MILLIS = 15000;
// Wait for controller to start
- @Inject
+
+ // FIXME move this (pax) test to different module
+ // pax jars contain guava classes that clash with real guava dependencies in non-pax tests
+ //
+ //@Inject
@Filter(timeout = 60 * 1000)
BindingAwareBroker broker;
return sb.toString();
}
- protected <T extends ChannelHandler> T removeHandler(final Class<T> handlerType) {
- return this.channel.pipeline().remove(handlerType);
- }
-
- protected void replaceMessageDecoder(final ChannelHandler handler) {
+ protected final void replaceMessageDecoder(final ChannelHandler handler) {
replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, handler);
}
- protected void replaceMessageEncoder(final ChannelHandler handler) {
+ protected final void replaceMessageEncoder(final ChannelHandler handler) {
replaceChannelHandler(AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, handler);
}
- protected void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
+ protected final void replaceMessageEncoderAfterNextMessage(final ChannelHandler handler) {
this.delayedEncoder = handler;
}
- protected void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
+ protected final void replaceChannelHandler(final String handlerName, final ChannelHandler handler) {
channel.pipeline().replace(handlerName, handlerName, handler);
}
}
final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
addExiHandlers(exiCodec);
- logger.debug("EXI handlers added to pipeline on session {}", this);
+ logger.debug("Session {} EXI handlers added to pipeline", this);
}
protected abstract void addExiHandlers(NetconfEXICodec exiCodec);
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
}
@Override
- protected void startNegotiation() {
+ protected final void startNegotiation() {
final Optional<SslHandler> sslHandler = getSslHandler(channel);
if (sslHandler.isPresent()) {
Future<Channel> future = sslHandler.get().handshakeFuture();
// FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API
sendMessage((NetconfHelloMessage)helloMessage);
+
+ replaceHelloMessageOutboundHandler();
changeState(State.OPEN_WAIT);
}
+
private void cancelTimeout() {
if(timeout!=null) {
timeout.cancel();
}
}
- @Override
- protected void handleMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
- S session = getSessionForHelloMessage(netconfMessage) ;
- negotiationSuccessful(session);
- }
-
protected final S getSessionForHelloMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
Preconditions.checkNotNull(netconfMessage, "netconfMessage");
final Document doc = netconfMessage.getDocument();
- replaceHelloMessageHandlers();
-
if (shouldUseChunkFraming(doc)) {
insertChunkFramingToPipeline();
}
/**
* Insert chunk framing handlers into the pipeline
*/
- protected void insertChunkFramingToPipeline() {
+ private void insertChunkFramingToPipeline() {
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
new NetconfChunkAggregator());
}
- protected boolean shouldUseChunkFraming(Document doc) {
+ private boolean shouldUseChunkFraming(Document doc) {
return containsBase11Capability(doc)
&& containsBase11Capability(sessionPreferences.getHelloMessage().getDocument());
}
/**
- * Remove special handlers for hello message. Insert regular netconf xml message (en|de)coders.
+ * Remove special inbound handler for hello message. Insert regular netconf xml message (en|de)coders.
+ *
+ * Inbound hello message handler should be kept until negotiation is successful
+ * It caches any non-hello messages while negotiation is still in progress
+ */
+ protected final void replaceHelloMessageInboundHandler(final S session) {
+ ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+
+ Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder,
+ "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline());
+ Iterable<NetconfMessage> netconfMessagesFromNegotiation =
+ ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages();
+
+ // Process messages received during negotiation
+ // The hello message handler does not have to be synchronized, since it is always call from the same thread by netty
+ // It means, we are now using the thread now
+ for (NetconfMessage message : netconfMessagesFromNegotiation) {
+ session.handleMessage(message);
+ }
+ }
+
+ /**
+ * Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders.
*/
- protected void replaceHelloMessageHandlers() {
- replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+ private void replaceHelloMessageOutboundHandler() {
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, new NetconfMessageToXMLEncoder());
}
protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
- protected synchronized void changeState(final State newState) {
+ private synchronized void changeState(final State newState) {
logger.debug("Changing state from : {} to : {}", state, newState);
Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
newState);
private static final Logger LOG = LoggerFactory.getLogger(NetconfEXIToMessageDecoder.class);
-// private static final SAXTransformerFactory saxTransformerFactory = (SAXTransformerFactory)SAXTransformerFactory.newInstance();
-
private final NetconfEXICodec codec;
public NetconfEXIToMessageDecoder(final NetconfEXICodec codec) {
* the use of EXI, which means the next message needs to be decoded not by us, but rather
* by the XML decoder.
*/
- // If empty Byte buffer is passed to r.parse, EOFException is thrown
- if (in.readableBytes() == 0) {
+ // If empty Byte buffer is passed to r.parse, EOFException is thrown
+ if (in.isReadable() == false) {
LOG.debug("No more content in incoming buffer.");
return;
}
final DOMResult domResult = new DOMResult();
handler.setResult(domResult);
-
try (final InputStream is = new ByteBufInputStream(in)) {
r.parse(new InputSource(is));
}
*/
package org.opendaylight.controller.netconf.util.handler;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
* Customized NetconfXMLToMessageDecoder that reads additional header with
* session metadata from
* {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage}
- * . Used by netconf server to retrieve information about session metadata.
+ *
+ *
+ * This handler should be replaced in pipeline by regular message handler as last step of negotiation.
+ * It serves as a message barrier and halts all non-hello netconf messages.
+ * Netconf messages after hello should be processed once the negotiation succeeded.
+ *
*/
public final class NetconfXMLToHelloMessageDecoder extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToHelloMessageDecoder.class);
new byte[] { '\r', '\n', '[' },
new byte[] { '\n', '[' });
+ // State variables do not have to by synchronized
+ // Netty uses always the same (1) thread per pipeline
+ // We use instance of this per pipeline
+ private List<NetconfMessage> nonHelloMessages = Lists.newArrayList();
+ private boolean helloReceived = false;
+
@Override
@VisibleForTesting
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException, SAXException, NetconfDocumentedException {
Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
- final NetconfMessage message;
- if (additionalHeader != null) {
- message = new NetconfHelloMessage(doc, NetconfHelloMessageAdditionalHeader.fromString(additionalHeader));
+ final NetconfMessage message = getNetconfMessage(additionalHeader, doc);
+ if (message instanceof NetconfHelloMessage) {
+ Preconditions.checkState(helloReceived == false,
+ "Multiple hello messages received, unexpected hello: %s",
+ XmlUtil.toString(message.getDocument()));
+ out.add(message);
+ helloReceived = true;
+ // Non hello message, suspend the message and insert into cache
} else {
- message = new NetconfHelloMessage(doc);
+ Preconditions.checkState(helloReceived, "Hello message not received, instead received: %s",
+ XmlUtil.toString(message.getDocument()));
+ LOG.debug("Netconf message received during negotiation, caching {}",
+ XmlUtil.toString(message.getDocument()));
+ nonHelloMessages.add(message);
}
- out.add(message);
} finally {
in.discardReadBytes();
}
}
+ private NetconfMessage getNetconfMessage(final String additionalHeader, final Document doc) throws NetconfDocumentedException {
+ NetconfMessage msg = new NetconfMessage(doc);
+ if(NetconfHelloMessage.isHelloMessage(msg)) {
+ if (additionalHeader != null) {
+ return new NetconfHelloMessage(doc, NetconfHelloMessageAdditionalHeader.fromString(additionalHeader));
+ } else {
+ return new NetconfHelloMessage(doc);
+ }
+ }
+
+ return msg;
+ }
+
private int getAdditionalHeaderEndIndex(byte[] bytes) {
for (byte[] possibleEnd : POSSIBLE_ENDS) {
int idx = findByteSequence(bytes, possibleEnd);
return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
}
+ /**
+ * @return Collection of NetconfMessages that were not hello, but were received during negotiation
+ */
+ public Iterable<NetconfMessage> getPostHelloNetconfMessages() {
+ return nonHelloMessages;
+ }
}
*/
package org.opendaylight.controller.netconf.util.handler;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.io.IOException;
import java.util.List;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import org.xml.sax.SAXException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
@Override
@VisibleForTesting
- public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException, SAXException {
+ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+
if (in.readableBytes() != 0) {
LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
out.add(new NetconfMessage(XmlUtil.readXmlToDocument(new ByteBufInputStream(in))));
package org.opendaylight.controller.netconf.util.messages;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
+
+import java.util.Set;
+
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import java.util.Set;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
/**
* NetconfMessage that can carry additional header with session metadata. See {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader}
return additionalHeader== null ? Optional.<NetconfHelloMessageAdditionalHeader>absent() : Optional.of(additionalHeader);
}
- private static void checkHelloMessage(Document doc) throws NetconfDocumentedException {
- XmlElement.fromDomElementWithExpected(doc.getDocumentElement(), HELLO_TAG,
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
-
+ private static void checkHelloMessage(Document doc) {
+ Preconditions.checkArgument(isHelloMessage(doc),
+ "Hello message invalid format, should contain %s tag from namespace %s, but is: %s", HELLO_TAG,
+ XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0, XmlUtil.toString(doc));
}
public static NetconfHelloMessage createClientHello(Iterable<String> capabilities,
doc.getDocumentElement().appendChild(sessionIdElement);
return new NetconfHelloMessage(doc);
}
+
+ public static boolean isHelloMessage(final NetconfMessage msg) {
+ Document document = msg.getDocument();
+ return isHelloMessage(document);
+ }
+
+ private static boolean isHelloMessage(final Document document) {
+ XmlElement element = XmlElement.fromDomElement(document.getDocumentElement());
+ try {
+ return element.getName().equals(HELLO_TAG) &&
+ element.hasNamespace() &&
+ element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ } catch (MissingNameSpaceException e) {
+ // Cannot happen, since we check for hasNamespace
+ throw new IllegalStateException(e);
+ }
+ }
}
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
final NetconfDocumentedException sendErrorException) {
logger.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
final Document errorDocument = createDocument(sendErrorException);
- session.sendMessage(new NetconfMessage(errorDocument));
+ ChannelFuture f = session.sendMessage(new NetconfMessage(errorDocument));
+ f.addListener(new SendErrorVerifyingListener(sendErrorException));
}
public static void sendErrorMessage(Channel channel, NetconfDocumentedException sendErrorException) {
logger.trace("Sending error {}", sendErrorException.getMessage(), sendErrorException);
final Document errorDocument = createDocument(sendErrorException);
- channel.writeAndFlush(new NetconfMessage(errorDocument));
+ ChannelFuture f = channel.writeAndFlush(new NetconfMessage(errorDocument));
+ f.addListener(new SendErrorVerifyingListener(sendErrorException));
}
public static void sendErrorMessage(NetconfSession session, NetconfDocumentedException sendErrorException,
final Document errorDocument = createDocument(sendErrorException);
logger.trace("Sending error {}", XmlUtil.toString(errorDocument));
tryToCopyAttributes(incommingMessage.getDocument(), errorDocument, sendErrorException);
- session.sendMessage(new NetconfMessage(errorDocument));
+ ChannelFuture f = session.sendMessage(new NetconfMessage(errorDocument));
+ f.addListener(new SendErrorVerifyingListener(sendErrorException));
}
private static void tryToCopyAttributes(final Document incommingDocument, final Document errorDocument,
return errorDocument;
}
+ /**
+ * Checks if netconf error was sent successfully.
+ */
+ private static final class SendErrorVerifyingListener implements ChannelFutureListener {
+ private final NetconfDocumentedException sendErrorException;
+
+ public SendErrorVerifyingListener(final NetconfDocumentedException sendErrorException) {
+ this.sendErrorException = sendErrorException;
+ }
+
+ @Override
+ public void operationComplete(final ChannelFuture channelFuture) throws Exception {
+ Preconditions.checkState(channelFuture.isSuccess(), "Unable to send exception {}", sendErrorException,
+ channelFuture.cause());
+ }
+ }
}