From 5fbf4f8681909a3ef9dfbf2fa3e9b1a1f0b5b6fc Mon Sep 17 00:00:00 2001 From: Dana Kutenicsova Date: Wed, 11 Sep 2013 15:31:24 +0200 Subject: [PATCH] BUG-54 : switched channel pipeline to be protocol specific. Added LengthFrameDecoder for both protocols. Change-Id: Iebdaced167b54b850ec114cff42c2b0385a37f31 Signed-off-by: Dana Kutenicsova --- .../bgp/parser/BGPMessageFactory.java | 17 +++ ...actory.java => BGPMessageFactoryImpl.java} | 9 +- .../impl/message/BGPUpdateMessageParser.java | 4 +- .../bgp/parser/impl/BGPParserTest.java | 62 +++++----- .../impl/BGPUpdateMessageParserTest.java | 6 +- .../bgp/parser/impl/ComplementaryTest.java | 4 +- .../bgp/rib/impl/BGPDispatcherImpl.java | 37 ++++-- .../bgp/rib/impl/BGPHandlerFactory.java | 38 ++++++ .../bgp/rib/impl/BGPMessageHeaderDecoder.java | 45 +++++++ .../protocol/bgp/rib/impl/BGPSessionImpl.java | 29 ++--- .../bgp/rib/impl/BGPSessionNegotiator.java | 49 ++++---- .../protocol/bgp/rib/impl/BGPImplTest.java | 7 +- .../protocol/bgp/rib/impl/ParserTest.java | 6 +- .../protocol/bgp/rib/mock/BGPMock.java | 4 +- .../protocol/bgp/testtool/Main.java | 4 +- .../protocol/bgp/testtool/BGPSpeakerMock.java | 48 ++++++-- .../bgp/testtool/SpeakerSessionListener.java | 7 -- .../framework/AbstractDispatcher.java | 90 +++++++------- .../framework/ChannelInitializerImpl.java | 43 ------- .../framework/ProtocolHandlerFactory.java | 10 +- .../framework/ProtocolMessageDecoder.java | 2 +- .../framework/ProtocolMessageEncoder.java | 5 +- .../framework/ProtocolSessionPromise.java | 36 +++--- .../protocol/framework/ReconnectPromise.java | 38 +++--- .../protocol/framework/ServerTest.java | 110 +++++++++++------- .../protocol/framework/SimpleDispatcher.java | 32 +++++ .../framework/SimpleSessionListener.java | 16 +-- .../protocol/pcep/PCEPDispatcher.java | 14 +-- .../protocol/pcep/PCEPMessage.java | 20 ++-- .../object/CompositeInstantiationObject.java | 92 +++++++-------- .../pcep/impl/PCEPDispatcherImpl.java | 33 +++--- .../pcep/impl/PCEPHandlerFactory.java | 37 ++++++ .../pcep/impl/PCEPMessageHeaderDecoder.java | 38 ++++++ .../protocol/pcep/impl/PCEPSessionImpl.java | 29 ++--- .../pcep/testtool/SimpleSessionListener.java | 3 +- .../GroovyReplyMessageGenerator.groovy | 39 ------- .../protocol/pcep/testtool/PCCMock.java | 68 +++++++---- 37 files changed, 657 insertions(+), 474 deletions(-) create mode 100644 bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java rename bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/{BGPMessageFactory.java => BGPMessageFactoryImpl.java} (96%) create mode 100644 bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java create mode 100644 bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java delete mode 100644 framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java create mode 100644 framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java create mode 100644 pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java create mode 100644 pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java delete mode 100644 pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy diff --git a/bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java b/bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java new file mode 100644 index 0000000000..fe40691c96 --- /dev/null +++ b/bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.bgp.parser; + +import org.opendaylight.protocol.framework.ProtocolMessageFactory; + +/** + * Interface to expose BGP specific MessageFactory. + */ +public interface BGPMessageFactory extends ProtocolMessageFactory { + +} diff --git a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactory.java b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactoryImpl.java similarity index 96% rename from bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactory.java rename to bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactoryImpl.java index 72d0597857..21dc7c8a97 100644 --- a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactory.java +++ b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactoryImpl.java @@ -13,6 +13,7 @@ import java.util.List; import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BGPMessage; +import org.opendaylight.protocol.bgp.parser.BGPMessageFactory; import org.opendaylight.protocol.bgp.parser.impl.message.BGPNotificationMessageParser; import org.opendaylight.protocol.bgp.parser.impl.message.BGPOpenMessageParser; import org.opendaylight.protocol.bgp.parser.impl.message.BGPUpdateMessageParser; @@ -21,7 +22,6 @@ import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage; import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage; import org.opendaylight.protocol.framework.DeserializerException; import org.opendaylight.protocol.framework.DocumentedException; -import org.opendaylight.protocol.framework.ProtocolMessageFactory; import org.opendaylight.protocol.util.ByteArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,9 +32,9 @@ import com.google.common.primitives.UnsignedBytes; /** * The byte array */ -public class BGPMessageFactory implements ProtocolMessageFactory { +public final class BGPMessageFactoryImpl implements BGPMessageFactory { - private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactory.class); + private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactoryImpl.class); final static int LENGTH_FIELD_LENGTH = 2; // bytes @@ -44,9 +44,6 @@ public class BGPMessageFactory implements ProtocolMessageFactory { public final static int COMMON_HEADER_LENGTH = LENGTH_FIELD_LENGTH + TYPE_FIELD_LENGTH + MARKER_LENGTH; - public BGPMessageFactory() { - } - /* * (non-Javadoc) * @see org.opendaylight.protocol.bgp.parser.BGPMessageParser#parse(byte[]) diff --git a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/BGPUpdateMessageParser.java b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/BGPUpdateMessageParser.java index 9480e00a26..9289f8a934 100644 --- a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/BGPUpdateMessageParser.java +++ b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/BGPUpdateMessageParser.java @@ -21,7 +21,7 @@ import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BGPParsingException; import org.opendaylight.protocol.bgp.parser.BGPUpdateEvent; import org.opendaylight.protocol.bgp.parser.BGPUpdateSynchronized; -import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory; +import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl; import org.opendaylight.protocol.bgp.parser.impl.BGPUpdateEventBuilder; import org.opendaylight.protocol.bgp.parser.impl.IPv6MP; import org.opendaylight.protocol.bgp.parser.impl.PathAttribute; @@ -90,7 +90,7 @@ public class BGPUpdateMessageParser { byteOffset += TOTAL_PATH_ATTR_LENGTH_SIZE; eventBuilder.setTotalPathAttrLength(totalPathAttrLength); - if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactory.COMMON_HEADER_LENGTH > msgLength) + if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH > msgLength) throw new BGPDocumentedException("Message length inconsistent with withdrawn router length.", BGPError.MALFORMED_ATTR_LIST); if (withdrawnRoutesLength == 0 && totalPathAttrLength == 0) { diff --git a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPParserTest.java b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPParserTest.java index a1737e7766..83b899ce75 100644 --- a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPParserTest.java +++ b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPParserTest.java @@ -166,9 +166,9 @@ public class BGPParserTest { @Test public void testGetUpdateMessage1() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -292,9 +292,9 @@ public class BGPParserTest { */ @Test public void testGetUpdateMessage2() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -401,9 +401,9 @@ public class BGPParserTest { */ @Test public void testGetUpdateMessage3() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); final BGPUpdateMessage message = (BGPUpdateMessage) ret; @@ -505,9 +505,9 @@ public class BGPParserTest { */ @Test public void testGetUpdateMessage4() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -587,9 +587,9 @@ public class BGPParserTest { */ @Test public void testGetUpdateMessage5() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -617,9 +617,9 @@ public class BGPParserTest { */ @Test public void testEORIpv4() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateSynchronized); @@ -653,9 +653,9 @@ public class BGPParserTest { */ @Test public void testEORIpv6() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateSynchronized); @@ -691,9 +691,9 @@ public class BGPParserTest { */ @Test public void testEORLS() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateSynchronized); @@ -865,9 +865,9 @@ public class BGPParserTest { */ @Test public void testBGPLink() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -997,9 +997,9 @@ public class BGPParserTest { */ @Test public void testBGPNode() throws Exception { - final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength); assertTrue(ret instanceof BGPUpdateMessage); @@ -1077,7 +1077,7 @@ public class BGPParserTest { */ @Test public void testOpenMessage() throws Exception { - final BGPMessageFactory msgFactory = new BGPMessageFactory(); + final BGPMessageFactoryImpl msgFactory = new BGPMessageFactoryImpl(); final BGPOpenMessage open = (BGPOpenMessage) msgFactory.parse(inputBytes.get(13)).get(0); final Set types = Sets.newHashSet(); for (final BGPParameter param : open.getOptParams()) { diff --git a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPUpdateMessageParserTest.java b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPUpdateMessageParserTest.java index 41b8623edf..60462c8237 100644 --- a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPUpdateMessageParserTest.java +++ b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPUpdateMessageParserTest.java @@ -31,9 +31,9 @@ public class BGPUpdateMessageParserTest { public void testNodeParsing() throws Exception { final List result = HexDumpBGPFileParser.parseMessages(new File(this.getClass().getResource("/bgp-update-nodes.txt").getFile())); assertEquals(1, result.size()); - final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH); - final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactory.MARKER_LENGTH, - BGPMessageFactory.LENGTH_FIELD_LENGTH)); + final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH); + final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactoryImpl.MARKER_LENGTH, + BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH)); final BGPUpdateEvent event = BGPUpdateMessageParser.parse(body, messageLength); final BGPUpdateMessage updateMessage = (BGPUpdateMessage) event; final Set addedObjects = updateMessage.getAddedObjects(); diff --git a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/ComplementaryTest.java b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/ComplementaryTest.java index 29d68d6679..fb3703d162 100644 --- a/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/ComplementaryTest.java +++ b/bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/ComplementaryTest.java @@ -190,7 +190,7 @@ public class ComplementaryTest { @Test public void testBGPHeaderParser() throws IOException { - final BGPMessageFactory h = new BGPMessageFactory(); + final BGPMessageFactoryImpl h = new BGPMessageFactoryImpl(); try { h.parse(new byte[] { (byte) 0, (byte) 0 }); fail("Exception should have occured."); @@ -219,7 +219,7 @@ public class ComplementaryTest { @Test public void testMessageParser() throws IOException { - final BGPMessageFactory parser = new BGPMessageFactory(); + final BGPMessageFactoryImpl parser = new BGPMessageFactoryImpl(); String ex = ""; try { parser.put(null); diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java index 698743c9a8..35081a2096 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java @@ -7,38 +7,57 @@ */ package org.opendaylight.protocol.bgp.rib.impl; +import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; -import org.opendaylight.protocol.bgp.parser.BGPMessage; +import org.opendaylight.protocol.bgp.parser.BGPMessageFactory; import org.opendaylight.protocol.bgp.parser.BGPSession; import org.opendaylight.protocol.bgp.parser.BGPSessionListener; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; import org.opendaylight.protocol.framework.AbstractDispatcher; -import org.opendaylight.protocol.framework.ProtocolMessageFactory; import org.opendaylight.protocol.framework.ReconnectStrategy; - -import com.google.common.base.Preconditions; +import org.opendaylight.protocol.framework.SessionListenerFactory; /** * Implementation of BGPDispatcher. */ -public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher { +public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher { private final Timer timer = new HashedWheelTimer(); - private final ProtocolMessageFactory parser; - public BGPDispatcherImpl(final ProtocolMessageFactory parser) { + private BGPSessionNegotiatorFactory snf; + + private final BGPHandlerFactory hf; + + public BGPDispatcherImpl(final BGPMessageFactory parser) { super(); - this.parser = Preconditions.checkNotNull(parser); + this.hf = new BGPHandlerFactory(parser); } @Override public Future createClient(final InetSocketAddress address, final BGPSessionPreferences preferences, final BGPSessionListener listener, final ReconnectStrategy strategy) { - return createClient(address, listener, new BGPSessionNegotiatorFactory(timer, preferences), parser, strategy); + this.snf = new BGPSessionNegotiatorFactory(this.timer, preferences); + final SessionListenerFactory slf = new SessionListenerFactory() { + + @Override + public BGPSessionListener getSessionListener() { + return listener; + } + }; + return super.createClient(address, strategy, slf); + } + + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise, + final SessionListenerFactory slf) { + ch.pipeline().addLast(this.hf.getDecoders()); + ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(slf, ch, promise)); + ch.pipeline().addLast(this.hf.getEncoders()); } } diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java new file mode 100644 index 0000000000..0e2b9ceed8 --- /dev/null +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.bgp.rib.impl; + +import io.netty.channel.ChannelHandler; + +import org.opendaylight.protocol.bgp.parser.BGPMessage; +import org.opendaylight.protocol.bgp.parser.BGPMessageFactory; +import org.opendaylight.protocol.framework.ProtocolHandlerFactory; +import org.opendaylight.protocol.framework.ProtocolMessageDecoder; +import org.opendaylight.protocol.framework.ProtocolMessageEncoder; + +/** + * BGP specific factory for protocol inbound/outbound handlers. + */ +public class BGPHandlerFactory extends ProtocolHandlerFactory { + private final ProtocolMessageEncoder encoder; + + public BGPHandlerFactory(final BGPMessageFactory msgFactory) { + super(msgFactory); + this.encoder = new ProtocolMessageEncoder(this.msgFactory); + } + + @Override + public ChannelHandler[] getEncoders() { + return new ChannelHandler[] { this.encoder }; + } + + @Override + public ChannelHandler[] getDecoders() { + return new ChannelHandler[] { new BGPMessageHeaderDecoder(), new ProtocolMessageDecoder(this.msgFactory) }; + } +} diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java new file mode 100644 index 0000000000..e4144ba9ef --- /dev/null +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.bgp.rib.impl; + +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +/** + * @see BGP Message Header + */ +public final class BGPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder { + + private static final int MAX_FRAME_SIZE = 4096; // min 19, max 4096 + + private static final int MARKER_SIZE = 16; + + private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including + // the header + + /* + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + + + + | | + + + + | Marker | + + + + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length | Type | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + */ + + public BGPMessageHeaderDecoder() { + super(MAX_FRAME_SIZE, MARKER_SIZE, LENGTH_SIZE, -MARKER_SIZE - LENGTH_SIZE, 0); + } +} diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 9892942748..aa87e2b40b 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -34,12 +34,14 @@ import org.opendaylight.protocol.framework.AbstractProtocolSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -class BGPSessionImpl extends AbstractProtocolSession implements BGPSession { +@VisibleForTesting +public class BGPSessionImpl extends AbstractProtocolSession implements BGPSession { private static final Logger logger = LoggerFactory.getLogger(BGPSessionImpl.class); @@ -77,7 +79,8 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS private final Set tableTypes; - BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive, final BGPOpenMessage remoteOpen) { + BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive, + final BGPOpenMessage remoteOpen) { this.listener = Preconditions.checkNotNull(listener); this.stateTimer = Preconditions.checkNotNull(timer); this.channel = Preconditions.checkNotNull(channel); @@ -116,10 +119,10 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS @Override public synchronized void close() { logger.debug("Closing session: {}", this); - if (!closed) { + if (!this.closed) { this.sendMessage(new BGPNotificationMessage(BGPError.CEASE)); - channel.close(); - closed = true; + this.channel.close(); + this.closed = true; } } @@ -156,7 +159,7 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS @Override public synchronized void endOfInput() { - if (!closed) { + if (!this.closed) { this.listener.onSessionDown(this, new IOException("End of input detected. Close the session.")); } } @@ -173,8 +176,8 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS private synchronized void closeWithoutMessage() { logger.debug("Closing session: {}", this); - channel.close(); - closed = true; + this.channel.close(); + this.closed = true; } /** @@ -200,7 +203,7 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(HOLD_TIMER_VALUE); - if (!closed) { + if (!this.closed) { if (ct >= nextHold) { logger.debug("HoldTimer expired. " + new Date()); this.terminate(BGPError.HOLD_TIMER_EXPIRED); @@ -226,7 +229,7 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); - if (!closed) { + if (!this.closed) { if (ct >= nextKeepalive) { this.sendMessage(new BGPKeepAliveMessage()); nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive); @@ -246,8 +249,8 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS } protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - toStringHelper.add("channel", channel); - toStringHelper.add("closed", closed); + toStringHelper.add("channel", this.channel); + toStringHelper.add("closed", this.closed); return toStringHelper; } @@ -258,6 +261,6 @@ class BGPSessionImpl extends AbstractProtocolSession implements BGPS @Override protected void sessionUp() { - listener.onSessionUp(this); + this.listener.onSessionUp(this); } } diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java index 970729429a..718a0d4b8f 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -final class BGPSessionNegotiator extends AbstractSessionNegotiator { +public final class BGPSessionNegotiator extends AbstractSessionNegotiator { // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2 private static final int INITIAL_HOLDTIMER = 4; @@ -39,13 +39,12 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator promise, final Channel channel, + public BGPSessionNegotiator(final Timer timer, final Promise promise, final Channel channel, final BGPSessionPreferences initialPrefs, final BGPSessionListener listener) { super(promise, channel); this.listener = Preconditions.checkNotNull(listener); @@ -72,18 +71,18 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator emptyList())).when(this.prop).getProposal(); this.bgp = new BGPImpl(this.disp, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop); - final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)); + final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(), + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)); assertEquals(SimpleSessionListener.class, reg.getListener().getClass()); } diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserTest.java index be851629c0..4d083faa53 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserTest.java @@ -28,7 +28,7 @@ import org.opendaylight.protocol.bgp.parser.BGPDocumentedException; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BGPMessage; import org.opendaylight.protocol.bgp.parser.BGPParameter; -import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory; +import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl; import org.opendaylight.protocol.bgp.parser.message.BGPKeepAliveMessage; import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage; import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage; @@ -59,7 +59,7 @@ public class ParserTest { (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0x00, (byte) 0x17, (byte) 0x03, (byte) 0x02, (byte) 0x04, (byte) 0x04, (byte) 0x09 }; - final ProtocolMessageFactory factory = new BGPMessageFactory(); + final ProtocolMessageFactory factory = new BGPMessageFactoryImpl(); @Test public void testHeaderErrors() throws DeserializerException, DocumentedException { @@ -71,7 +71,7 @@ public class ParserTest { fail("Exception should have occcured."); } catch (final IllegalArgumentException e) { assertEquals("Too few bytes in passed array. Passed: " + wrong.length + ". Expected: >= " - + BGPMessageFactory.COMMON_HEADER_LENGTH + ".", e.getMessage()); + + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH + ".", e.getMessage()); return; } fail(); diff --git a/bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java b/bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java index 135df29de1..d1122d5245 100644 --- a/bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java +++ b/bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java @@ -18,7 +18,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.protocol.bgp.parser.BGPError; import org.opendaylight.protocol.bgp.parser.BGPMessage; import org.opendaylight.protocol.bgp.parser.BGPSessionListener; -import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory; +import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl; import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage; import org.opendaylight.protocol.bgp.rib.impl.BGP; import org.opendaylight.protocol.concepts.ListenerRegistration; @@ -55,7 +55,7 @@ public final class BGPMock implements BGP, Closeable { private List parsePrevious(final List msgs) { final List messages = Lists.newArrayList(); - final ProtocolMessageFactory parser = new BGPMessageFactory(); + final ProtocolMessageFactory parser = new BGPMessageFactoryImpl(); try { for (final byte[] b : msgs) { diff --git a/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java b/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java index 1d90e240c0..44ebe77f1c 100644 --- a/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java +++ b/bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java @@ -14,7 +14,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import org.opendaylight.protocol.bgp.parser.BGPSessionListener; -import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory; +import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl; import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl; import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; @@ -49,7 +49,7 @@ public class Main { BGPDispatcherImpl dispatcher; public Main() throws IOException { - this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactory()); + this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactoryImpl()); } public static void main(final String[] args) throws NumberFormatException, IOException { diff --git a/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java b/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java index d0c021def3..105f130593 100644 --- a/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java +++ b/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java @@ -7,37 +7,69 @@ */ package org.opendaylight.protocol.bgp.testtool; +import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; +import org.opendaylight.protocol.bgp.parser.BGPMessage; import org.opendaylight.protocol.bgp.parser.BGPSessionListener; -import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory; +import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl; +import org.opendaylight.protocol.bgp.rib.impl.BGPHandlerFactory; +import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl; import org.opendaylight.protocol.bgp.rib.impl.BGPSessionNegotiatorFactory; import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl; import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences; import org.opendaylight.protocol.concepts.ASNumber; import org.opendaylight.protocol.concepts.IPv4; import org.opendaylight.protocol.framework.AbstractDispatcher; +import org.opendaylight.protocol.framework.ProtocolHandlerFactory; +import org.opendaylight.protocol.framework.ProtocolMessage; +import org.opendaylight.protocol.framework.ProtocolSession; +import org.opendaylight.protocol.framework.SessionListener; import org.opendaylight.protocol.framework.SessionListenerFactory; +import org.opendaylight.protocol.framework.SessionNegotiatorFactory; -public class BGPSpeakerMock extends AbstractDispatcher { +import com.google.common.base.Preconditions; - public static void main(final String[] args) throws IOException { +public class BGPSpeakerMock, L extends SessionListener> extends + AbstractDispatcher { - final BGPSpeakerMock m = new BGPSpeakerMock(); + private final SessionNegotiatorFactory negotiatorFactory; + private final ProtocolHandlerFactory factory; - final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal(); + public BGPSpeakerMock(final SessionNegotiatorFactory negotiatorFactory, final ProtocolHandlerFactory factory, + final DefaultPromise defaultPromise) { + this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory); + this.factory = Preconditions.checkNotNull(factory); + } + + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise, final SessionListenerFactory listenerFactory) { + ch.pipeline().addLast(this.factory.getDecoders()); + ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise)); + ch.pipeline().addLast(this.factory.getEncoders()); + } + + public static void main(final String[] args) throws IOException { final SessionListenerFactory f = new SessionListenerFactory() { @Override public BGPSessionListener getSessionListener() { - return new SpeakerSessionListener(m); + return new SpeakerSessionListener(); } }; - m.createServer(new InetSocketAddress("127.0.0.2", 12345), f, - new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs), new BGPMessageFactory()); + final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal(); + + final SessionNegotiatorFactory snf = new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs); + + final BGPSpeakerMock mock = new BGPSpeakerMock(snf, new BGPHandlerFactory(new BGPMessageFactoryImpl()), new DefaultPromise(GlobalEventExecutor.INSTANCE)); + + mock.createServer(new InetSocketAddress("127.0.0.2", 12345), f); } } diff --git a/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java b/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java index 07b401efe0..22852c049d 100644 --- a/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java +++ b/bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java @@ -11,19 +11,12 @@ import org.opendaylight.protocol.bgp.parser.BGPMessage; import org.opendaylight.protocol.bgp.parser.BGPSession; import org.opendaylight.protocol.bgp.parser.BGPSessionListener; import org.opendaylight.protocol.bgp.parser.BGPTerminationReason; -import org.opendaylight.protocol.framework.AbstractDispatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SpeakerSessionListener implements BGPSessionListener { private static final Logger logger = LoggerFactory.getLogger(SpeakerSessionListener.class); - AbstractDispatcher d; - - SpeakerSessionListener(final AbstractDispatcher d) { - this.d = d; - } - @Override public void onSessionUp(final BGPSession session) { logger.info("Server: Session is up."); diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java index 2179671fde..98021be1d7 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java @@ -7,15 +7,20 @@ */ package org.opendaylight.protocol.framework; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; import java.io.Closeable; import java.net.InetSocketAddress; @@ -23,13 +28,13 @@ import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; /** * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the * start method that will handle sockets in different thread. */ -public abstract class AbstractDispatcher implements Closeable { +public abstract class AbstractDispatcher, L extends SessionListener> implements Closeable { private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class); @@ -43,25 +48,35 @@ public abstract class AbstractDispatcher implements Closeable { this.workerGroup = new NioEventLoopGroup(); } + /** + * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this + * method needs to be implemented in protocol specific Dispatchers. + * + * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory} + * @param promise to be passed to {@link SessionNegotiatorFactory} + */ + public abstract void initializeChannel(SocketChannel channel, Promise promise, final SessionListenerFactory lfactory); + /** * Creates server. Each server needs factories to pass their instances to client sessions. * * @param address address to which the server should be bound - * @param listenerFactory factory for creating protocol listeners, passed to the negotiator - * @param negotiatorFactory protocol session negotiator factory - * @param messageFactory message parser * * @return ChannelFuture representing the binding process */ - protected , L extends SessionListener> ChannelFuture createServer( - final InetSocketAddress address, final SessionListenerFactory listenerFactory, - final SessionNegotiatorFactory negotiatorFactory, final ProtocolMessageFactory messageFactory) { + @VisibleForTesting + public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory lfactory) { final ServerBootstrap b = new ServerBootstrap(); b.group(this.bossGroup, this.workerGroup); b.channel(NioServerSocketChannel.class); b.option(ChannelOption.SO_BACKLOG, 128); - b.childHandler(new ChannelInitializerImpl(negotiatorFactory, - listenerFactory, new ProtocolHandlerFactory(messageFactory), new DefaultPromise(GlobalEventExecutor.INSTANCE))); + b.childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(final SocketChannel ch) throws Exception { + initializeChannel(ch, new DefaultPromise(GlobalEventExecutor.INSTANCE), lfactory); + } + }); b.childOption(ChannelOption.SO_KEEPALIVE, true); // Bind and start to accept incoming connections. @@ -75,30 +90,24 @@ public abstract class AbstractDispatcher implements Closeable { * Creates a client. * * @param address remote address - * @param listener session listener - * @param negotiatorFactory session negotiator factory - * @param messageFactory message parser * @param connectStrategy Reconnection strategy to be used when initial connection fails * - * @return Future representing the connection process. Its result represents - * the combined success of TCP connection as well as session negotiation. + * @return Future representing the connection process. Its result represents the combined success of TCP connection + * as well as session negotiation. */ - protected , L extends SessionListener> Future createClient( - final InetSocketAddress address, final L listener, final SessionNegotiatorFactory negotiatorFactory, - final ProtocolMessageFactory messageFactory, final ReconnectStrategy strategy) { - final ProtocolSessionPromise p = new ProtocolSessionPromise(workerGroup, address, negotiatorFactory, - new SessionListenerFactory() { - private boolean created = false; - - @Override - public synchronized L getSessionListener() { - Preconditions.checkState(created == false); - created = true; - return listener; - } - - }, new ProtocolHandlerFactory(messageFactory), strategy); - + @VisibleForTesting + public Future createClient(final InetSocketAddress address, final ReconnectStrategy strategy, + final SessionListenerFactory lfactory) { + final Bootstrap b = new Bootstrap(); + final ProtocolSessionPromise p = new ProtocolSessionPromise(address, strategy, b); + b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler( + new ChannelInitializer() { + + @Override + protected void initChannel(final SocketChannel ch) throws Exception { + initializeChannel(ch, p, lfactory); + } + }); p.connect(); logger.debug("Client created."); return p; @@ -108,25 +117,16 @@ public abstract class AbstractDispatcher implements Closeable { * Creates a client. * * @param address remote address - * @param listener session listener - * @param negotiatorFactory session negotiator factory - * @param messageFactory message parser * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails * - * @return Future representing the reconnection task. It will report - * completion based on reestablishStrategy, e.g. success if - * it indicates no further attempts should be made and failure - * if it reports an error + * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g. + * success if it indicates no further attempts should be made and failure if it reports an error */ - protected , L extends SessionListener> Future createReconnectingClient( - final InetSocketAddress address, final L listener, final SessionNegotiatorFactory negotiatorFactory, - final ProtocolMessageFactory messageFactory, final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategy reestablishStrategy) { - - final ReconnectPromise p = new ReconnectPromise(this, address, listener, negotiatorFactory, - messageFactory, connectStrategyFactory, reestablishStrategy); + protected Future createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, + final ReconnectStrategy reestablishStrategy, final SessionListenerFactory lfactory) { + final ReconnectPromise p = new ReconnectPromise(this, address, connectStrategyFactory, reestablishStrategy, lfactory); p.connect(); return p; diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java b/framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java deleted file mode 100644 index 20ea3a5cbd..0000000000 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.protocol.framework; - - -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.Promise; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -final class ChannelInitializerImpl, L extends SessionListener> extends ChannelInitializer { - private static final Logger logger = LoggerFactory.getLogger(ChannelInitializerImpl.class); - private final SessionNegotiatorFactory negotiatorFactory; - private final SessionListenerFactory listenerFactory; - private final ProtocolHandlerFactory factory; - private final Promise promise; - - ChannelInitializerImpl(final SessionNegotiatorFactory negotiatorFactory, final SessionListenerFactory listenerFactory, - final ProtocolHandlerFactory factory, final Promise promise) { - this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory); - this.listenerFactory = Preconditions.checkNotNull(listenerFactory); - this.promise = Preconditions.checkNotNull(promise); - this.factory = Preconditions.checkNotNull(factory); - } - - @Override - protected void initChannel(final SocketChannel ch) { - logger.debug("Initializing channel {}", ch); - ch.pipeline().addLast("decoder", factory.getDecoder()); - ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise)); - ch.pipeline().addLast("encoder", factory.getEncoder()); - logger.debug("Channel {} initialized", ch); - } -} \ No newline at end of file diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java index 2f8364dd97..381436dddc 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java @@ -13,18 +13,18 @@ import com.google.common.base.Preconditions; public class ProtocolHandlerFactory { private final ProtocolMessageEncoder encoder; - final ProtocolMessageFactory msgFactory; + protected final ProtocolMessageFactory msgFactory; public ProtocolHandlerFactory(final ProtocolMessageFactory msgFactory) { this.msgFactory = Preconditions.checkNotNull(msgFactory); this.encoder = new ProtocolMessageEncoder(msgFactory); } - public ChannelHandler getEncoder() { - return this.encoder; + public ChannelHandler[] getEncoders() { + return new ChannelHandler[] { this.encoder }; } - public ChannelHandler getDecoder() { - return new ProtocolMessageDecoder(msgFactory); + public ChannelHandler[] getDecoders() { + return new ChannelHandler[] { new ProtocolMessageDecoder(this.msgFactory) }; } } diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java index 4ad53dcdd5..a0ec795317 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java @@ -17,7 +17,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class ProtocolMessageDecoder extends ByteToMessageDecoder { +public final class ProtocolMessageDecoder extends ByteToMessageDecoder { private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageDecoder.class); diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java index c54c780185..ce19c0d930 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java @@ -16,7 +16,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Sharable -final class ProtocolMessageEncoder extends MessageToByteEncoder { +public final class ProtocolMessageEncoder extends MessageToByteEncoder { private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageEncoder.class); @@ -29,6 +29,7 @@ final class ProtocolMessageEncoder extends MessageToB @Override protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception { logger.debug("Sent to encode : {}", msg); - out.writeBytes(this.factory.put((T)msg)); + final byte[] bytes = this.factory.put((T) msg); + out.writeBytes(bytes); } } diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java index b32859b018..abc253ef78 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java @@ -11,8 +11,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -29,9 +27,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @ThreadSafe -final class ProtocolSessionPromise, L extends SessionListener> extends DefaultPromise { +final class ProtocolSessionPromise> extends DefaultPromise { private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class); - private final ChannelInitializerImpl init; private final ReconnectStrategy strategy; private final InetSocketAddress address; private final Bootstrap b; @@ -39,27 +36,22 @@ final class ProtocolSessionPromise pending; - ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory negotiatorFactory, - final SessionListenerFactory listenerFactory, - final ProtocolHandlerFactory protocolFactory, final ReconnectStrategy strategy) { + ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) { this.strategy = Preconditions.checkNotNull(strategy); this.address = Preconditions.checkNotNull(address); - - init = new ChannelInitializerImpl(negotiatorFactory, listenerFactory, protocolFactory, this); - b = new Bootstrap(); - b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init); + this.b = Preconditions.checkNotNull(b); } synchronized void connect() { final Object lock = this; try { - final int timeout = strategy.getConnectTimeout(); + final int timeout = this.strategy.getConnectTimeout(); logger.debug("Promise {} attempting connect for {}ms", lock, timeout); - b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); - pending = b.connect(address).addListener(new ChannelFutureListener() { + this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); + this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture cf) throws Exception { synchronized (lock) { @@ -67,7 +59,7 @@ final class ProtocolSessionPromise rf = strategy.scheduleReconnect(cf.cause()); + final Future rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause()); rf.addListener(new FutureListener() { @Override public void operationComplete(final Future sf) { synchronized (lock) { // Triggered when a connection attempt is to be made. - Preconditions.checkState(pending == sf); + Preconditions.checkState(ProtocolSessionPromise.this.pending == sf); /* * The promise we gave out could have been cancelled, @@ -114,14 +106,14 @@ final class ProtocolSessionPromise setSuccess(final S result) { logger.debug("Promise {} completed", this); - strategy.reconnectSuccessful(); + this.strategy.reconnectSuccessful(); return super.setSuccess(result); } -} \ No newline at end of file +} diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index a37572cb20..508737866a 100644 --- a/framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -15,34 +15,27 @@ import java.net.InetSocketAddress; import com.google.common.base.Preconditions; -final class ReconnectPromise, L extends SessionListener> extends DefaultPromise { - private final AbstractDispatcher dispatcher; +final class ReconnectPromise, L extends SessionListener> extends DefaultPromise { + private final AbstractDispatcher dispatcher; private final InetSocketAddress address; - private final L listener; - private final SessionNegotiatorFactory negotiatorFactory; - private final ProtocolMessageFactory messageFactory; private final ReconnectStrategyFactory strategyFactory; private final ReconnectStrategy strategy; private Future pending; + private final SessionListenerFactory lfactory; - public ReconnectPromise(final AbstractDispatcher dispatcher, - final InetSocketAddress address, final L listener, - final SessionNegotiatorFactory negotiatorFactory, - final ProtocolMessageFactory messageFactory, - final ReconnectStrategyFactory connectStrategyFactory, - final ReconnectStrategy reestablishStrategy) { + public ReconnectPromise(final AbstractDispatcher dispatcher, final InetSocketAddress address, + final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, + final SessionListenerFactory lfactory) { this.dispatcher = Preconditions.checkNotNull(dispatcher); this.address = Preconditions.checkNotNull(address); - this.listener = Preconditions.checkNotNull(listener); - this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory); - this.messageFactory = Preconditions.checkNotNull(messageFactory); this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory); this.strategy = Preconditions.checkNotNull(reestablishStrategy); + this.lfactory = Preconditions.checkNotNull(lfactory); } synchronized void connect() { - final ReconnectStrategy cs = strategyFactory.createReconnectStrategy(); + final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); final ReconnectStrategy rs = new ReconnectStrategy() { @Override public Future scheduleReconnect(final Throwable cause) { @@ -57,7 +50,7 @@ final class ReconnectPromise cf = dispatcher.createClient(address, - listener, negotiatorFactory, messageFactory, rs); + final Future cf = this.dispatcher.createClient(this.address, rs, this.lfactory); final Object lock = this; - pending = cf; + this.pending = cf; cf.addListener(new FutureListener() { @Override public void operationComplete(final Future future) { synchronized (lock) { if (!future.isSuccess()) { - final Future rf = strategy.scheduleReconnect(cf.cause()); - pending = rf; + final Future rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause()); + ReconnectPromise.this.pending = rf; rf.addListener(new FutureListener() { @Override @@ -110,7 +102,7 @@ final class ReconnectPromise clientDispatcher, dispatcher; final SimpleSessionListener pce = new SimpleSessionListener(); @@ -40,63 +40,93 @@ public class ServerTest { @Test public void testConnectionEstablished() throws Exception { - this.dispatcher = new AbstractDispatcher() { }; - final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); + this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory() { + + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + p.setSuccess(true); + return new SimpleSessionNegotiator(promise, channel); + } + }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise(GlobalEventExecutor.INSTANCE)); + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { @Override public SimpleSessionListener getSessionListener() { return new SimpleSessionListener(); } - }, new SessionNegotiatorFactory() { + }); + this.server.get(); + + this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory() { @Override public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, final Channel channel, final Promise promise) { - p.setSuccess(true); return new SimpleSessionNegotiator(promise, channel); } - }, new MessageFactory()); + }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise(GlobalEventExecutor.INSTANCE)); - this.server.get(); - - this.clientDispatcher = new AbstractDispatcher() { }; - - this.session = this.clientDispatcher.createClient(this.serverAddress, new SimpleSessionListener(), - new SessionNegotiatorFactory() { + this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress, + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { @Override - public SessionNegotiator getSessionNegotiator( - final SessionListenerFactory factory, final Channel channel, - final Promise promise) { - return new SimpleSessionNegotiator(promise, channel); + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); } - }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get(); + }).get(6, TimeUnit.SECONDS); assertEquals(true, p.get(3, TimeUnit.SECONDS)); } - public void testConnectionFailed() throws IOException, InterruptedException { - this.dispatcher = new AbstractDispatcher() { }; - this.clientDispatcher = new AbstractDispatcher() { }; - final SimpleSessionListener listener = new SimpleSessionListener(); + @Test + public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException { + final Promise p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); - try { - this.clientDispatcher.createClient(this.serverAddress, listener, - new SessionNegotiatorFactory() { - @Override - public SessionNegotiator getSessionNegotiator( - final SessionListenerFactory factory, final Channel channel, - final Promise promise) { - return null; - } - }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get(); - - fail("Connection succeeded unexpectedly"); - } catch (final ExecutionException e) { - assertTrue(listener.failed); - assertTrue(e.getCause() instanceof ConnectException); - } + this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory() { + + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + p.setSuccess(true); + return new SimpleSessionNegotiator(promise, channel); + } + }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise(GlobalEventExecutor.INSTANCE)); + + this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + + this.server.get(); + + this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory() { + @Override + public SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, + final Channel channel, final Promise promise) { + return new SimpleSessionNegotiator(promise, channel); + } + }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise(GlobalEventExecutor.INSTANCE)); + + this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress, + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }).get(6, TimeUnit.SECONDS); + + final Future session = this.clientDispatcher.createClient(this.serverAddress, + new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory() { + @Override + public SimpleSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }); + assertFalse(session.isSuccess()); } @After diff --git a/framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java b/framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java new file mode 100644 index 0000000000..0925f65c8c --- /dev/null +++ b/framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java @@ -0,0 +1,32 @@ +package org.opendaylight.protocol.framework; + +import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.Promise; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class SimpleDispatcher, L extends SessionListener> extends + AbstractDispatcher { + + private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class); + + private final SessionNegotiatorFactory negotiatorFactory; + private final ProtocolHandlerFactory factory; + + public SimpleDispatcher(final SessionNegotiatorFactory negotiatorFactory, final ProtocolHandlerFactory factory, + final Promise promise) { + this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory); + this.factory = Preconditions.checkNotNull(factory); + } + + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise, final SessionListenerFactory lfactory) { + ch.pipeline().addLast(this.factory.getDecoders()); + ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(lfactory, ch, promise)); + ch.pipeline().addLast(this.factory.getEncoders()); + logger.debug("initialization completed for channel {}", ch); + } +} diff --git a/framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java b/framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java index 127279d8ee..40a6509cf2 100644 --- a/framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java +++ b/framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java @@ -31,13 +31,6 @@ public class SimpleSessionListener implements SessionListener session, final Exception e) { - logger.debug("Connection Failed: {}", e.getMessage(), e); - this.failed = true; - this.notifyAll(); - session.close(); - } - @Override public void onSessionUp(final SimpleSession session) { this.up = true; @@ -45,12 +38,13 @@ public class SimpleSessionListener implements SessionListener listenerFactory); - - /** - * Creates a client. Needs to be started via the start method. - * @param connection PCEP connection settings - * @param strategy Reconnection strategy to be used for TCP-level connection - * @throws IOException if some IO error occurred - */ - public Future createClient(InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy); } - diff --git a/pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPMessage.java b/pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPMessage.java index f11a76c6f1..d5b674c92f 100644 --- a/pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPMessage.java +++ b/pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPMessage.java @@ -12,9 +12,9 @@ import java.util.List; import org.opendaylight.protocol.framework.ProtocolMessage; /** - * Basic structure for PCEP Message. Cannot be instantiated directly. Current - * PCEP version is 1. Each message contains a list of PCEP objects. - * + * Basic structure for PCEP Message. Cannot be instantiated directly. Current PCEP version is 1. Each message contains a + * list of PCEP objects. + * */ public abstract class PCEPMessage implements ProtocolMessage { @@ -28,18 +28,20 @@ public abstract class PCEPMessage implements ProtocolMessage { private final List objects; /** - * Constructor is protected to prevent direct instantiation, but to allow to - * call this constructor via super(). - * + * Constructor is protected to prevent direct instantiation, but to allow to call this constructor via super(). + * * @param objects */ - protected PCEPMessage(List objects) { + protected PCEPMessage(final List objects) { + if (objects.contains(null)) + throw new IllegalArgumentException("Object list contains null element at offset " + objects.indexOf(null)); + this.objects = objects; } /** * Returns list of all objects that the message contains - * + * * @return list of all objects that the message contains */ public List getAllObjects() { @@ -55,7 +57,7 @@ public abstract class PCEPMessage implements ProtocolMessage { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) return true; if (obj == null) diff --git a/pcep/api/src/main/java/org/opendaylight/protocol/pcep/object/CompositeInstantiationObject.java b/pcep/api/src/main/java/org/opendaylight/protocol/pcep/object/CompositeInstantiationObject.java index e2eb50e84f..83c9615ea5 100644 --- a/pcep/api/src/main/java/org/opendaylight/protocol/pcep/object/CompositeInstantiationObject.java +++ b/pcep/api/src/main/java/org/opendaylight/protocol/pcep/object/CompositeInstantiationObject.java @@ -12,9 +12,11 @@ import java.util.List; import org.opendaylight.protocol.pcep.PCEPObject; +import com.google.common.base.Preconditions; + /** * Structure that combines set of related objects. - * + * * @see PCCreate Message */ public class CompositeInstantiationObject { @@ -31,33 +33,27 @@ public class CompositeInstantiationObject { /** * Constructs basic composite object only with mandatory objects. - * - * @param endPoints - * PCEPEndPointsObject. Can't be null. - * @param lspa - * PCEPLspaObject. Can't be null. + * + * @param endPoints PCEPEndPointsObject. Can't be null. + * @param lspa PCEPLspaObject. Can't be null. */ - public CompositeInstantiationObject(PCEPEndPointsObject endPoints, PCEPLspaObject lspa) { + public CompositeInstantiationObject(final PCEPEndPointsObject endPoints, final PCEPLspaObject lspa) { this(endPoints, lspa, null, null, null); } /** * Constructs composite object with optional objects. - * - * @param endPoints - * PCEPEndPointsObject. Can't be null. - * @param lspa - * PCEPLspaObject. Can't be null. - * @param ero - * PCEPExplicitRouteObject - * @param bandwidth - * PCEPRequestedPathBandwidthObject - * @param metrics - * List + * + * @param endPoints PCEPEndPointsObject. Can't be null. + * @param lspa PCEPLspaObject. Can't be null. + * @param ero PCEPExplicitRouteObject + * @param bandwidth PCEPRequestedPathBandwidthObject + * @param metrics List */ - public CompositeInstantiationObject(PCEPEndPointsObject endPoints, PCEPLspaObject lspa, PCEPExplicitRouteObject ero, PCEPRequestedPathBandwidthObject bandwidth, List metrics) { - this.endPoints = endPoints; - this.lspa = lspa; + public CompositeInstantiationObject(final PCEPEndPointsObject endPoints, final PCEPLspaObject lspa, + final PCEPExplicitRouteObject ero, final PCEPRequestedPathBandwidthObject bandwidth, final List metrics) { + this.endPoints = Preconditions.checkNotNull(endPoints); + this.lspa = Preconditions.checkNotNull(lspa); this.ero = ero; this.bandwidth = bandwidth; this.metrics = metrics; @@ -65,7 +61,7 @@ public class CompositeInstantiationObject { /** * Gets list of all objects, which are in appropriate order. - * + * * @return List. Can't be null or empty. */ public List getCompositeAsList() { @@ -83,13 +79,11 @@ public class CompositeInstantiationObject { /** * Creates this object from a list of PCEPObjects. - * - * @param objects - * List list of PCEPObjects from whose this object - * should be created. + * + * @param objects List list of PCEPObjects from whose this object should be created. * @return CompositeInstantiationObject */ - public static CompositeInstantiationObject getCompositeFromList(List objects) { + public static CompositeInstantiationObject getCompositeFromList(final List objects) { if (objects == null || objects.isEmpty()) { throw new IllegalArgumentException("List cannot be null or empty."); } @@ -116,26 +110,26 @@ public class CompositeInstantiationObject { while (!objects.isEmpty()) { final PCEPObject obj = objects.get(0); switch (state) { - case 1: - state = 2; - if (obj instanceof PCEPExplicitRouteObject) { - ero = (PCEPExplicitRouteObject) obj; - break; - } - case 2: + case 1: + state = 2; + if (obj instanceof PCEPExplicitRouteObject) { + ero = (PCEPExplicitRouteObject) obj; + break; + } + case 2: + state = 3; + if (obj instanceof PCEPRequestedPathBandwidthObject) { + bandwidth = (PCEPRequestedPathBandwidthObject) obj; + break; + } + case 3: + state = 4; + if (obj instanceof PCEPMetricObject) { + metrics.add((PCEPMetricObject) obj); state = 3; - if (obj instanceof PCEPRequestedPathBandwidthObject) { - bandwidth = (PCEPRequestedPathBandwidthObject) obj; - break; - } - case 3: - state = 4; - if (obj instanceof PCEPMetricObject) { - metrics.add((PCEPMetricObject) obj); - state = 3; - break; - } + break; + } } if (state == 4) { @@ -190,10 +184,8 @@ public class CompositeInstantiationObject { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result - + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode()); - result = prime * result - + ((this.endPoints == null) ? 0 : this.endPoints.hashCode()); + result = prime * result + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode()); + result = prime * result + ((this.endPoints == null) ? 0 : this.endPoints.hashCode()); result = prime * result + ((this.ero == null) ? 0 : this.ero.hashCode()); result = prime * result + ((this.lspa == null) ? 0 : this.lspa.hashCode()); result = prime * result + ((this.metrics == null) ? 0 : this.metrics.hashCode()); @@ -204,7 +196,7 @@ public class CompositeInstantiationObject { * @see java.lang.Object#equals(java.lang.Object) */ @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) return true; if (obj == null) diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java index 2feaed9b83..52cc2df3c6 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java @@ -8,51 +8,50 @@ package org.opendaylight.protocol.pcep.impl; import io.netty.channel.ChannelFuture; -import io.netty.util.concurrent.Future; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.concurrent.ExecutionException; import org.opendaylight.protocol.framework.AbstractDispatcher; -import org.opendaylight.protocol.framework.ReconnectStrategy; import org.opendaylight.protocol.framework.SessionListenerFactory; import org.opendaylight.protocol.framework.SessionNegotiatorFactory; import org.opendaylight.protocol.pcep.PCEPDispatcher; import org.opendaylight.protocol.pcep.PCEPMessage; -import org.opendaylight.protocol.pcep.PCEPSession; import org.opendaylight.protocol.pcep.PCEPSessionListener; +import com.google.common.base.Preconditions; + /** * Implementation of PCEPDispatcher. */ -public class PCEPDispatcherImpl extends AbstractDispatcher implements PCEPDispatcher { - private static final PCEPMessageFactory msgFactory = new PCEPMessageFactory(); +public class PCEPDispatcherImpl extends AbstractDispatcher implements PCEPDispatcher { + private final SessionNegotiatorFactory snf; + private final PCEPHandlerFactory hf = new PCEPHandlerFactory(); + /** * Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it. * * @throws IOException if some error occurred during opening the selector */ - public PCEPDispatcherImpl(final SessionNegotiatorFactory snf) { + public PCEPDispatcherImpl(final SessionNegotiatorFactory negotiatorFactory) { super(); - this.snf = snf; + this.snf = Preconditions.checkNotNull(negotiatorFactory); } @Override public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory listenerFactory) { - return this.createServer(address, listenerFactory, snf, msgFactory); + return super.createServer(address, listenerFactory); } - /** - * Create client is used for mock purposes only. - * - * @throws ExecutionException - * @throws InterruptedException - */ @Override - public Future createClient(final InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy) { - return this.createClient(address, listener, snf, msgFactory, strategy); + public void initializeChannel(final SocketChannel ch, final Promise promise, + final SessionListenerFactory listenerFactory) { + ch.pipeline().addLast(this.hf.getDecoders()); + ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise)); + ch.pipeline().addLast(this.hf.getEncoders()); } } diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java new file mode 100644 index 0000000000..5eadc1ec72 --- /dev/null +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.pcep.impl; + +import io.netty.channel.ChannelHandler; + +import org.opendaylight.protocol.framework.ProtocolHandlerFactory; +import org.opendaylight.protocol.framework.ProtocolMessageDecoder; +import org.opendaylight.protocol.framework.ProtocolMessageEncoder; +import org.opendaylight.protocol.pcep.PCEPMessage; + +/** + * PCEP specific factory for protocol inbound/outbound handlers. + */ +public class PCEPHandlerFactory extends ProtocolHandlerFactory { + private final ProtocolMessageEncoder encoder; + + public PCEPHandlerFactory() { + super(new PCEPMessageFactory()); + this.encoder = new ProtocolMessageEncoder(this.msgFactory); + } + + @Override + public ChannelHandler[] getEncoders() { + return new ChannelHandler[] { this.encoder }; + } + + @Override + public ChannelHandler[] getDecoders() { + return new ChannelHandler[] { new PCEPMessageHeaderDecoder(), new ProtocolMessageDecoder(this.msgFactory) }; + } +} diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java new file mode 100644 index 0000000000..1a151efcd5 --- /dev/null +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.pcep.impl; + +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +/** + * @see Common Message Header + */ +public class PCEPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder { + + private static final int MAX_FRAME_SIZE = 65528; // min 4, max 4096 + + private static final int VERSION_FLAGS_SIZE = 1; + + private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including + // the header + + private static final int MESSAGE_TYPE_SIZE = 1; + + /* + + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Ver | Flags | Message-Type | Message-Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + */ + + public PCEPMessageHeaderDecoder() { + super(MAX_FRAME_SIZE, VERSION_FLAGS_SIZE + MESSAGE_TYPE_SIZE, LENGTH_SIZE, -LENGTH_SIZE - MESSAGE_TYPE_SIZE - VERSION_FLAGS_SIZE, 0); + } +} diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java index 075a9702af..5a2079244a 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java @@ -48,7 +48,8 @@ import com.google.common.base.Preconditions; /** * Implementation of PCEPSession. (Not final for testing.) */ -class PCEPSessionImpl extends AbstractProtocolSession implements PCEPSession, PCEPSessionRuntimeMXBean { +@VisibleForTesting +public class PCEPSessionImpl extends AbstractProtocolSession implements PCEPSession, PCEPSessionRuntimeMXBean { /** * System.nanoTime value about when was sent the last message Protected to be updated also in tests. */ @@ -94,8 +95,8 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC private final Channel channel; - PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages, - final Channel channel, final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) { + PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel, + final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) { this.listener = Preconditions.checkNotNull(listener); this.stateTimer = Preconditions.checkNotNull(timer); this.channel = Preconditions.checkNotNull(channel); @@ -108,7 +109,7 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC } if (getDeadTimerValue() != 0) { - stateTimer.newTimeout(new TimerTask() { + this.stateTimer.newTimeout(new TimerTask() { @Override public void run(final Timeout timeout) throws Exception { handleDeadTimer(); @@ -117,7 +118,7 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC } if (getKeepAliveTimerValue() != 0) { - stateTimer.newTimeout(new TimerTask() { + this.stateTimer.newTimeout(new TimerTask() { @Override public void run(final Timeout timeout) throws Exception { handleKeepaliveTimer(); @@ -144,7 +145,7 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC logger.debug("DeadTimer expired. " + new Date()); this.terminate(Reason.EXP_DEADTIMER); } else { - stateTimer.newTimeout(new TimerTask() { + this.stateTimer.newTimeout(new TimerTask() { @Override public void run(final Timeout timeout) throws Exception { handleDeadTimer(); @@ -165,7 +166,7 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); - if (channel.isActive()) { + if (this.channel.isActive()) { if (ct >= nextKeepalive) { this.sendMessage(new PCEPKeepAliveMessage()); nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); @@ -326,17 +327,17 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC @Override public Integer getDeadTimerValue() { - return remoteOpen.getDeadTimerValue(); + return this.remoteOpen.getDeadTimerValue(); } @Override public Integer getKeepAliveTimerValue() { - return localOpen.getKeepAliveTimerValue(); + return this.localOpen.getKeepAliveTimerValue(); } @Override public String getPeerAddress() { - InetSocketAddress a = (InetSocketAddress) channel.remoteAddress(); + final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress(); return a.getHostName(); } @@ -347,7 +348,7 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC @Override public String getNodeIdentifier() { - for (PCEPTlv tlv : this.remoteOpen.getTlvs()) { + for (final PCEPTlv tlv : this.remoteOpen.getTlvs()) { if (tlv instanceof NodeIdentifierTlv) { return tlv.toString(); } @@ -361,13 +362,13 @@ class PCEPSessionImpl extends AbstractProtocolSession implements PC } protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { - toStringHelper.add("localOpen", localOpen); - toStringHelper.add("remoteOpen", remoteOpen); + toStringHelper.add("localOpen", this.localOpen); + toStringHelper.add("remoteOpen", this.remoteOpen); return toStringHelper; } @Override protected void sessionUp() { - listener.onSessionUp(this); + this.listener.onSessionUp(this); } } diff --git a/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/SimpleSessionListener.java b/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/SimpleSessionListener.java index 5234fc7a4f..ee09a743e5 100644 --- a/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/SimpleSessionListener.java +++ b/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/SimpleSessionListener.java @@ -22,6 +22,7 @@ import org.opendaylight.protocol.pcep.message.PCCreateMessage; import org.opendaylight.protocol.pcep.object.CompositeInstantiationObject; import org.opendaylight.protocol.pcep.object.PCEPEndPointsObject; import org.opendaylight.protocol.pcep.object.PCEPExplicitRouteObject; +import org.opendaylight.protocol.pcep.object.PCEPLspaObject; import org.opendaylight.protocol.pcep.subobject.EROIPPrefixSubobject; import org.opendaylight.protocol.pcep.subobject.ExplicitRouteSubobject; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public class SimpleSessionListener implements PCEPSessionListener { subs.add(new EROIPPrefixSubobject>(new IPv4Prefix(new IPv4Address(new byte[] { 10, 1, 1, 2 }), 32), false)); subs.add(new EROIPPrefixSubobject>(new IPv4Prefix(new IPv4Address(new byte[] { 2, 2, 2, 2 }), 32), false)); final CompositeInstantiationObject cpo = new CompositeInstantiationObject(new PCEPEndPointsObject(IPv4.FAMILY.addressForBytes(new byte[] { - 1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), null, new PCEPExplicitRouteObject(subs, false), null, null); + 1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), new PCEPLspaObject(0, 0, 0, (short) 0, (short) 0, false, false, false, false), new PCEPExplicitRouteObject(subs, false), null, null); session.sendMessage(new PCCreateMessage(Lists.newArrayList(cpo))); } diff --git a/pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy b/pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy deleted file mode 100644 index 25be4c62fb..0000000000 --- a/pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy +++ /dev/null @@ -1,39 +0,0 @@ -import java.util.Queue - -import org.opendaylight.protocol.pcep.PCEPMessage -import org.opendaylight.protocol.pcep.object.PCEPRequestParameterObject -import org.opendaylight.protocol.pcep.message.PCEPReplyMessage -import org.opendaylight.protocol.pcep.object.CompositeResponseObject -import org.opendaylight.protocol.pcep.tool.MessageGeneratorService - -class GroovyReplyMessageGenerator implements MessageGeneratorService { - - public GroovyReplyMessageGenerator() { - } - - @Override - public Queue generateMessages() { - def queue = new LinkedList() - queue.push( - new PCEPReplyMessage( - [ - new CompositeResponseObject( - new PCEPRequestParameterObject(true, false, true, false, true, 7 as Short, 6565 as Long, true, false) - ) - ] - ) - ) - - queue.push( - new PCEPReplyMessage( - [ - new CompositeResponseObject( - new PCEPRequestParameterObject(true, false, true, false, true, 5 as Short, 235568 as Long, true, false) - ) - ] - ) - ) - - return queue - } -} \ No newline at end of file diff --git a/pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java b/pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java index 8b0b76448c..f19f563b5d 100644 --- a/pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java +++ b/pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java @@ -7,47 +7,69 @@ */ package org.opendaylight.protocol.pcep.testtool; +import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.util.List; +import org.opendaylight.protocol.framework.AbstractDispatcher; import org.opendaylight.protocol.framework.NeverReconnectStrategy; +import org.opendaylight.protocol.framework.ProtocolHandlerFactory; +import org.opendaylight.protocol.framework.ProtocolMessage; +import org.opendaylight.protocol.framework.ProtocolSession; +import org.opendaylight.protocol.framework.SessionListener; +import org.opendaylight.protocol.framework.SessionListenerFactory; +import org.opendaylight.protocol.framework.SessionNegotiatorFactory; +import org.opendaylight.protocol.pcep.PCEPMessage; +import org.opendaylight.protocol.pcep.PCEPSessionListener; import org.opendaylight.protocol.pcep.PCEPTlv; import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory; -import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl; +import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory; +import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl; import org.opendaylight.protocol.pcep.object.PCEPOpenObject; import org.opendaylight.protocol.pcep.tlv.NodeIdentifierTlv; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class PCCMock { +public class PCCMock, L extends SessionListener> extends + AbstractDispatcher { + + private final SessionNegotiatorFactory negotiatorFactory; + private final ProtocolHandlerFactory factory; + + public PCCMock(final SessionNegotiatorFactory negotiatorFactory, final ProtocolHandlerFactory factory, + final DefaultPromise defaultPromise) { + this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory); + this.factory = Preconditions.checkNotNull(factory); + } + + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise, final SessionListenerFactory listenerFactory) { + ch.pipeline().addLast(this.factory.getDecoders()); + ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise)); + ch.pipeline().addLast(this.factory.getEncoders()); + } public static void main(final String[] args) throws Exception { final List tlvs = Lists.newArrayList(); tlvs.add(new NodeIdentifierTlv(new byte[] { (byte) 127, (byte) 2, (byte) 3, (byte) 7 })); - final PCEPDispatcherImpl d = new PCEPDispatcherImpl(new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0)); - - try { - d.createClient(new InetSocketAddress("127.0.0.3", 12345), new SimpleSessionListener(), - new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000)).get(); - - // Thread.sleep(5000); - // final List cro = new ArrayList(); - // cro.add(new CompositeRequestObject(new PCEPRequestParameterObject(false, true, true, true, true, (short) - // 4, 123, false, false), - // new PCEPEndPointsObject(new IPv4Address(InetAddress.getByName("10.0.0.3")), new - // IPv4Address(InetAddress.getByName("10.0.0.5"))))); - // for (int i = 0; i < 3; i++) { - // Thread.sleep(1000); - // session.sendMessage(new PCEPRequestMessage(cro)); - // } - // Thread.sleep(5000); - // Thread.sleep(1000); - } finally { - // di.stop(); - } + final SessionNegotiatorFactory snf = new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0); + + final PCCMock pcc = new PCCMock<>(snf, new PCEPHandlerFactory(), new DefaultPromise(GlobalEventExecutor.INSTANCE)); + + pcc.createClient(new InetSocketAddress("127.0.0.3", 12345), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000), + new SessionListenerFactory() { + + @Override + public PCEPSessionListener getSessionListener() { + return new SimpleSessionListener(); + } + }).get(); } } -- 2.36.6