this.ctx = ctx;
this.checker = connection.getProposalChecker();
this.sync = new BGPSynchronization(this.listener);
- this.handler = new ProtocolSessionOutboundHandler(this);
+ this.handler = new ProtocolSessionOutboundHandler();
}
@Override
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.0.0.CR9</version>
+ <version>4.0.6.Final</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
private final ProtocolServer server;
+ private ProtocolSession session;
+
public ServerChannelInitializer(final ProtocolServer server) {
this.server = server;
}
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
- final ProtocolSession session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
- ch.pipeline().addLast(DispatcherImpl.this.handlerFactory.getHandlers(session));
+ final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
+ final ChannelHandler handler = factory.getSessionOutboundHandler();
+ ch.pipeline().addFirst("outbound", handler);
+ ch.pipeline().addFirst("decoder", factory.getDecoder());
+ this.session = this.server.createSession(DispatcherImpl.this.stateTimer, ch);
+
+ ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
+ ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
+ }
+
+ public ProtocolSession getSession() {
+ return this.session;
}
}
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
+ final ProtocolHandlerFactory factory = new ProtocolHandlerFactory(DispatcherImpl.this.messageFactory);
+ final ChannelHandler handler = factory.getSessionOutboundHandler();
+ ch.pipeline().addFirst("outbound", handler);
+ ch.pipeline().addFirst("decoder", factory.getDecoder());
this.session = this.sfactory.getProtocolSession(DispatcherImpl.this, DispatcherImpl.this.stateTimer, this.connection, 0,
ch.pipeline().context(ProtocolSessionOutboundHandler.class));
- ch.pipeline().addLast(DispatcherImpl.this.handlerFactory.getHandlers(this.session));
+ ch.pipeline().addAfter("decoder", "inbound", factory.getSessionInboundHandler(this.session));
+ ch.pipeline().addAfter("inbound", "encoder", factory.getEncoder());
}
public ProtocolSession getSession() {
*/
private final Timer stateTimer;
- private final ProtocolHandlerFactory handlerFactory;
+ private final ProtocolMessageFactory messageFactory;
public DispatcherImpl(final ProtocolMessageFactory factory) {
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
this.stateTimer = new Timer();
- this.handlerFactory = new ProtocolHandlerFactory(factory);
+ this.messageFactory = factory;
}
@Override
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
- final ChannelFuture f = b.bind(address);
- // b.localAddress(address);
+ b.bind(address);
logger.debug("Server {} created.", server);
return server;
}
this.decoder = new ProtocolMessageDecoder(msgFactory);
}
- public ChannelHandler[] getHandlers(final ProtocolSession session) {
- return new ChannelHandler[] { this.encoder, new ProtocolSessionInboundHandler(session),
- new ProtocolSessionOutboundHandler(session), this.decoder };
+ public ChannelHandler getEncoder() {
+ return this.encoder;
+ }
+
+ public ChannelHandler getDecoder() {
+ return this.decoder;
+ }
+
+ public ChannelHandler getSessionInboundHandler(final ProtocolSession session) {
+ return new ProtocolSessionInboundHandler(session);
+ }
+
+ public ChannelHandler getSessionOutboundHandler() {
+ return new ProtocolSessionOutboundHandler();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
final class ProtocolMessageDecoder extends ByteToMessageDecoder {
+ private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageDecoder.class);
+
private final ProtocolMessageFactory factory;
public ProtocolMessageDecoder(final ProtocolMessageFactory factory) {
}
@Override
- public void decode(final ChannelHandlerContext ctx, final ByteBuf in, final MessageList<Object> out) throws Exception {
+ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+ in.markReaderIndex();
ProtocolMessage msg = null;
try {
- msg = this.factory.parse(in.array());
+ final byte[] bytes = new byte[in.readableBytes()];
+ logger.debug("Received to decode: {}", Arrays.toString(bytes));
+ in.readBytes(bytes);
+ msg = this.factory.parse(bytes);
} catch (DeserializerException | DocumentedException e) {
this.exceptionCaught(ctx, e);
}
+ in.discardReadBytes();
out.add(msg);
}
-
- @Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
- // TODO:
- ctx.close();
- }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
@Sharable
final class ProtocolMessageEncoder extends MessageToByteEncoder<ProtocolMessage> {
+ private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageEncoder.class);
+
private final ProtocolMessageFactory factory;
public ProtocolMessageEncoder(final ProtocolMessageFactory factory) {
@Override
protected void encode(final ChannelHandlerContext ctx, final ProtocolMessage msg, final ByteBuf out) throws Exception {
+ logger.debug("Sent to encode : {}", Arrays.toString(ctx.channel().pipeline().names().toArray()));
out.writeBytes(this.factory.put(msg));
}
}
package org.opendaylight.protocol.framework;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.MessageList;
+import io.netty.channel.SimpleChannelInboundHandler;
-final class ProtocolSessionInboundHandler extends ChannelInboundHandlerAdapter {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ProtocolSessionInboundHandler extends SimpleChannelInboundHandler<ProtocolMessage> {
+
+ private final static Logger logger = LoggerFactory.getLogger(ProtocolSessionInboundHandler.class);
private final ProtocolSession session;
}
@Override
- public void messageReceived(final ChannelHandlerContext ctx, final MessageList<Object> msgs) {
- final MessageList<ProtocolMessage> pmsgs = msgs.cast();
- for (final ProtocolMessage msg : pmsgs) {
- this.session.handleMessage(msg);
- }
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ logger.debug("Channel active.");
+ this.session.startSession();
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ logger.debug("Channel inactive.");
+ this.session.endOfInput();
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final ProtocolMessage msg) throws Exception {
+ logger.debug("Message was received: {}", msg);
+ this.session.handleMessage(msg);
+ }
+
+ public ProtocolSession getSession() {
+ return this.session;
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.MessageList;
public final class ProtocolSessionOutboundHandler extends ChannelOutboundHandlerAdapter {
- private final ProtocolSession session;
-
- public ProtocolSessionOutboundHandler(final ProtocolSession session) {
- this.session = session;
- }
-
- @Override
- public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
- this.session.startSession();
- }
-
- @Override
- public void handlerRemoved(final ChannelHandlerContext ctx) throws Exception {
- this.session.close();
- }
-
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
// TODO:
}
public void writeDown(final ChannelHandlerContext ctx, final ProtocolMessage msg) throws Exception {
- this.write(ctx, MessageList.<Object> newInstance(msg), ctx.newPromise());
+ this.write(ctx, msg, ctx.newPromise());
}
}
public final List<ProtocolMessage> msgs = Lists.newArrayList();
- private final ProtocolMessageFactory pmf = new MessageFactory();
-
- private final SessionParent parent;
-
public boolean up = false;
private final int maxMsgSize;
- public Session(final SessionParent parent, final int maxMsgSize) {
- this.parent = parent;
+ public Session(final int maxMsgSize) {
this.maxMsgSize = maxMsgSize;
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.pcep;
-
-import io.netty.channel.ChannelHandlerContext;
-
-import java.util.Timer;
-
-import org.opendaylight.protocol.framework.ProtocolConnection;
-import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.ProtocolSessionFactory;
-import org.opendaylight.protocol.framework.SessionParent;
-
-public interface PCEPSessionFactory extends ProtocolSessionFactory {
- @Override
- public ProtocolSession getProtocolSession(SessionParent parent, Timer timer, ProtocolConnection connection, int sessionId,
- ChannelHandlerContext ctx);
-}
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.protocol.framework.Dispatcher;
import org.opendaylight.protocol.framework.ProtocolServer;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of PCEPDispatcher.
*/
public class PCEPDispatcherImpl implements PCEPDispatcher {
+
+ private final static Logger logger = LoggerFactory.getLogger(PCEPDispatcherImpl.class);
+
public static final int DEFAULT_MAX_UNKNOWN_MSG = 5;
private int maxUnknownMessages = DEFAULT_MAX_UNKNOWN_MSG;
/**
* Create client is used for mock purposes only.
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
*/
@Override
public PCEPSession createClient(final PCEPConnection connection) throws IOException {
- return (PCEPSession) this.dispatcher.createClient(connection, new PCEPSessionFactoryImpl(this.maxUnknownMessages));
+ PCEPSession session = null;
+ try {
+ session = (PCEPSession) this.dispatcher.createClient(connection, new PCEPSessionFactoryImpl(this.maxUnknownMessages)).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.warn("Client not created. Exception {}.", e.getMessage(), e);
+ }
+ return session;
}
@Override
logger.trace("Attempt to parse message from bytes: {}", ByteArray.bytesToHexString(bytes));
- final int type = UnsignedBytes.toInt(bytes[0]);
+ final int type = UnsignedBytes.toInt(bytes[1]);
- final int msgLength = ByteArray.bytesToInt(ByteArray.subByte(bytes, TYPE_SIZE, LENGTH_SIZE));
+ final int msgLength = ByteArray.bytesToInt(ByteArray.subByte(bytes, TYPE_SIZE + 1, LENGTH_SIZE));
- final byte[] msgBody = ByteArray.cutBytes(bytes, TYPE_SIZE + LENGTH_SIZE);
+ final byte[] msgBody = ByteArray.cutBytes(bytes, TYPE_SIZE + 1 + LENGTH_SIZE);
if (msgBody.length != (msgLength - COMMON_HEADER_LENGTH))
throw new DeserializerException("Size don't match size specified in header. Passed: " + msgBody.length + "; Expected: "
import org.opendaylight.protocol.framework.ProtocolConnection;
import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.ProtocolSessionFactory;
import org.opendaylight.protocol.framework.SessionParent;
import org.opendaylight.protocol.pcep.PCEPConnection;
-import org.opendaylight.protocol.pcep.PCEPSessionFactory;
-public class PCEPSessionFactoryImpl implements PCEPSessionFactory {
+public class PCEPSessionFactoryImpl implements ProtocolSessionFactory {
private final int maxUnknownMessages;
import org.opendaylight.protocol.framework.ProtocolMessage;
import org.opendaylight.protocol.framework.ProtocolMessageFactory;
import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.ProtocolSessionOutboundHandler;
import org.opendaylight.protocol.framework.SessionParent;
import org.opendaylight.protocol.pcep.PCEPCloseTermination;
import org.opendaylight.protocol.pcep.PCEPConnection;
private final String peerAddress;
- private final ProtocolSessionOutboundHandler handler;
-
private final ChannelHandlerContext ctx;
PCEPSessionImpl(final SessionParent parent, final Timer timer, final PCEPConnection connection, final PCEPMessageFactory factory,
this.ctx = ctx;
if (this.maxUnknownMessages != 0)
this.maxUnknownMessages = maxUnknownMessages;
- this.handler = new ProtocolSessionOutboundHandler(this);
}
@Override
@Override
public void sendMessage(final PCEPMessage msg) {
try {
- this.handler.writeDown(this.ctx, msg);
+ this.ctx.writeAndFlush(msg);
this.lastMessageSentAt = System.nanoTime();
if (!(msg instanceof PCEPKeepAliveMessage))
logger.debug("Sent message: " + msg);
private static List<PCEPMessage> deserMsg(final String srcFile) throws IOException, DeserializerException, DocumentedException,
PCEPDeserializerException {
final byte[] bytesFromFile = ByteArray.fileToBytes(srcFile);
- final PCEPRawMessage rawMessage = (PCEPRawMessage) new PCEPMessageFactory().parse(ByteArray.cutBytes(bytesFromFile, 1));
+ final PCEPRawMessage rawMessage = (PCEPRawMessage) new PCEPMessageFactory().parse(bytesFromFile);
return PCEPMessageValidator.getValidator(rawMessage.getMsgType()).validate(rawMessage.getAllObjects());
}
final PCEPXRDeleteTunnelMessage dTunnel = new PCEPXRDeleteTunnelMessage(new PCEPLspObject(1, false, true, false, true));
final byte[] bytes = this.msgFactory.put(dTunnel);
- final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(ByteArray.cutBytes(bytes, 1));
+ final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(bytes);
assertEquals(PCEPMessageValidator.getValidator(rawMessage.getMsgType()).validate(rawMessage.getAllObjects()),
asList((PCEPMessage) dTunnel));
final PCEPXRAddTunnelMessage addTunnel = new PCEPXRAddTunnelMessage(new PCEPLspObject(1, false, false, false, false), new PCEPEndPointsObject<IPv4Address>(IPv4.FAMILY.addressForString("127.0.0.2"), IPv4.FAMILY.addressForString("127.0.0.1")), new PCEPExplicitRouteObject(subs, true));
final byte[] bytes = this.msgFactory.put(addTunnel);
- final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(ByteArray.cutBytes(bytes, 1));
+ final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(bytes);
assertEquals(PCEPMessageValidator.getValidator(rawMessage.getMsgType()).validate(rawMessage.getAllObjects()),
asList((PCEPMessage) addTunnel));
}
final byte[] bytes = this.msgFactory.put(msg);
// FIXME: need construct with invalid processed parameter
- final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(ByteArray.cutBytes(bytes, 1));
+ final PCEPRawMessage rawMessage = (PCEPRawMessage) this.msgFactory.parse(bytes);
assertEquals(PCEPMessageValidator.getValidator(rawMessage.getMsgType()).validate(rawMessage.getAllObjects()),
asList((PCEPMessage) msg));
<artifactId>groovy-all</artifactId>
<version>2.0.2</version>
</dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
</dependencies>
<build>
import java.net.InetSocketAddress;
import org.opendaylight.protocol.framework.DispatcherImpl;
+import org.opendaylight.protocol.framework.ProtocolServer;
import org.opendaylight.protocol.pcep.PCEPConnection;
import org.opendaylight.protocol.pcep.PCEPConnectionFactory;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
} else if (args[i].equalsIgnoreCase("--instant")) {
stateful = true;
instant = true;
- if (Integer.valueOf(args[i + 1]) > 0 && Integer.valueOf(args[i + 1]) < Integer.MAX_VALUE) {
+ if (i == args.length - 1) {
+ timeout = 0;
+ } else if (Integer.valueOf(args[i + 1]) > 0 && Integer.valueOf(args[i + 1]) < Integer.MAX_VALUE) {
timeout = Integer.valueOf(args[i + 1]);
i++;
}
final DispatcherImpl d = new DispatcherImpl(new PCEPMessageFactory());
final PCEPDispatcherImpl dispatcher = new PCEPDispatcherImpl(d, spf);
+ ProtocolServer s = null;
+
try {
- dispatcher.createServer(address, new PCEPConnectionFactory() {
+ s = dispatcher.createServer(address, new PCEPConnectionFactory() {
@Override
public PCEPConnection createProtocolConnection(final InetSocketAddress address) {
final PCEPSessionProposalChecker checker = spcf.getPreferencesChecker(address);
public void setProposal(final PCEPSessionProposalFactory proposals, final InetSocketAddress address, final int sessionId) {
}
});
- // final ProtocolServer s = dispatcher.createServer(address, slf, spf, spcf);
// try {
// Thread.sleep(10000);
import java.io.BufferedReader;
import java.io.File;
-import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.opendaylight.protocol.framework.TerminationReason;
import org.opendaylight.protocol.concepts.IPv4Address;
import org.opendaylight.protocol.concepts.IPv4Prefix;
import org.opendaylight.protocol.concepts.Prefix;
+import org.opendaylight.protocol.framework.TerminationReason;
import org.opendaylight.protocol.pcep.PCEPMessage;
import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.object.PCEPOpenObject;
import org.opendaylight.protocol.pcep.subobject.EROIPPrefixSubobject;
import org.opendaylight.protocol.pcep.subobject.ExplicitRouteSubobject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestingSessionListener extends PCEPSessionListener {
@Override
public void onMessage(final PCEPSession session, final PCEPMessage message) {
- logger.debug("Received message: " + message);
+ logger.debug("Received message: {}", message);
this.messages.add(message);
// if (!this.replyMessages.isEmpty()) {
@Override
public void onSessionDown(final PCEPSession session, final PCEPCloseObject reason, final Exception e) {
- logger.debug("Session down because: " + reason);
- try {
- session.close();
- } catch (final IOException ex) {
- logger.warn("Session could not be closed.", e);
- }
+ logger.debug("Session down because: {}", reason);
}
@Override
d.createClient(new PCEPConnection() {
@Override
public InetSocketAddress getPeerAddress() {
- return new InetSocketAddress("127.0.0.1", 4189);
+ return new InetSocketAddress("127.0.0.3", 12345);
}
@Override