--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.parser;
+
+import org.opendaylight.protocol.framework.ProtocolMessageFactory;
+
+/**
+ * Interface to expose BGP specific MessageFactory.
+ */
+public interface BGPMessageFactory extends ProtocolMessageFactory<BGPMessage> {
+
+}
import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
import org.opendaylight.protocol.bgp.parser.BGPError;
import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
import org.opendaylight.protocol.bgp.parser.impl.message.BGPNotificationMessageParser;
import org.opendaylight.protocol.bgp.parser.impl.message.BGPOpenMessageParser;
import org.opendaylight.protocol.bgp.parser.impl.message.BGPUpdateMessageParser;
import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
import org.opendaylight.protocol.framework.DeserializerException;
import org.opendaylight.protocol.framework.DocumentedException;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
import org.opendaylight.protocol.util.ByteArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The byte array
*/
-public class BGPMessageFactory implements ProtocolMessageFactory<BGPMessage> {
+public final class BGPMessageFactoryImpl implements BGPMessageFactory {
- private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactory.class);
+ private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactoryImpl.class);
final static int LENGTH_FIELD_LENGTH = 2; // bytes
public final static int COMMON_HEADER_LENGTH = LENGTH_FIELD_LENGTH + TYPE_FIELD_LENGTH + MARKER_LENGTH;
- public BGPMessageFactory() {
- }
-
/*
* (non-Javadoc)
* @see org.opendaylight.protocol.bgp.parser.BGPMessageParser#parse(byte[])
import org.opendaylight.protocol.bgp.parser.BGPParsingException;
import org.opendaylight.protocol.bgp.parser.BGPUpdateEvent;
import org.opendaylight.protocol.bgp.parser.BGPUpdateSynchronized;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
import org.opendaylight.protocol.bgp.parser.impl.BGPUpdateEventBuilder;
import org.opendaylight.protocol.bgp.parser.impl.IPv6MP;
import org.opendaylight.protocol.bgp.parser.impl.PathAttribute;
byteOffset += TOTAL_PATH_ATTR_LENGTH_SIZE;
eventBuilder.setTotalPathAttrLength(totalPathAttrLength);
- if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactory.COMMON_HEADER_LENGTH > msgLength)
+ if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH > msgLength)
throw new BGPDocumentedException("Message length inconsistent with withdrawn router length.", BGPError.MALFORMED_ATTR_LIST);
if (withdrawnRoutesLength == 0 && totalPathAttrLength == 0) {
@Test
public void testGetUpdateMessage1() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testGetUpdateMessage2() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testGetUpdateMessage3() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
final BGPUpdateMessage message = (BGPUpdateMessage) ret;
*/
@Test
public void testGetUpdateMessage4() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testGetUpdateMessage5() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testEORIpv4() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateSynchronized);
*/
@Test
public void testEORIpv6() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateSynchronized);
*/
@Test
public void testEORLS() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateSynchronized);
*/
@Test
public void testBGPLink() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testBGPNode() throws Exception {
- final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
assertTrue(ret instanceof BGPUpdateMessage);
*/
@Test
public void testOpenMessage() throws Exception {
- final BGPMessageFactory msgFactory = new BGPMessageFactory();
+ final BGPMessageFactoryImpl msgFactory = new BGPMessageFactoryImpl();
final BGPOpenMessage open = (BGPOpenMessage) msgFactory.parse(inputBytes.get(13)).get(0);
final Set<BGPTableType> types = Sets.newHashSet();
for (final BGPParameter param : open.getOptParams()) {
public void testNodeParsing() throws Exception {
final List<byte[]> result = HexDumpBGPFileParser.parseMessages(new File(this.getClass().getResource("/bgp-update-nodes.txt").getFile()));
assertEquals(1, result.size());
- final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH);
- final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactory.MARKER_LENGTH,
- BGPMessageFactory.LENGTH_FIELD_LENGTH));
+ final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+ final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactoryImpl.MARKER_LENGTH,
+ BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
final BGPUpdateEvent event = BGPUpdateMessageParser.parse(body, messageLength);
final BGPUpdateMessage updateMessage = (BGPUpdateMessage) event;
final Set<BGPObject> addedObjects = updateMessage.getAddedObjects();
@Test
public void testBGPHeaderParser() throws IOException {
- final BGPMessageFactory h = new BGPMessageFactory();
+ final BGPMessageFactoryImpl h = new BGPMessageFactoryImpl();
try {
h.parse(new byte[] { (byte) 0, (byte) 0 });
fail("Exception should have occured.");
@Test
public void testMessageParser() throws IOException {
- final BGPMessageFactory parser = new BGPMessageFactory();
+ final BGPMessageFactoryImpl parser = new BGPMessageFactoryImpl();
String ex = "";
try {
parser.put(null);
*/
package org.opendaylight.protocol.bgp.rib.impl;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
-import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
import org.opendaylight.protocol.framework.ReconnectStrategy;
-
-import com.google.common.base.Preconditions;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
/**
* Implementation of BGPDispatcher.
*/
-public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher {
+public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher {
private final Timer timer = new HashedWheelTimer();
- private final ProtocolMessageFactory<BGPMessage> parser;
- public BGPDispatcherImpl(final ProtocolMessageFactory<BGPMessage> parser) {
+ private BGPSessionNegotiatorFactory snf;
+
+ private final BGPHandlerFactory hf;
+
+ public BGPDispatcherImpl(final BGPMessageFactory parser) {
super();
- this.parser = Preconditions.checkNotNull(parser);
+ this.hf = new BGPHandlerFactory(parser);
}
@Override
public Future<? extends BGPSession> createClient(final InetSocketAddress address, final BGPSessionPreferences preferences,
final BGPSessionListener listener, final ReconnectStrategy strategy) {
- return createClient(address, listener, new BGPSessionNegotiatorFactory(timer, preferences), parser, strategy);
+ this.snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+ final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
+
+ @Override
+ public BGPSessionListener getSessionListener() {
+ return listener;
+ }
+ };
+ return super.createClient(address, strategy, slf);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise,
+ final SessionListenerFactory<BGPSessionListener> slf) {
+ ch.pipeline().addLast(this.hf.getDecoders());
+ ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(slf, ch, promise));
+ ch.pipeline().addLast(this.hf.getEncoders());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import io.netty.channel.ChannelHandler;
+
+import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
+import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
+
+/**
+ * BGP specific factory for protocol inbound/outbound handlers.
+ */
+public class BGPHandlerFactory extends ProtocolHandlerFactory<BGPMessage> {
+ private final ProtocolMessageEncoder<BGPMessage> encoder;
+
+ public BGPHandlerFactory(final BGPMessageFactory msgFactory) {
+ super(msgFactory);
+ this.encoder = new ProtocolMessageEncoder<BGPMessage>(this.msgFactory);
+ }
+
+ @Override
+ public ChannelHandler[] getEncoders() {
+ return new ChannelHandler[] { this.encoder };
+ }
+
+ @Override
+ public ChannelHandler[] getDecoders() {
+ return new ChannelHandler[] { new BGPMessageHeaderDecoder(), new ProtocolMessageDecoder<BGPMessage>(this.msgFactory) };
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * @see <a href="http://tools.ietf.org/html/rfc4271#section-4.1">BGP Message Header</a>
+ */
+public final class BGPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder {
+
+ private static final int MAX_FRAME_SIZE = 4096; // min 19, max 4096
+
+ private static final int MARKER_SIZE = 16;
+
+ private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including
+ // the header
+
+ /*
+
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | |
+ + +
+ | |
+ + +
+ | Marker |
+ + +
+ | |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Length | Type |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+ */
+
+ public BGPMessageHeaderDecoder() {
+ super(MAX_FRAME_SIZE, MARKER_SIZE, LENGTH_SIZE, -MARKER_SIZE - LENGTH_SIZE, 0);
+ }
+}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
-class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPSession {
+@VisibleForTesting
+public class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPSession {
private static final Logger logger = LoggerFactory.getLogger(BGPSessionImpl.class);
private final Set<BGPTableType> tableTypes;
- BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive, final BGPOpenMessage remoteOpen) {
+ BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive,
+ final BGPOpenMessage remoteOpen) {
this.listener = Preconditions.checkNotNull(listener);
this.stateTimer = Preconditions.checkNotNull(timer);
this.channel = Preconditions.checkNotNull(channel);
@Override
public synchronized void close() {
logger.debug("Closing session: {}", this);
- if (!closed) {
+ if (!this.closed) {
this.sendMessage(new BGPNotificationMessage(BGPError.CEASE));
- channel.close();
- closed = true;
+ this.channel.close();
+ this.closed = true;
}
}
@Override
public synchronized void endOfInput() {
- if (!closed) {
+ if (!this.closed) {
this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
}
}
private synchronized void closeWithoutMessage() {
logger.debug("Closing session: {}", this);
- channel.close();
- closed = true;
+ this.channel.close();
+ this.closed = true;
}
/**
final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(HOLD_TIMER_VALUE);
- if (!closed) {
+ if (!this.closed) {
if (ct >= nextHold) {
logger.debug("HoldTimer expired. " + new Date());
this.terminate(BGPError.HOLD_TIMER_EXPIRED);
long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
- if (!closed) {
+ if (!this.closed) {
if (ct >= nextKeepalive) {
this.sendMessage(new BGPKeepAliveMessage());
nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
}
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- toStringHelper.add("channel", channel);
- toStringHelper.add("closed", closed);
+ toStringHelper.add("channel", this.channel);
+ toStringHelper.add("closed", this.closed);
return toStringHelper;
}
@Override
protected void sessionUp() {
- listener.onSessionUp(this);
+ this.listener.onSessionUp(this);
}
}
import com.google.common.base.Preconditions;
-final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, BGPSessionImpl> {
+public final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, BGPSessionImpl> {
// 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
private static final int INITIAL_HOLDTIMER = 4;
*/
Idle,
/**
- * We have sent our Open message, and are waiting for the peer's Open
- * message.
+ * We have sent our Open message, and are waiting for the peer's Open message.
*/
OpenSent,
/**
- * We have received the peer's Open message, which is acceptable, and
- * we're waiting the acknowledgement of our Open message.
+ * We have received the peer's Open message, which is acceptable, and we're waiting the acknowledgement of our
+ * Open message.
*/
OpenConfirm,
/**
private State state = State.Idle;
private final short keepAlive = 15;
- BGPSessionNegotiator(final Timer timer, final Promise<BGPSessionImpl> promise, final Channel channel,
+ public BGPSessionNegotiator(final Timer timer, final Promise<BGPSessionImpl> promise, final Channel channel,
final BGPSessionPreferences initialPrefs, final BGPSessionListener listener) {
super(promise, channel);
this.listener = Preconditions.checkNotNull(listener);
@Override
protected void startNegotiation() {
- Preconditions.checkState(state == State.Idle);
- channel.writeAndFlush(new BGPOpenMessage(localPref.getMyAs(), (short) localPref.getHoldTime(), localPref.getBgpId(), localPref.getParams()));
- state = State.OpenSent;
+ Preconditions.checkState(this.state == State.Idle);
+ this.channel.writeAndFlush(new BGPOpenMessage(this.localPref.getMyAs(), (short) this.localPref.getHoldTime(), this.localPref.getBgpId(), this.localPref.getParams()));
+ this.state = State.OpenSent;
final Object lock = this;
- timer.newTimeout(new TimerTask() {
+ this.timer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
synchronized (lock) {
- if (state != State.Finished) {
+ if (BGPSessionNegotiator.this.state != State.Finished) {
negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
- state = State.Finished;
+ BGPSessionNegotiator.this.state = State.Finished;
}
}
}
@Override
protected synchronized void handleMessage(final BGPMessage msg) {
- logger.debug("Channel {} handling message in state {}", channel, state);
+ logger.debug("Channel {} handling message in state {}", this.channel, this.state);
- switch (state) {
+ switch (this.state) {
case Finished:
case Idle:
- throw new IllegalStateException("Unexpected state " + state);
+ throw new IllegalStateException("Unexpected state " + this.state);
case OpenConfirm:
if (msg instanceof BGPKeepAliveMessage) {
final BGPKeepAliveMessage ka = (BGPKeepAliveMessage) msg;
// FIXME: we miss some stuff over here
- negotiationSuccessful(new BGPSessionImpl(timer, listener, channel, keepAlive, remotePref));
- state = State.Finished;
+ negotiationSuccessful(new BGPSessionImpl(this.timer, this.listener, this.channel, this.keepAlive, this.remotePref));
+ this.state = State.Finished;
return;
} else if (msg instanceof BGPNotificationMessage) {
final BGPNotificationMessage ntf = (BGPNotificationMessage) msg;
negotiationFailed(new BGPDocumentedException("Peer refusal", ntf.getError()));
- state = State.Finished;
+ this.state = State.Finished;
return;
}
// TODO: validate the open message
- remotePref = open;
- channel.writeAndFlush(new BGPKeepAliveMessage());
- state = State.OpenConfirm;
- logger.debug("Channel {} moved to OpenConfirm state with remote proposal {}", channel, remotePref);
+ this.remotePref = open;
+ this.channel.writeAndFlush(new BGPKeepAliveMessage());
+ this.state = State.OpenConfirm;
+ logger.debug("Channel {} moved to OpenConfirm state with remote proposal {}", this.channel, this.remotePref);
return;
}
break;
}
// Catch-all for unexpected message
- logger.warn("Channel {} state {} unexpected message {}", channel, state, msg);
- channel.writeAndFlush(new BGPNotificationMessage(BGPError.FSM_ERROR));
+ logger.warn("Channel {} state {} unexpected message {}", this.channel, this.state, msg);
+ this.channel.writeAndFlush(new BGPNotificationMessage(BGPError.FSM_ERROR));
negotiationFailed(new BGPDocumentedException("Unexpected message", BGPError.FSM_ERROR));
- state = State.Finished;
+ this.state = State.Finished;
}
}
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
import org.opendaylight.protocol.bgp.parser.BGPParameter;
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
import org.opendaylight.protocol.bgp.rib.impl.BGPImpl.BGPListenerRegistration;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
doReturn("").when(this.parser).toString();
doReturn(null).when(this.future).get();
- doReturn(future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
+ doReturn(this.future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
any(BGPSessionListener.class), any(ReconnectStrategy.class));
}
public void testBgpImpl() throws Exception {
doReturn(new BGPSessionPreferences(null, 0, null, Collections.<BGPParameter> emptyList())).when(this.prop).getProposal();
this.bgp = new BGPImpl(this.disp, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop);
- final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+ final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
assertEquals(SimpleSessionListener.class, reg.getListener().getClass());
}
import org.opendaylight.protocol.bgp.parser.BGPError;
import org.opendaylight.protocol.bgp.parser.BGPMessage;
import org.opendaylight.protocol.bgp.parser.BGPParameter;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
import org.opendaylight.protocol.bgp.parser.message.BGPKeepAliveMessage;
import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
(byte) 0xff, (byte) 0xff, (byte) 0x00, (byte) 0x17, (byte) 0x03, (byte) 0x02, (byte) 0x04, (byte) 0x04, (byte) 0x09 };
- final ProtocolMessageFactory<BGPMessage> factory = new BGPMessageFactory();
+ final ProtocolMessageFactory<BGPMessage> factory = new BGPMessageFactoryImpl();
@Test
public void testHeaderErrors() throws DeserializerException, DocumentedException {
fail("Exception should have occcured.");
} catch (final IllegalArgumentException e) {
assertEquals("Too few bytes in passed array. Passed: " + wrong.length + ". Expected: >= "
- + BGPMessageFactory.COMMON_HEADER_LENGTH + ".", e.getMessage());
+ + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH + ".", e.getMessage());
return;
}
fail();
import org.opendaylight.protocol.bgp.parser.BGPError;
import org.opendaylight.protocol.bgp.parser.BGPMessage;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
import org.opendaylight.protocol.bgp.rib.impl.BGP;
import org.opendaylight.protocol.concepts.ListenerRegistration;
private List<BGPMessage> parsePrevious(final List<byte[]> msgs) {
final List<BGPMessage> messages = Lists.newArrayList();
- final ProtocolMessageFactory<BGPMessage> parser = new BGPMessageFactory();
+ final ProtocolMessageFactory<BGPMessage> parser = new BGPMessageFactoryImpl();
try {
for (final byte[] b : msgs) {
import java.net.InetSocketAddress;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
BGPDispatcherImpl dispatcher;
public Main() throws IOException {
- this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactory());
+ this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactoryImpl());
}
public static void main(final String[] args) throws NumberFormatException, IOException {
*/
package org.opendaylight.protocol.bgp.testtool;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
+import org.opendaylight.protocol.bgp.parser.BGPMessage;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
+import org.opendaylight.protocol.bgp.rib.impl.BGPHandlerFactory;
+import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
import org.opendaylight.protocol.bgp.rib.impl.BGPSessionNegotiatorFactory;
import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl;
import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
import org.opendaylight.protocol.concepts.ASNumber;
import org.opendaylight.protocol.concepts.IPv4;
import org.opendaylight.protocol.framework.AbstractDispatcher;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessage;
+import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.SessionListener;
import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
-public class BGPSpeakerMock extends AbstractDispatcher {
+import com.google.common.base.Preconditions;
- public static void main(final String[] args) throws IOException {
+public class BGPSpeakerMock<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+ AbstractDispatcher<S, L> {
- final BGPSpeakerMock m = new BGPSpeakerMock();
+ private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+ private final ProtocolHandlerFactory<?> factory;
- final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal();
+ public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+ final DefaultPromise<BGPSessionImpl> defaultPromise) {
+ this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+ this.factory = Preconditions.checkNotNull(factory);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
+ ch.pipeline().addLast(this.factory.getDecoders());
+ ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(this.factory.getEncoders());
+ }
+
+ public static void main(final String[] args) throws IOException {
final SessionListenerFactory<BGPSessionListener> f = new SessionListenerFactory<BGPSessionListener>() {
@Override
public BGPSessionListener getSessionListener() {
- return new SpeakerSessionListener(m);
+ return new SpeakerSessionListener();
}
};
- m.createServer(new InetSocketAddress("127.0.0.2", 12345), f,
- new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs), new BGPMessageFactory());
+ final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal();
+
+ final SessionNegotiatorFactory<BGPMessage, BGPSessionImpl, BGPSessionListener> snf = new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs);
+
+ final BGPSpeakerMock<BGPMessage, BGPSessionImpl, BGPSessionListener> mock = new BGPSpeakerMock<BGPMessage, BGPSessionImpl, BGPSessionListener>(snf, new BGPHandlerFactory(new BGPMessageFactoryImpl()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+
+ mock.createServer(new InetSocketAddress("127.0.0.2", 12345), f);
}
}
import org.opendaylight.protocol.bgp.parser.BGPSession;
import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
import org.opendaylight.protocol.bgp.parser.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SpeakerSessionListener implements BGPSessionListener {
private static final Logger logger = LoggerFactory.getLogger(SpeakerSessionListener.class);
- AbstractDispatcher d;
-
- SpeakerSessionListener(final AbstractDispatcher d) {
- this.d = d;
- }
-
@Override
public void onSessionUp(final BGPSession session) {
logger.info("Server: Session is up.");
*/
package org.opendaylight.protocol.framework;
+import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
/**
* Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
* start method that will handle sockets in different thread.
*/
-public abstract class AbstractDispatcher implements Closeable {
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
this.workerGroup = new NioEventLoopGroup();
}
+ /**
+ * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+ * method needs to be implemented in protocol specific Dispatchers.
+ *
+ * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+ * @param promise to be passed to {@link SessionNegotiatorFactory}
+ */
+ public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
+
/**
* Creates server. Each server needs factories to pass their instances to client sessions.
*
* @param address address to which the server should be bound
- * @param listenerFactory factory for creating protocol listeners, passed to the negotiator
- * @param negotiatorFactory protocol session negotiator factory
- * @param messageFactory message parser
*
* @return ChannelFuture representing the binding process
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
- final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
- final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
+ @VisibleForTesting
+ public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
final ServerBootstrap b = new ServerBootstrap();
b.group(this.bossGroup, this.workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_BACKLOG, 128);
- b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
- listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
+ b.childHandler(new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(final SocketChannel ch) throws Exception {
+ initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+ }
+ });
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
* Creates a client.
*
* @param address remote address
- * @param listener session listener
- * @param negotiatorFactory session negotiator factory
- * @param messageFactory message parser
* @param connectStrategy Reconnection strategy to be used when initial connection fails
*
- * @return Future representing the connection process. Its result represents
- * the combined success of TCP connection as well as session negotiation.
+ * @return Future representing the connection process. Its result represents the combined success of TCP connection
+ * as well as session negotiation.
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
- final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
- final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
- new SessionListenerFactory<L>() {
- private boolean created = false;
-
- @Override
- public synchronized L getSessionListener() {
- Preconditions.checkState(created == false);
- created = true;
- return listener;
- }
-
- }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
-
+ @VisibleForTesting
+ public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
+ final SessionListenerFactory<L> lfactory) {
+ final Bootstrap b = new Bootstrap();
+ final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+ b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+ new ChannelInitializer<SocketChannel>() {
+
+ @Override
+ protected void initChannel(final SocketChannel ch) throws Exception {
+ initializeChannel(ch, p, lfactory);
+ }
+ });
p.connect();
logger.debug("Client created.");
return p;
* Creates a client.
*
* @param address remote address
- * @param listener session listener
- * @param negotiatorFactory session negotiator factory
- * @param messageFactory message parser
* @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
* @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
*
- * @return Future representing the reconnection task. It will report
- * completion based on reestablishStrategy, e.g. success if
- * it indicates no further attempts should be made and failure
- * if it reports an error
+ * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+ * success if it indicates no further attempts should be made and failure if it reports an error
*/
- protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
- final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy) {
-
- final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
- messageFactory, connectStrategyFactory, reestablishStrategy);
+ protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+ final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
+ final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
p.connect();
return p;
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.framework;
-
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Promise;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-final class ChannelInitializerImpl<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends ChannelInitializer<SocketChannel> {
- private static final Logger logger = LoggerFactory.getLogger(ChannelInitializerImpl.class);
- private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
- private final SessionListenerFactory<L> listenerFactory;
- private final ProtocolHandlerFactory<?> factory;
- private final Promise<S> promise;
-
- ChannelInitializerImpl(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final SessionListenerFactory<L> listenerFactory,
- final ProtocolHandlerFactory<?> factory, final Promise<S> promise) {
- this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
- this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
- this.promise = Preconditions.checkNotNull(promise);
- this.factory = Preconditions.checkNotNull(factory);
- }
-
- @Override
- protected void initChannel(final SocketChannel ch) {
- logger.debug("Initializing channel {}", ch);
- ch.pipeline().addLast("decoder", factory.getDecoder());
- ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
- ch.pipeline().addLast("encoder", factory.getEncoder());
- logger.debug("Channel {} initialized", ch);
- }
-}
\ No newline at end of file
public class ProtocolHandlerFactory<T extends ProtocolMessage> {
private final ProtocolMessageEncoder<T> encoder;
- final ProtocolMessageFactory<T> msgFactory;
+ protected final ProtocolMessageFactory<T> msgFactory;
public ProtocolHandlerFactory(final ProtocolMessageFactory<T> msgFactory) {
this.msgFactory = Preconditions.checkNotNull(msgFactory);
this.encoder = new ProtocolMessageEncoder<T>(msgFactory);
}
- public ChannelHandler getEncoder() {
- return this.encoder;
+ public ChannelHandler[] getEncoders() {
+ return new ChannelHandler[] { this.encoder };
}
- public ChannelHandler getDecoder() {
- return new ProtocolMessageDecoder<T>(msgFactory);
+ public ChannelHandler[] getDecoders() {
+ return new ChannelHandler[] { new ProtocolMessageDecoder<T>(this.msgFactory) };
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class ProtocolMessageDecoder<T extends ProtocolMessage> extends ByteToMessageDecoder {
+public final class ProtocolMessageDecoder<T extends ProtocolMessage> extends ByteToMessageDecoder {
private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageDecoder.class);
import org.slf4j.LoggerFactory;
@Sharable
-final class ProtocolMessageEncoder<T extends ProtocolMessage> extends MessageToByteEncoder<Object> {
+public final class ProtocolMessageEncoder<T extends ProtocolMessage> extends MessageToByteEncoder<Object> {
private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageEncoder.class);
@Override
protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
logger.debug("Sent to encode : {}", msg);
- out.writeBytes(this.factory.put((T)msg));
+ final byte[] bytes = this.factory.put((T) msg);
+ out.writeBytes(bytes);
}
}
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import com.google.common.base.Preconditions;
@ThreadSafe
-final class ProtocolSessionPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<S> {
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class);
- private final ChannelInitializerImpl<M, S, L> init;
private final ReconnectStrategy strategy;
private final InetSocketAddress address;
private final Bootstrap b;
@GuardedBy("this")
private Future<?> pending;
- ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final SessionListenerFactory<L> listenerFactory,
- final ProtocolHandlerFactory<?> protocolFactory, final ReconnectStrategy strategy) {
+ ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
this.strategy = Preconditions.checkNotNull(strategy);
this.address = Preconditions.checkNotNull(address);
-
- init = new ChannelInitializerImpl<M, S, L>(negotiatorFactory, listenerFactory, protocolFactory, this);
- b = new Bootstrap();
- b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
+ this.b = Preconditions.checkNotNull(b);
}
synchronized void connect() {
final Object lock = this;
try {
- final int timeout = strategy.getConnectTimeout();
+ final int timeout = this.strategy.getConnectTimeout();
logger.debug("Promise {} attempting connect for {}ms", lock, timeout);
- b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
- pending = b.connect(address).addListener(new ChannelFutureListener() {
+ this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+ this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture cf) throws Exception {
synchronized (lock) {
logger.debug("Promise {} connection resolved", lock);
// Triggered when a connection attempt is resolved.
- Preconditions.checkState(pending == cf);
+ Preconditions.checkState(ProtocolSessionPromise.this.pending == cf);
/*
* The promise we gave out could have been cancelled,
}
if (!cf.isSuccess()) {
- final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
+ final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
rf.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(final Future<Void> sf) {
synchronized (lock) {
// Triggered when a connection attempt is to be made.
- Preconditions.checkState(pending == sf);
+ Preconditions.checkState(ProtocolSessionPromise.this.pending == sf);
/*
* The promise we gave out could have been cancelled,
}
});
- pending = rf;
+ ProtocolSessionPromise.this.pending = rf;
} else {
logger.debug("Promise {} connection successful", lock);
}
}
}
});
- } catch (Exception e) {
+ } catch (final Exception e) {
setFailure(e);
}
}
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
- pending.cancel(mayInterruptIfRunning);
+ this.pending.cancel(mayInterruptIfRunning);
return true;
}
@Override
public synchronized Promise<S> setSuccess(final S result) {
logger.debug("Promise {} completed", this);
- strategy.reconnectSuccessful();
+ this.strategy.reconnectSuccessful();
return super.setSuccess(result);
}
-}
\ No newline at end of file
+}
import com.google.common.base.Preconditions;
-final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<Void> {
- private final AbstractDispatcher dispatcher;
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+ private final AbstractDispatcher<S, L> dispatcher;
private final InetSocketAddress address;
- private final L listener;
- private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
- private final ProtocolMessageFactory<M> messageFactory;
private final ReconnectStrategyFactory strategyFactory;
private final ReconnectStrategy strategy;
private Future<?> pending;
+ private final SessionListenerFactory<L> lfactory;
- public ReconnectPromise(final AbstractDispatcher dispatcher,
- final InetSocketAddress address, final L listener,
- final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
- final ProtocolMessageFactory<M> messageFactory,
- final ReconnectStrategyFactory connectStrategyFactory,
- final ReconnectStrategy reestablishStrategy) {
+ public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
+ final SessionListenerFactory<L> lfactory) {
this.dispatcher = Preconditions.checkNotNull(dispatcher);
this.address = Preconditions.checkNotNull(address);
- this.listener = Preconditions.checkNotNull(listener);
- this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
- this.messageFactory = Preconditions.checkNotNull(messageFactory);
this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
this.strategy = Preconditions.checkNotNull(reestablishStrategy);
+ this.lfactory = Preconditions.checkNotNull(lfactory);
}
synchronized void connect() {
- final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
+ final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
final ReconnectStrategy rs = new ReconnectStrategy() {
@Override
public Future<Void> scheduleReconnect(final Throwable cause) {
@Override
public int getConnectTimeout() throws Exception {
final int cst = cs.getConnectTimeout();
- final int rst = strategy.getConnectTimeout();
+ final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
if (cst == 0) {
return rst;
}
};
- final Future<S> cf = dispatcher.createClient(address,
- listener, negotiatorFactory, messageFactory, rs);
+ final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
final Object lock = this;
- pending = cf;
+ this.pending = cf;
cf.addListener(new FutureListener<S>() {
@Override
public void operationComplete(final Future<S> future) {
synchronized (lock) {
if (!future.isSuccess()) {
- final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
- pending = rf;
+ final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+ ReconnectPromise.this.pending = rf;
rf.addListener(new FutureListener<Void>() {
@Override
* FIXME: we have a slight race window with cancellation
* here. Analyze and define its semantics.
*/
- strategy.reconnectSuccessful();
+ ReconnectPromise.this.strategy.reconnectSuccessful();
setSuccess(null);
}
}
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
- pending.cancel(mayInterruptIfRunning);
+ this.pending.cancel(mayInterruptIfRunning);
return true;
}
package org.opendaylight.protocol.framework;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertFalse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Test;
public class ServerTest {
public static final int PORT = 18080;
- AbstractDispatcher clientDispatcher, dispatcher;
+ AbstractDispatcher<?, SimpleSessionListener> clientDispatcher, dispatcher;
final SimpleSessionListener pce = new SimpleSessionListener();
@Test
public void testConnectionEstablished() throws Exception {
- this.dispatcher = new AbstractDispatcher() { };
-
final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+ this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+ @Override
+ public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+ final Channel channel, final Promise<SimpleSession> promise) {
+ p.setSuccess(true);
+ return new SimpleSessionNegotiator(promise, channel);
+ }
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
@Override
public SimpleSessionListener getSessionListener() {
return new SimpleSessionListener();
}
- }, new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ });
+ this.server.get();
+
+ this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
@Override
public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
final Channel channel, final Promise<SimpleSession> promise) {
- p.setSuccess(true);
return new SimpleSessionNegotiator(promise, channel);
}
- }, new MessageFactory());
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
- this.server.get();
-
- this.clientDispatcher = new AbstractDispatcher() { };
-
- this.session = this.clientDispatcher.createClient(this.serverAddress, new SimpleSessionListener(),
- new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
@Override
- public SessionNegotiator<SimpleSession> getSessionNegotiator(
- final SessionListenerFactory<SimpleSessionListener> factory, final Channel channel,
- final Promise<SimpleSession> promise) {
- return new SimpleSessionNegotiator(promise, channel);
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
}
- }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get();
+ }).get(6, TimeUnit.SECONDS);
assertEquals(true, p.get(3, TimeUnit.SECONDS));
}
- public void testConnectionFailed() throws IOException, InterruptedException {
- this.dispatcher = new AbstractDispatcher() { };
- this.clientDispatcher = new AbstractDispatcher() { };
- final SimpleSessionListener listener = new SimpleSessionListener();
+ @Test
+ public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
- try {
- this.clientDispatcher.createClient(this.serverAddress, listener,
- new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
- @Override
- public SessionNegotiator<SimpleSession> getSessionNegotiator(
- final SessionListenerFactory<SimpleSessionListener> factory, final Channel channel,
- final Promise<SimpleSession> promise) {
- return null;
- }
- }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get();
-
- fail("Connection succeeded unexpectedly");
- } catch (final ExecutionException e) {
- assertTrue(listener.failed);
- assertTrue(e.getCause() instanceof ConnectException);
- }
+ this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+ @Override
+ public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+ final Channel channel, final Promise<SimpleSession> promise) {
+ p.setSuccess(true);
+ return new SimpleSessionNegotiator(promise, channel);
+ }
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
+ this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ });
+
+ this.server.get();
+
+ this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+ @Override
+ public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+ final Channel channel, final Promise<SimpleSession> promise) {
+ return new SimpleSessionNegotiator(promise, channel);
+ }
+ }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
+ this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get(6, TimeUnit.SECONDS);
+
+ final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ });
+ assertFalse(session.isSuccess());
}
@After
--- /dev/null
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Promise;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleDispatcher<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+ AbstractDispatcher<S, L> {
+
+ private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class);
+
+ private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+ private final ProtocolHandlerFactory<?> factory;
+
+ public SimpleDispatcher(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+ final Promise<S> promise) {
+ this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+ this.factory = Preconditions.checkNotNull(factory);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> lfactory) {
+ ch.pipeline().addLast(this.factory.getDecoders());
+ ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(lfactory, ch, promise));
+ ch.pipeline().addLast(this.factory.getEncoders());
+ logger.debug("initialization completed for channel {}", ch);
+ }
+}
this.messages.add(message);
}
- public synchronized void onConnectionFailed(final ProtocolSession<?> session, final Exception e) {
- logger.debug("Connection Failed: {}", e.getMessage(), e);
- this.failed = true;
- this.notifyAll();
- session.close();
- }
-
@Override
public void onSessionUp(final SimpleSession session) {
this.up = true;
@Override
public void onSessionDown(final SimpleSession session, final Exception e) {
- this.up = false;
+ this.failed = true;
+ this.notifyAll();
}
@Override
- public void onSessionTerminated(final SimpleSession session,
- final TerminationReason reason) {
- this.up = false;
+ public void onSessionTerminated(final SimpleSession session, final TerminationReason reason) {
+ this.failed = true;
+ this.notifyAll();
}
}
package org.opendaylight.protocol.pcep;
import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.SessionListenerFactory;
/**
public interface PCEPDispatcher {
/**
* Creates server. Each server needs three factories to pass their instances to client sessions.
+ *
* @param address to be bound with the server
* @param listenerFactory to create listeners for clients
- * @param proposalFactory to create proposed open objects for clients
- * @param checkerFactory to create session characteristics checker for clients
* @return instance of PCEPServer
* @throws IOException if some IO error occurred
*/
public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory);
-
- /**
- * Creates a client. Needs to be started via the start method.
- * @param connection PCEP connection settings
- * @param strategy Reconnection strategy to be used for TCP-level connection
- * @throws IOException if some IO error occurred
- */
- public Future<? extends PCEPSession> createClient(InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy);
}
-
import org.opendaylight.protocol.framework.ProtocolMessage;
/**
- * Basic structure for PCEP Message. Cannot be instantiated directly. Current
- * PCEP version is 1. Each message contains a list of PCEP objects.
- *
+ * Basic structure for PCEP Message. Cannot be instantiated directly. Current PCEP version is 1. Each message contains a
+ * list of PCEP objects.
+ *
*/
public abstract class PCEPMessage implements ProtocolMessage {
private final List<PCEPObject> objects;
/**
- * Constructor is protected to prevent direct instantiation, but to allow to
- * call this constructor via super().
- *
+ * Constructor is protected to prevent direct instantiation, but to allow to call this constructor via super().
+ *
* @param objects
*/
- protected PCEPMessage(List<PCEPObject> objects) {
+ protected PCEPMessage(final List<PCEPObject> objects) {
+ if (objects.contains(null))
+ throw new IllegalArgumentException("Object list contains null element at offset " + objects.indexOf(null));
+
this.objects = objects;
}
/**
* Returns list of all objects that the message contains
- *
+ *
* @return list of all objects that the message contains
*/
public List<PCEPObject> getAllObjects() {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
import org.opendaylight.protocol.pcep.PCEPObject;
+import com.google.common.base.Preconditions;
+
/**
* Structure that combines set of related objects.
- *
+ *
* @see <a href="http://www.ietf.org/id/draft-crabbe-pce-pce-initiated-lsp-00.txt">PCCreate Message</a>
*/
public class CompositeInstantiationObject {
/**
* Constructs basic composite object only with mandatory objects.
- *
- * @param endPoints
- * PCEPEndPointsObject<?>. Can't be null.
- * @param lspa
- * PCEPLspaObject. Can't be null.
+ *
+ * @param endPoints PCEPEndPointsObject<?>. Can't be null.
+ * @param lspa PCEPLspaObject. Can't be null.
*/
- public CompositeInstantiationObject(PCEPEndPointsObject<?> endPoints, PCEPLspaObject lspa) {
+ public CompositeInstantiationObject(final PCEPEndPointsObject<?> endPoints, final PCEPLspaObject lspa) {
this(endPoints, lspa, null, null, null);
}
/**
* Constructs composite object with optional objects.
- *
- * @param endPoints
- * PCEPEndPointsObject<?>. Can't be null.
- * @param lspa
- * PCEPLspaObject. Can't be null.
- * @param ero
- * PCEPExplicitRouteObject
- * @param bandwidth
- * PCEPRequestedPathBandwidthObject
- * @param metrics
- * List<PCEPMetricObject>
+ *
+ * @param endPoints PCEPEndPointsObject<?>. Can't be null.
+ * @param lspa PCEPLspaObject. Can't be null.
+ * @param ero PCEPExplicitRouteObject
+ * @param bandwidth PCEPRequestedPathBandwidthObject
+ * @param metrics List<PCEPMetricObject>
*/
- public CompositeInstantiationObject(PCEPEndPointsObject<?> endPoints, PCEPLspaObject lspa, PCEPExplicitRouteObject ero, PCEPRequestedPathBandwidthObject bandwidth, List<PCEPMetricObject> metrics) {
- this.endPoints = endPoints;
- this.lspa = lspa;
+ public CompositeInstantiationObject(final PCEPEndPointsObject<?> endPoints, final PCEPLspaObject lspa,
+ final PCEPExplicitRouteObject ero, final PCEPRequestedPathBandwidthObject bandwidth, final List<PCEPMetricObject> metrics) {
+ this.endPoints = Preconditions.checkNotNull(endPoints);
+ this.lspa = Preconditions.checkNotNull(lspa);
this.ero = ero;
this.bandwidth = bandwidth;
this.metrics = metrics;
/**
* Gets list of all objects, which are in appropriate order.
- *
+ *
* @return List<PCEPObject>. Can't be null or empty.
*/
public List<PCEPObject> getCompositeAsList() {
/**
* Creates this object from a list of PCEPObjects.
- *
- * @param objects
- * List<PCEPObject> list of PCEPObjects from whose this object
- * should be created.
+ *
+ * @param objects List<PCEPObject> list of PCEPObjects from whose this object should be created.
* @return CompositeInstantiationObject
*/
- public static CompositeInstantiationObject getCompositeFromList(List<PCEPObject> objects) {
+ public static CompositeInstantiationObject getCompositeFromList(final List<PCEPObject> objects) {
if (objects == null || objects.isEmpty()) {
throw new IllegalArgumentException("List cannot be null or empty.");
}
while (!objects.isEmpty()) {
final PCEPObject obj = objects.get(0);
switch (state) {
- case 1:
- state = 2;
- if (obj instanceof PCEPExplicitRouteObject) {
- ero = (PCEPExplicitRouteObject) obj;
- break;
- }
- case 2:
+ case 1:
+ state = 2;
+ if (obj instanceof PCEPExplicitRouteObject) {
+ ero = (PCEPExplicitRouteObject) obj;
+ break;
+ }
+ case 2:
+ state = 3;
+ if (obj instanceof PCEPRequestedPathBandwidthObject) {
+ bandwidth = (PCEPRequestedPathBandwidthObject) obj;
+ break;
+ }
+ case 3:
+ state = 4;
+ if (obj instanceof PCEPMetricObject) {
+ metrics.add((PCEPMetricObject) obj);
state = 3;
- if (obj instanceof PCEPRequestedPathBandwidthObject) {
- bandwidth = (PCEPRequestedPathBandwidthObject) obj;
- break;
- }
- case 3:
- state = 4;
- if (obj instanceof PCEPMetricObject) {
- metrics.add((PCEPMetricObject) obj);
- state = 3;
- break;
- }
+ break;
+ }
}
if (state == 4) {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result
- + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode());
- result = prime * result
- + ((this.endPoints == null) ? 0 : this.endPoints.hashCode());
+ result = prime * result + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode());
+ result = prime * result + ((this.endPoints == null) ? 0 : this.endPoints.hashCode());
result = prime * result + ((this.ero == null) ? 0 : this.ero.hashCode());
result = prime * result + ((this.lspa == null) ? 0 : this.lspa.hashCode());
result = prime * result + ((this.metrics == null) ? 0 : this.metrics.hashCode());
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj)
return true;
if (obj == null)
package org.opendaylight.protocol.pcep.impl;
import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.opendaylight.protocol.pcep.PCEPDispatcher;
import org.opendaylight.protocol.pcep.PCEPMessage;
-import org.opendaylight.protocol.pcep.PCEPSession;
import org.opendaylight.protocol.pcep.PCEPSessionListener;
+import com.google.common.base.Preconditions;
+
/**
* Implementation of PCEPDispatcher.
*/
-public class PCEPDispatcherImpl extends AbstractDispatcher implements PCEPDispatcher {
- private static final PCEPMessageFactory msgFactory = new PCEPMessageFactory();
+public class PCEPDispatcherImpl extends AbstractDispatcher<PCEPSessionImpl, PCEPSessionListener> implements PCEPDispatcher {
+
private final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf;
+ private final PCEPHandlerFactory hf = new PCEPHandlerFactory();
+
/**
* Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
*
* @throws IOException if some error occurred during opening the selector
*/
- public PCEPDispatcherImpl(final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf) {
+ public PCEPDispatcherImpl(final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> negotiatorFactory) {
super();
- this.snf = snf;
+ this.snf = Preconditions.checkNotNull(negotiatorFactory);
}
@Override
public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
- return this.createServer(address, listenerFactory, snf, msgFactory);
+ return super.createServer(address, listenerFactory);
}
- /**
- * Create client is used for mock purposes only.
- *
- * @throws ExecutionException
- * @throws InterruptedException
- */
@Override
- public Future<? extends PCEPSession> createClient(final InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy) {
- return this.createClient(address, listener, snf, msgFactory, strategy);
+ public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise,
+ final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
+ ch.pipeline().addLast(this.hf.getDecoders());
+ ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(this.hf.getEncoders());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import io.netty.channel.ChannelHandler;
+
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
+import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
+import org.opendaylight.protocol.pcep.PCEPMessage;
+
+/**
+ * PCEP specific factory for protocol inbound/outbound handlers.
+ */
+public class PCEPHandlerFactory extends ProtocolHandlerFactory<PCEPMessage> {
+ private final ProtocolMessageEncoder<PCEPMessage> encoder;
+
+ public PCEPHandlerFactory() {
+ super(new PCEPMessageFactory());
+ this.encoder = new ProtocolMessageEncoder<PCEPMessage>(this.msgFactory);
+ }
+
+ @Override
+ public ChannelHandler[] getEncoders() {
+ return new ChannelHandler[] { this.encoder };
+ }
+
+ @Override
+ public ChannelHandler[] getDecoders() {
+ return new ChannelHandler[] { new PCEPMessageHeaderDecoder(), new ProtocolMessageDecoder<PCEPMessage>(this.msgFactory) };
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * @see <a href="http://tools.ietf.org/html/rfc5440#section-6.1">Common Message Header</a>
+ */
+public class PCEPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder {
+
+ private static final int MAX_FRAME_SIZE = 65528; // min 4, max 4096
+
+ private static final int VERSION_FLAGS_SIZE = 1;
+
+ private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including
+ // the header
+
+ private static final int MESSAGE_TYPE_SIZE = 1;
+
+ /*
+
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Ver | Flags | Message-Type | Message-Length |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+ */
+
+ public PCEPMessageHeaderDecoder() {
+ super(MAX_FRAME_SIZE, VERSION_FLAGS_SIZE + MESSAGE_TYPE_SIZE, LENGTH_SIZE, -LENGTH_SIZE - MESSAGE_TYPE_SIZE - VERSION_FLAGS_SIZE, 0);
+ }
+}
/**
* Implementation of PCEPSession. (Not final for testing.)
*/
-class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PCEPSession, PCEPSessionRuntimeMXBean {
+@VisibleForTesting
+public class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PCEPSession, PCEPSessionRuntimeMXBean {
/**
* System.nanoTime value about when was sent the last message Protected to be updated also in tests.
*/
private final Channel channel;
- PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages,
- final Channel channel, final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) {
+ PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
+ final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) {
this.listener = Preconditions.checkNotNull(listener);
this.stateTimer = Preconditions.checkNotNull(timer);
this.channel = Preconditions.checkNotNull(channel);
}
if (getDeadTimerValue() != 0) {
- stateTimer.newTimeout(new TimerTask() {
+ this.stateTimer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
handleDeadTimer();
}
if (getKeepAliveTimerValue() != 0) {
- stateTimer.newTimeout(new TimerTask() {
+ this.stateTimer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
handleKeepaliveTimer();
logger.debug("DeadTimer expired. " + new Date());
this.terminate(Reason.EXP_DEADTIMER);
} else {
- stateTimer.newTimeout(new TimerTask() {
+ this.stateTimer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
handleDeadTimer();
long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
- if (channel.isActive()) {
+ if (this.channel.isActive()) {
if (ct >= nextKeepalive) {
this.sendMessage(new PCEPKeepAliveMessage());
nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@Override
public Integer getDeadTimerValue() {
- return remoteOpen.getDeadTimerValue();
+ return this.remoteOpen.getDeadTimerValue();
}
@Override
public Integer getKeepAliveTimerValue() {
- return localOpen.getKeepAliveTimerValue();
+ return this.localOpen.getKeepAliveTimerValue();
}
@Override
public String getPeerAddress() {
- InetSocketAddress a = (InetSocketAddress) channel.remoteAddress();
+ final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress();
return a.getHostName();
}
@Override
public String getNodeIdentifier() {
- for (PCEPTlv tlv : this.remoteOpen.getTlvs()) {
+ for (final PCEPTlv tlv : this.remoteOpen.getTlvs()) {
if (tlv instanceof NodeIdentifierTlv) {
return tlv.toString();
}
}
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- toStringHelper.add("localOpen", localOpen);
- toStringHelper.add("remoteOpen", remoteOpen);
+ toStringHelper.add("localOpen", this.localOpen);
+ toStringHelper.add("remoteOpen", this.remoteOpen);
return toStringHelper;
}
@Override
protected void sessionUp() {
- listener.onSessionUp(this);
+ this.listener.onSessionUp(this);
}
}
import org.opendaylight.protocol.pcep.object.CompositeInstantiationObject;
import org.opendaylight.protocol.pcep.object.PCEPEndPointsObject;
import org.opendaylight.protocol.pcep.object.PCEPExplicitRouteObject;
+import org.opendaylight.protocol.pcep.object.PCEPLspaObject;
import org.opendaylight.protocol.pcep.subobject.EROIPPrefixSubobject;
import org.opendaylight.protocol.pcep.subobject.ExplicitRouteSubobject;
import org.slf4j.Logger;
subs.add(new EROIPPrefixSubobject<Prefix<?>>(new IPv4Prefix(new IPv4Address(new byte[] { 10, 1, 1, 2 }), 32), false));
subs.add(new EROIPPrefixSubobject<Prefix<?>>(new IPv4Prefix(new IPv4Address(new byte[] { 2, 2, 2, 2 }), 32), false));
final CompositeInstantiationObject cpo = new CompositeInstantiationObject(new PCEPEndPointsObject<IPv4Address>(IPv4.FAMILY.addressForBytes(new byte[] {
- 1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), null, new PCEPExplicitRouteObject(subs, false), null, null);
+ 1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), new PCEPLspaObject(0, 0, 0, (short) 0, (short) 0, false, false, false, false), new PCEPExplicitRouteObject(subs, false), null, null);
session.sendMessage(new PCCreateMessage(Lists.newArrayList(cpo)));
}
+++ /dev/null
-import java.util.Queue
-
-import org.opendaylight.protocol.pcep.PCEPMessage
-import org.opendaylight.protocol.pcep.object.PCEPRequestParameterObject
-import org.opendaylight.protocol.pcep.message.PCEPReplyMessage
-import org.opendaylight.protocol.pcep.object.CompositeResponseObject
-import org.opendaylight.protocol.pcep.tool.MessageGeneratorService
-
-class GroovyReplyMessageGenerator implements MessageGeneratorService {
-
- public GroovyReplyMessageGenerator() {
- }
-
- @Override
- public Queue<PCEPMessage> generateMessages() {
- def queue = new LinkedList<PCEPMessage>()
- queue.push(
- new PCEPReplyMessage(
- [
- new CompositeResponseObject(
- new PCEPRequestParameterObject(true, false, true, false, true, 7 as Short, 6565 as Long, true, false)
- )
- ]
- )
- )
-
- queue.push(
- new PCEPReplyMessage(
- [
- new CompositeResponseObject(
- new PCEPRequestParameterObject(true, false, true, false, true, 5 as Short, 235568 as Long, true, false)
- )
- ]
- )
- )
-
- return queue
- }
-}
\ No newline at end of file
*/
package org.opendaylight.protocol.pcep.testtool;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.List;
+import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessage;
+import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
+import org.opendaylight.protocol.pcep.PCEPMessage;
+import org.opendaylight.protocol.pcep.PCEPSessionListener;
import org.opendaylight.protocol.pcep.PCEPTlv;
import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl;
+import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
import org.opendaylight.protocol.pcep.object.PCEPOpenObject;
import org.opendaylight.protocol.pcep.tlv.NodeIdentifierTlv;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class PCCMock {
+public class PCCMock<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+ AbstractDispatcher<S, L> {
+
+ private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+ private final ProtocolHandlerFactory<?> factory;
+
+ public PCCMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+ final DefaultPromise<PCEPSessionImpl> defaultPromise) {
+ this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+ this.factory = Preconditions.checkNotNull(factory);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
+ ch.pipeline().addLast(this.factory.getDecoders());
+ ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+ ch.pipeline().addLast(this.factory.getEncoders());
+ }
public static void main(final String[] args) throws Exception {
final List<PCEPTlv> tlvs = Lists.newArrayList();
tlvs.add(new NodeIdentifierTlv(new byte[] { (byte) 127, (byte) 2, (byte) 3, (byte) 7 }));
- final PCEPDispatcherImpl d = new PCEPDispatcherImpl(new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0));
-
- try {
- d.createClient(new InetSocketAddress("127.0.0.3", 12345), new SimpleSessionListener(),
- new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000)).get();
-
- // Thread.sleep(5000);
- // final List<CompositeRequestObject> cro = new ArrayList<CompositeRequestObject>();
- // cro.add(new CompositeRequestObject(new PCEPRequestParameterObject(false, true, true, true, true, (short)
- // 4, 123, false, false),
- // new PCEPEndPointsObject<IPv4Address>(new IPv4Address(InetAddress.getByName("10.0.0.3")), new
- // IPv4Address(InetAddress.getByName("10.0.0.5")))));
- // for (int i = 0; i < 3; i++) {
- // Thread.sleep(1000);
- // session.sendMessage(new PCEPRequestMessage(cro));
- // }
- // Thread.sleep(5000);
- // Thread.sleep(1000);
- } finally {
- // di.stop();
- }
+ final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0);
+
+ final PCCMock<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<>(snf, new PCEPHandlerFactory(), new DefaultPromise<PCEPSessionImpl>(GlobalEventExecutor.INSTANCE));
+
+ pcc.createClient(new InetSocketAddress("127.0.0.3", 12345), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000),
+ new SessionListenerFactory<PCEPSessionListener>() {
+
+ @Override
+ public PCEPSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+ }).get();
}
}