import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.EventLoop;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
+import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.opendaylight.netconf.util.test.XmlFileLoader;
-import org.opendaylight.netconf.api.NetconfClientSessionPreferences;
-import org.opendaylight.netconf.api.NetconfMessage;
import org.openexi.proc.common.EXIOptions;
import org.w3c.dom.Document;
pipeline = mockChannelPipeline();
future = mockChannelFuture();
channel = mockChannel();
+ mockEventLoop();
}
private static ChannelHandler mockChannelHandler() {
return pipeline;
}
+ private void mockEventLoop() {
+ final EventLoop eventLoop = mock(EventLoop.class);
+ doReturn(eventLoop).when(channel).eventLoop();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final Object[] args = invocation.getArguments();
+ final Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return null;
+ }
+ }).when(eventLoop).execute(any(Runnable.class));
+ }
+
private NetconfClientSessionNegotiator createNetconfClientSessionNegotiator(final Promise<NetconfClientSession> promise,
final NetconfMessage startExi) {
ChannelProgressivePromise progressivePromise = mock(ChannelProgressivePromise.class);
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.util.collections.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.netconf.api.NetconfMessage;
public void setUp() throws Exception {
channel = mock(Channel.class);
channelFuture = mock(ChannelFuture.class);
+ mockEventLoop();
doReturn(channelFuture).when(channel).writeAndFlush(anyObject());
+ doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
caps = Sets.newSet("a", "b");
helloMessage = NetconfHelloMessage.createServerHello(caps, 10);
message = new NetconfMessage(helloMessage.getDocument());
clientSession = new NetconfClientSession(sessionListener, channel, 20L, caps);
}
+ private void mockEventLoop() {
+ final EventLoop eventLoop = mock(EventLoop.class);
+ doReturn(eventLoop).when(channel).eventLoop();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final Object[] args = invocation.getArguments();
+ final Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return null;
+ }
+ }).when(eventLoop).execute(any(Runnable.class));
+ }
+
@Test
public void testSessionDown() throws Exception {
SimpleNetconfClientSessionListener simpleListener = new SimpleNetconfClientSessionListener();
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoop;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
public class DefaultCloseSessionTest {
+ private void mockEventLoop(final Channel channel) {
+ final EventLoop eventLoop = mock(EventLoop.class);
+ doReturn(eventLoop).when(channel).eventLoop();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final Object[] args = invocation.getArguments();
+ final Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return null;
+ }
+ }).when(eventLoop).execute(any(Runnable.class));
+ doReturn(true).when(eventLoop).inEventLoop();
+ }
+
@Test
public void testDefaultCloseSession() throws Exception {
AutoCloseable res = mock(AutoCloseable.class);
XmlElement elem = XmlElement.fromDomElement(XmlUtil.readXmlToElement("<elem/>"));
final Channel channel = mock(Channel.class);
doReturn("channel").when(channel).toString();
- doReturn(mock(ChannelFuture.class)).when(channel).close();
+ mockEventLoop(channel);
+ final ChannelFuture channelFuture = mock(ChannelFuture.class);
+ doReturn(channelFuture).when(channel).close();
+ doReturn(channelFuture).when(channelFuture).addListener(any(GenericFutureListener.class));
final ChannelFuture sendFuture = mock(ChannelFuture.class);
doAnswer(new Answer<Object>() {
}
}).when(sendFuture).addListener(any(GenericFutureListener.class));
doReturn(sendFuture).when(channel).writeAndFlush(anyObject());
+ doReturn(true).when(sendFuture).isSuccess();
final NetconfServerSessionListener listener = mock(NetconfServerSessionListener.class);
doNothing().when(listener).onSessionTerminated(any(NetconfServerSession.class), any(NetconfTerminationReason.class));
final NetconfServerSession session =
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import org.opendaylight.controller.config.util.xml.XmlElement;
import org.opendaylight.netconf.api.NetconfExiSession;
@Override
public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
- final ChannelFuture future = channel.writeAndFlush(netconfMessage);
- if (delayedEncoder != null) {
- replaceMessageEncoder(delayedEncoder);
- delayedEncoder = null;
- }
-
- return future;
+ // From: https://github.com/netty/netty/issues/3887
+ // Netty can provide "ordering" in the following situations:
+ // 1. You are doing all writes from the EventLoop thread; OR
+ // 2. You are doing no writes from the EventLoop thread (i.e. all writes are being done in other thread(s)).
+ //
+ // Restconf writes to a netconf mountpoint execute multiple messages
+ // and one of these was executed from a restconf thread thus breaking ordering so
+ // we need to execute all messages from an EventLoop thread.
+ final DefaultChannelPromise proxyFuture = new DefaultChannelPromise(channel);
+ channel.eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ final ChannelFuture future = channel.writeAndFlush(netconfMessage);
+ future.addListener(new FutureListener<Void>() {
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ if (future.isSuccess()) {
+ proxyFuture.setSuccess();
+ } else {
+ proxyFuture.setFailure(future.cause());
+ }
+ }
+ });
+ if (delayedEncoder != null) {
+ replaceMessageEncoder(delayedEncoder);
+ delayedEncoder = null;
+ }
+ }
+ });
+
+ return proxyFuture;
}
@Override
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSession;
import org.opendaylight.netconf.api.NetconfSessionListener;
import org.opendaylight.netconf.api.NetconfTerminationReason;
import org.opendaylight.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
import org.openexi.proc.common.EXIOptions;
public class AbstractNetconfSessionTest {
private Channel channel;
@Mock
private ChannelPipeline pipeline;
+ @Mock
+ private EventLoop eventLoop;
+ @Mock
+ private ChannelFuture writeFuture;
+
private NetconfHelloMessage clientHello;
@Before
doNothing().when(listener).onSessionDown(any(NetconfSession.class), any(Exception.class));
doNothing().when(listener).onSessionTerminated(any(NetconfSession.class), any(NetconfTerminationReason.class));
- doReturn(mock(ChannelFuture.class)).when(channel).writeAndFlush(any(NetconfMessage.class));
+ doReturn(writeFuture).when(writeFuture).addListener(any(GenericFutureListener.class));
+
+ doReturn(writeFuture).when(channel).writeAndFlush(any(NetconfMessage.class));
doReturn(pipeline).when(channel).pipeline();
doReturn("mockChannel").when(channel).toString();
doReturn(mock(ChannelFuture.class)).when(channel).close();
doReturn(null).when(pipeline).replace(anyString(), anyString(), any(ChannelHandler.class));
+ doReturn(eventLoop).when(channel).eventLoop();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final Object[] args = invocation.getArguments();
+ final Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return null;
+ }
+ }).when(eventLoop).execute(any(Runnable.class));
+
clientHello = NetconfHelloMessage.createClientHello(Collections.<String>emptySet(), Optional.<NetconfHelloMessageAdditionalHeader>absent());
}