import org.slf4j.LoggerFactory;
public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>> extends AbstractProtocolSession<NetconfMessage> implements NetconfSession, NetconfExiSession {
- private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSession.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSession.class);
private final L sessionListener;
private final long sessionId;
private boolean up = false;
this.sessionListener = sessionListener;
this.channel = channel;
this.sessionId = sessionId;
- logger.debug("Session {} created", sessionId);
+ LOG.debug("Session {} created", sessionId);
}
protected abstract S thisInstance();
@Override
protected void handleMessage(final NetconfMessage netconfMessage) {
- logger.debug("handling incoming message");
+ LOG.debug("handling incoming message");
sessionListener.onMessage(thisInstance(), netconfMessage);
}
public ChannelFuture sendMessage(final NetconfMessage netconfMessage) {
final ChannelFuture future = channel.writeAndFlush(netconfMessage);
if (delayedEncoder != null) {
- replaceMessageEncoder(delayedEncoder);
- delayedEncoder = null;
+ replaceMessageEncoder(delayedEncoder);
+ delayedEncoder = null;
}
return future;
@Override
protected void endOfInput() {
- logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
+ LOG.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
: "initialized");
if (isUp()) {
this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session."));
@Override
protected void sessionUp() {
- logger.debug("Session {} up", toString());
+ LOG.debug("Session {} up", toString());
sessionListener.onSessionUp(thisInstance());
this.up = true;
}
try {
exiParams = EXIParameters.fromXmlElement(XmlElement.fromDomDocument(startExiMessage.getDocument()));
} catch (final EXIOptionsException e) {
- logger.warn("Unable to parse EXI parameters from {} om session {}", XmlUtil.toString(startExiMessage.getDocument()), this, e);
+ LOG.warn("Unable to parse EXI parameters from {} om session {}", XmlUtil.toString(startExiMessage.getDocument()), this, e);
throw new IllegalArgumentException(e);
}
final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
addExiHandlers(exiCodec);
- logger.debug("Session {} EXI handlers added to pipeline", this);
+ LOG.debug("Session {} EXI handlers added to pipeline", this);
}
protected abstract void addExiHandlers(NetconfEXICodec exiCodec);
import org.w3c.dom.NodeList;
public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
-extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
+ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
- private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
@Override
public void operationComplete(Future<? super Channel> future) {
Preconditions.checkState(future.isSuccess(), "Ssl handshake was not successful");
- logger.debug("Ssl handshake complete");
+ LOG.debug("Ssl handshake complete");
start();
}
});
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
- logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
+ LOG.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
synchronized (this) {
if (state != State.ESTABLISHED) {
- logger.debug("Connection timeout after {}, session is in state {}", timeout, state);
+ LOG.debug("Connection timeout after {}, session is in state {}", timeout, state);
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()) {
- logger.debug("Channel {} closed: success", future.channel());
+ LOG.debug("Channel {} closed: success", future.channel());
} else {
- logger.warn("Channel {} closed: fail", future.channel());
+ LOG.warn("Channel {} closed: fail", future.channel());
}
}
});
protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
private synchronized void changeState(final State newState) {
- logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+ LOG.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
newState, channel);
this.state = newState;
if (state == State.OPEN_WAIT && newState == State.FAILED) {
return true;
}
- logger.debug("Transition from {} to {} is not allowed", state, newState);
+ LOG.debug("Transition from {} to {} is not allowed", state, newState);
return false;
}
private final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- logger.warn("An exception occurred during negotiation with {}", channel.remoteAddress(), cause);
+ LOG.warn("An exception occurred during negotiation with {}", channel.remoteAddress(), cause);
cancelTimeout();
negotiationFailed(cause);
changeState(State.FAILED);
package org.opendaylight.controller.netconf.nettyutil.handler;
+import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
-import com.google.common.base.Preconditions;
-
public class ChunkedFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
public static final int DEFAULT_CHUNK_SIZE = 8192;
public static final int MIN_CHUNK_SIZE = 128;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
public class EOMFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
package org.opendaylight.controller.netconf.nettyutil.handler;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.MessageToByteEncoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.MessageToByteEncoder;
-
public final class FramingMechanismHandlerFactory {
- private static final Logger logger = LoggerFactory.getLogger(FramingMechanismHandlerFactory.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FramingMechanismHandlerFactory.class);
private FramingMechanismHandlerFactory() {
// not called - private constructor for utility class
}
public static MessageToByteEncoder<ByteBuf> createHandler(FramingMechanism framingMechanism) {
- logger.debug("{} framing mechanism was selected.", framingMechanism);
+ LOG.debug("{} framing mechanism was selected.", framingMechanism);
if (framingMechanism == FramingMechanism.EOM) {
return new EOMFramingMechanismEncoder();
} else {
package org.opendaylight.controller.netconf.nettyutil.handler;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NetconfChunkAggregator extends ByteToMessageDecoder {
- private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+ private final static Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
private void checkNewLine(byte b,String errorMessage){
if (b != '\n') {
- logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n');
+ LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n');
throw new IllegalStateException(errorMessage);
}
}
private void checkHash(byte b,String errorMessage){
if (b != '#') {
- logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#');
+ LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#');
throw new IllegalStateException(errorMessage);
}
}
private void checkChunkSize(){
if (chunkSize > maxChunkSize) {
- logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
+ LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
throw new IllegalStateException("Maximum chunk size exceeded");
}
}
if (b < '0' || b > '9') {
- logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
+ LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
throw new IllegalStateException("Invalid chunk size encountered");
}
* comes through.
*/
if (in.readableBytes() < chunkSize) {
- logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
+ LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
in.discardReadBytes();
return;
}
} else if (b == '#') {
state = State.FOOTER_FOUR;
} else {
- logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9');
+ LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9');
throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
}
}
private static int processHeaderLengthFirst(byte b) {
if (!isHeaderLengthFirst(b)) {
- logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9');
+ LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9');
throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
}
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.InputStream;
import java.util.List;
-
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.sax.SAXTransformerFactory;
import javax.xml.transform.sax.TransformerHandler;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.openexi.sax.EXIReader;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
public final class NetconfEXIToMessageDecoder extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEXIToMessageDecoder.class);
final EXIReader r = codec.getReader();
final SAXTransformerFactory transformerFactory
- = (SAXTransformerFactory) TransformerFactory.newInstance();
+ = (SAXTransformerFactory) TransformerFactory.newInstance();
final TransformerHandler handler = transformerFactory.newTransformerHandler();
r.setContentHandler(handler);
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-
import java.io.IOException;
-
import javax.xml.transform.TransformerException;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
/**
* Customized NetconfMessageToXMLEncoder that serializes additional header with
* session metadata along with
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
-
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Comment;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-
public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMessage> {
private static final Logger LOG = LoggerFactory.getLogger(NetconfMessageToXMLEncoder.class);
private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
import org.xml.sax.SAXException;
/**
*/
package org.opendaylight.controller.netconf.nettyutil.handler.exi;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.openexi.proc.common.AlignmentType;
import org.openexi.proc.common.EXIOptions;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
-import com.google.common.base.Preconditions;
-
public final class EXIParameters {
private static final String EXI_PARAMETER_ALIGNMENT = "alignment";
static final String EXI_PARAMETER_BYTE_ALIGNED = "byte-aligned";
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
import java.io.IOException;
import java.net.SocketAddress;
-
import org.apache.sshd.ClientChannel;
import org.apache.sshd.ClientSession;
import org.apache.sshd.SshClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
/**
* Netty SSH handler class. Acts as interface between Netty and SSH library.
*/
public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
public static final String SUBSYSTEM = "netconf";
public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient();
}
private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
- logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
+ LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address);
sshConnectionFuture.addListener(new SshFutureListener<ConnectFuture>() {
private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) {
try {
- logger.trace("SSH session created on channel: {}", ctx.channel());
+ LOG.trace("SSH session created on channel: {}", ctx.channel());
session = future.getSession();
final AuthFuture authenticateFuture = authenticationHandler.authenticate(session);
private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) {
try {
- logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
+ LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
channel = session.createSubsystemChannel(SUBSYSTEM);
channel.setStreaming(ClientChannel.Streaming.Async);
}
private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
- logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
+ LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
connectPromise.setSuccess();
connectPromise = null;
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
- logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
+ LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
connectPromise.setFailure(e);
connectPromise = null;
throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e);
channel = null;
promise.setSuccess();
- logger.debug("SSH session closed on channel: {}", ctx.channel());
+ LOG.debug("SSH session closed on channel: {}", ctx.channel());
ctx.fireChannelInactive();
}
*/
public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerReader.class);
private static final int BUFFER_SIZE = 8192;
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", channelId, future.getException());
+ LOG.debug("Ssh session dropped on channel: {}", channelId, future.getException());
} else {
- logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
+ LOG.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
invokeDisconnect();
return;
if (future.getRead() > 0) {
final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
- if(logger.isTraceEnabled()) {
- logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
}
readHandler.onMessageRead(msg);
*/
public final class AsyncSshHandlerWriter implements AutoCloseable {
- private static final Logger logger = LoggerFactory
+ private static final Logger LOG = LoggerFactory
.getLogger(AsyncSshHandlerWriter.class);
// public static final int MAX_PENDING_WRITES = 1000;
private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
try {
- if (logger.isTraceEnabled()) {
- logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
}
asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(final IoWriteFuture future) {
- if (logger.isTraceEnabled()) {
- logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
- }
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
- promise.setFailure(future.getException());
- }
-
- // Not needed anymore, release
- byteBufMsg.release();
-
- // Check pending queue and schedule next
- // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
- writePendingIfAny();
- }
- });
+ }
+
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+ promise.setFailure(future.getException());
+ }
+
+ // Not needed anymore, release
+ byteBufMsg.release();
+
+ // Check pending queue and schedule next
+ // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+ writePendingIfAny();
+ }
+ });
} catch (final WritePendingException e) {
queueRequest(ctx, byteBufMsg, promise);
}
// In case of pending, reschedule next message from queue
final PendingWriteRequest pendingWrite = pending.poll();
final ByteBuf msg = pendingWrite.msg;
- if (logger.isTraceEnabled()) {
- logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
}
writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
// try {
- logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
- if (logger.isTraceEnabled()) {
- logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+ LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
}
new PendingWriteRequest(ctx, msg, promise).pend(pending);
// } catch (final Exception ex) {
-// logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
// msg.release();
// promise.setFailure(ex);
// }
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
fullOptions.setPreservePIs(true);
return Arrays.asList(new Object[][]{
- {noChangeXml, new EXIOptions()},
- {fullOptionsXml, fullOptions},
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
});
}
fullOptions.setPreservePIs(true);
return Arrays.asList(new Object[][]{
- {noChangeXml, new EXIOptions()},
- {fullOptionsXml, fullOptions},
+ {noChangeXml, new EXIOptions()},
+ {fullOptionsXml, fullOptions},
});
}
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;