import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
* traffic between the echo client and server by sending the first message to
* the server.
*/
-public class EchoClientHandler extends ChannelInboundHandlerAdapter {
+public class EchoClientHandler extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
private static final Logger logger = LoggerFactory.getLogger(EchoClientHandler.class);
private ChannelHandlerContext ctx;
+ private final StringBuilder fromServer = new StringBuilder();
+
+ public static enum State {CONNECTING, CONNECTED, FAILED_TO_CONNECT, CONNECTION_CLOSED}
+
+
+ private State state = State.CONNECTING;
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public synchronized void channelActive(ChannelHandlerContext ctx) {
checkState(this.ctx == null);
- logger.info("client active");
+ logger.info("channelActive");
this.ctx = ctx;
+ state = State.CONNECTED;
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf bb = (ByteBuf) msg;
- logger.info(">{}", bb.toString(Charsets.UTF_8));
- bb.release();
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ state = State.CONNECTION_CLOSED;
}
@Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ByteBuf bb = (ByteBuf) msg;
+ String string = bb.toString(Charsets.UTF_8);
+ fromServer.append(string);
+ logger.info(">{}", string);
+ bb.release();
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ public synchronized void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.warn("Unexpected exception from downstream.", cause);
checkState(this.ctx.equals(ctx));
this.ctx = null;
}
- public void write(String message) {
+ public synchronized void write(String message) {
ByteBuf byteBuf = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(byteBuf);
}
+
+ public synchronized boolean isConnected() {
+ return state == State.CONNECTED;
+ }
+
+ public synchronized String read() {
+ return fromServer.toString();
+ }
+
+ @Override
+ public synchronized void operationComplete(ChannelFuture future) throws Exception {
+ checkState(state == State.CONNECTING);
+ if (future.isSuccess()) {
+ logger.trace("Successfully connected, state will be switched in channelActive");
+ } else {
+ state = State.FAILED_TO_CONNECT;
+ }
+ }
+
+ public State getState() {
+ return state;
+ }
}