BUG-54 : switched channel pipeline to be protocol specific. 56/1156/7
authorDana Kutenicsova <dkutenic@cisco.com>
Wed, 11 Sep 2013 13:31:24 +0000 (15:31 +0200)
committerDana Kutenicsova <dkutenic@cisco.com>
Tue, 17 Sep 2013 07:48:47 +0000 (09:48 +0200)
Added LengthFrameDecoder for both protocols.

Change-Id: Iebdaced167b54b850ec114cff42c2b0385a37f31
Signed-off-by: Dana Kutenicsova <dkutenic@cisco.com>
37 files changed:
bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java [new file with mode: 0644]
bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactoryImpl.java [moved from bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactory.java with 96% similarity]
bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/BGPUpdateMessageParser.java
bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPParserTest.java
bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/BGPUpdateMessageParserTest.java
bgp/parser-impl/src/test/java/org/opendaylight/protocol/bgp/parser/impl/ComplementaryTest.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPDispatcherImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java [new file with mode: 0644]
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java [new file with mode: 0644]
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionNegotiator.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPImplTest.java
bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ParserTest.java
bgp/rib-mock/src/main/java/org/opendaylight/protocol/bgp/rib/mock/BGPMock.java
bgp/testtool/src/main/java/org/opendaylight/protocol/bgp/testtool/Main.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/BGPSpeakerMock.java
bgp/testtool/src/test/java/org/opendaylight/protocol/bgp/testtool/SpeakerSessionListener.java
framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java [deleted file]
framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java
framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java
framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java
framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java
framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java [new file with mode: 0644]
framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPDispatcher.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/PCEPMessage.java
pcep/api/src/main/java/org/opendaylight/protocol/pcep/object/CompositeInstantiationObject.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPDispatcherImpl.java
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java [new file with mode: 0644]
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java [new file with mode: 0644]
pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java
pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/SimpleSessionListener.java
pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy [deleted file]
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java

diff --git a/bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java b/bgp/parser-api/src/main/java/org/opendaylight/protocol/bgp/parser/BGPMessageFactory.java
new file mode 100644 (file)
index 0000000..fe40691
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.parser;
+
+import org.opendaylight.protocol.framework.ProtocolMessageFactory;
+
+/**
+ * Interface to expose BGP specific MessageFactory.
+ */
+public interface BGPMessageFactory extends ProtocolMessageFactory<BGPMessage> {
+
+}
similarity index 96%
rename from bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactory.java
rename to bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/BGPMessageFactoryImpl.java
index 72d05978575a84b25cde8dc1e8c6fdae62b29131..21dc7c8a978e664fe1fdadcad0a714dda932207f 100644 (file)
@@ -13,6 +13,7 @@ import java.util.List;
 import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.impl.message.BGPNotificationMessageParser;
 import org.opendaylight.protocol.bgp.parser.impl.message.BGPOpenMessageParser;
 import org.opendaylight.protocol.bgp.parser.impl.message.BGPUpdateMessageParser;
@@ -21,7 +22,6 @@ import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
 import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
 import org.opendaylight.protocol.framework.DeserializerException;
 import org.opendaylight.protocol.framework.DocumentedException;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
 import org.opendaylight.protocol.util.ByteArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,9 +32,9 @@ import com.google.common.primitives.UnsignedBytes;
 /**
  * The byte array
  */
-public class BGPMessageFactory implements ProtocolMessageFactory<BGPMessage> {
+public final class BGPMessageFactoryImpl implements BGPMessageFactory {
 
-       private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactory.class);
+       private final static Logger logger = LoggerFactory.getLogger(BGPMessageFactoryImpl.class);
 
        final static int LENGTH_FIELD_LENGTH = 2; // bytes
 
@@ -44,9 +44,6 @@ public class BGPMessageFactory implements ProtocolMessageFactory<BGPMessage> {
 
        public final static int COMMON_HEADER_LENGTH = LENGTH_FIELD_LENGTH + TYPE_FIELD_LENGTH + MARKER_LENGTH;
 
-       public BGPMessageFactory() {
-       }
-
        /*
         * (non-Javadoc)
         * @see org.opendaylight.protocol.bgp.parser.BGPMessageParser#parse(byte[])
index 9480e00a26d0b48146138cc80ddb55bcb99e5923..9289f8a934e5c1e39423cbde9b4c61796f6a7bfa 100644 (file)
@@ -21,7 +21,7 @@ import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.parser.BGPParsingException;
 import org.opendaylight.protocol.bgp.parser.BGPUpdateEvent;
 import org.opendaylight.protocol.bgp.parser.BGPUpdateSynchronized;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.parser.impl.BGPUpdateEventBuilder;
 import org.opendaylight.protocol.bgp.parser.impl.IPv6MP;
 import org.opendaylight.protocol.bgp.parser.impl.PathAttribute;
@@ -90,7 +90,7 @@ public class BGPUpdateMessageParser {
                byteOffset += TOTAL_PATH_ATTR_LENGTH_SIZE;
                eventBuilder.setTotalPathAttrLength(totalPathAttrLength);
 
-               if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactory.COMMON_HEADER_LENGTH > msgLength)
+               if (withdrawnRoutesLength + totalPathAttrLength + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH > msgLength)
                        throw new BGPDocumentedException("Message length inconsistent with withdrawn router length.", BGPError.MALFORMED_ATTR_LIST);
 
                if (withdrawnRoutesLength == 0 && totalPathAttrLength == 0) {
index a1737e77663d3707bfa7306cce2e180e66c9d4fe..83b899ce75ac1c6c88575f8ea769dca9945ba54e 100644 (file)
@@ -166,9 +166,9 @@ public class BGPParserTest {
        @Test
        public void testGetUpdateMessage1() throws Exception {
 
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(0), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -292,9 +292,9 @@ public class BGPParserTest {
         */
        @Test
        public void testGetUpdateMessage2() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(1), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(1), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -401,9 +401,9 @@ public class BGPParserTest {
         */
        @Test
        public void testGetUpdateMessage3() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(2), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(2), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
                assertTrue(ret instanceof BGPUpdateMessage);
                final BGPUpdateMessage message = (BGPUpdateMessage) ret;
@@ -505,9 +505,9 @@ public class BGPParserTest {
         */
        @Test
        public void testGetUpdateMessage4() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(3), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(3), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -587,9 +587,9 @@ public class BGPParserTest {
         */
        @Test
        public void testGetUpdateMessage5() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(4), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(4), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -617,9 +617,9 @@ public class BGPParserTest {
         */
        @Test
        public void testEORIpv4() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(5), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(5), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateSynchronized);
@@ -653,9 +653,9 @@ public class BGPParserTest {
         */
        @Test
        public void testEORIpv6() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(6), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(6), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateSynchronized);
@@ -691,9 +691,9 @@ public class BGPParserTest {
         */
        @Test
        public void testEORLS() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(7), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(7), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateSynchronized);
@@ -865,9 +865,9 @@ public class BGPParserTest {
         */
        @Test
        public void testBGPLink() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(8), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(8), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -997,9 +997,9 @@ public class BGPParserTest {
         */
        @Test
        public void testBGPNode() throws Exception {
-               final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(inputBytes.get(9), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(inputBytes.get(9), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent ret = BGPUpdateMessageParser.parse(body, messageLength);
 
                assertTrue(ret instanceof BGPUpdateMessage);
@@ -1077,7 +1077,7 @@ public class BGPParserTest {
         */
        @Test
        public void testOpenMessage() throws Exception {
-               final BGPMessageFactory msgFactory = new BGPMessageFactory();
+               final BGPMessageFactoryImpl msgFactory = new BGPMessageFactoryImpl();
                final BGPOpenMessage open = (BGPOpenMessage) msgFactory.parse(inputBytes.get(13)).get(0);
                final Set<BGPTableType> types = Sets.newHashSet();
                for (final BGPParameter param : open.getOptParams()) {
index 41b8623edf949d26d08f1f4c0db2518435c2ed26..60462c8237ef6a9074ba0d47ba93dcb09430fc67 100644 (file)
@@ -31,9 +31,9 @@ public class BGPUpdateMessageParserTest {
        public void testNodeParsing() throws Exception {
                final List<byte[]> result = HexDumpBGPFileParser.parseMessages(new File(this.getClass().getResource("/bgp-update-nodes.txt").getFile()));
                assertEquals(1, result.size());
-               final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactory.COMMON_HEADER_LENGTH);
-               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactory.MARKER_LENGTH,
-                               BGPMessageFactory.LENGTH_FIELD_LENGTH));
+               final byte[] body = ByteArray.cutBytes(result.get(0), BGPMessageFactoryImpl.COMMON_HEADER_LENGTH);
+               final int messageLength = ByteArray.bytesToInt(ByteArray.subByte(result.get(0), BGPMessageFactoryImpl.MARKER_LENGTH,
+                               BGPMessageFactoryImpl.LENGTH_FIELD_LENGTH));
                final BGPUpdateEvent event = BGPUpdateMessageParser.parse(body, messageLength);
                final BGPUpdateMessage updateMessage = (BGPUpdateMessage) event;
                final Set<BGPObject> addedObjects = updateMessage.getAddedObjects();
index 29d68d6679c3737b2217ce499d83238251554761..fb3703d16263974739bb2ab2fba58d1097885357 100644 (file)
@@ -190,7 +190,7 @@ public class ComplementaryTest {
 
        @Test
        public void testBGPHeaderParser() throws IOException {
-               final BGPMessageFactory h = new BGPMessageFactory();
+               final BGPMessageFactoryImpl h = new BGPMessageFactoryImpl();
                try {
                        h.parse(new byte[] { (byte) 0, (byte) 0 });
                        fail("Exception should have occured.");
@@ -219,7 +219,7 @@ public class ComplementaryTest {
 
        @Test
        public void testMessageParser() throws IOException {
-               final BGPMessageFactory parser = new BGPMessageFactory();
+               final BGPMessageFactoryImpl parser = new BGPMessageFactoryImpl();
                String ex = "";
                try {
                        parser.put(null);
index 698743c9a8bb578fe4ed558039b8391268ce113d..35081a209646a64173ebc6e0fc9cee33c70c4d93 100644 (file)
@@ -7,38 +7,57 @@
  */
 package org.opendaylight.protocol.bgp.rib.impl;
 
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
 
 import java.net.InetSocketAddress;
 
-import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
-
-import com.google.common.base.Preconditions;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
 
 /**
  * Implementation of BGPDispatcher.
  */
-public final class BGPDispatcherImpl extends AbstractDispatcher implements BGPDispatcher {
+public final class BGPDispatcherImpl extends AbstractDispatcher<BGPSessionImpl, BGPSessionListener> implements BGPDispatcher {
        private final Timer timer = new HashedWheelTimer();
-       private final ProtocolMessageFactory<BGPMessage> parser;
 
-       public BGPDispatcherImpl(final ProtocolMessageFactory<BGPMessage> parser) {
+       private BGPSessionNegotiatorFactory snf;
+
+       private final BGPHandlerFactory hf;
+
+       public BGPDispatcherImpl(final BGPMessageFactory parser) {
                super();
-               this.parser = Preconditions.checkNotNull(parser);
+               this.hf = new BGPHandlerFactory(parser);
        }
 
        @Override
        public Future<? extends BGPSession> createClient(final InetSocketAddress address, final BGPSessionPreferences preferences,
                        final BGPSessionListener listener, final ReconnectStrategy strategy) {
-               return createClient(address, listener, new BGPSessionNegotiatorFactory(timer, preferences), parser, strategy);
+               this.snf = new BGPSessionNegotiatorFactory(this.timer, preferences);
+               final SessionListenerFactory<BGPSessionListener> slf = new SessionListenerFactory<BGPSessionListener>() {
+
+                       @Override
+                       public BGPSessionListener getSessionListener() {
+                               return listener;
+                       }
+               };
+               return super.createClient(address, strategy, slf);
+       }
+
+       @Override
+       public void initializeChannel(final SocketChannel ch, final Promise<BGPSessionImpl> promise,
+                       final SessionListenerFactory<BGPSessionListener> slf) {
+               ch.pipeline().addLast(this.hf.getDecoders());
+               ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(slf, ch, promise));
+               ch.pipeline().addLast(this.hf.getEncoders());
        }
 }
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPHandlerFactory.java
new file mode 100644 (file)
index 0000000..0e2b9ce
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import io.netty.channel.ChannelHandler;
+
+import org.opendaylight.protocol.bgp.parser.BGPMessage;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
+import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
+
+/**
+ * BGP specific factory for protocol inbound/outbound handlers.
+ */
+public class BGPHandlerFactory extends ProtocolHandlerFactory<BGPMessage> {
+       private final ProtocolMessageEncoder<BGPMessage> encoder;
+
+       public BGPHandlerFactory(final BGPMessageFactory msgFactory) {
+               super(msgFactory);
+               this.encoder = new ProtocolMessageEncoder<BGPMessage>(this.msgFactory);
+       }
+
+       @Override
+       public ChannelHandler[] getEncoders() {
+               return new ChannelHandler[] { this.encoder };
+       }
+
+       @Override
+       public ChannelHandler[] getDecoders() {
+               return new ChannelHandler[] { new BGPMessageHeaderDecoder(), new ProtocolMessageDecoder<BGPMessage>(this.msgFactory) };
+       }
+}
diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPMessageHeaderDecoder.java
new file mode 100644 (file)
index 0000000..e4144ba
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.bgp.rib.impl;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * @see <a href="http://tools.ietf.org/html/rfc4271#section-4.1">BGP Message Header</a>
+ */
+public final class BGPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder {
+
+       private static final int MAX_FRAME_SIZE = 4096; // min 19, max 4096
+
+       private static final int MARKER_SIZE = 16;
+
+       private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including
+                                                                                               // the header
+
+       /*
+               
+        0                   1                   2                   3
+         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+         |                                                               |
+         +                                                               +
+         |                                                               |
+         +                                                               +
+         |                           Marker                              |
+         +                                                               +
+         |                                                               |
+         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+         |          Length               |      Type     |
+         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+         
+        */
+
+       public BGPMessageHeaderDecoder() {
+               super(MAX_FRAME_SIZE, MARKER_SIZE, LENGTH_SIZE, -MARKER_SIZE - LENGTH_SIZE, 0);
+       }
+}
index 9892942748abd6b9a06b01c83912e6f21b217b73..aa87e2b40b5ee69c619134f523d01dc9fde2ecfb 100644 (file)
@@ -34,12 +34,14 @@ import org.opendaylight.protocol.framework.AbstractProtocolSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
-class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPSession {
+@VisibleForTesting
+public class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPSession {
 
        private static final Logger logger = LoggerFactory.getLogger(BGPSessionImpl.class);
 
@@ -77,7 +79,8 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
        private final Set<BGPTableType> tableTypes;
 
-       BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive, final BGPOpenMessage remoteOpen) {
+       BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final short keepAlive,
+                       final BGPOpenMessage remoteOpen) {
                this.listener = Preconditions.checkNotNull(listener);
                this.stateTimer = Preconditions.checkNotNull(timer);
                this.channel = Preconditions.checkNotNull(channel);
@@ -116,10 +119,10 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
        @Override
        public synchronized void close() {
                logger.debug("Closing session: {}", this);
-               if (!closed) {
+               if (!this.closed) {
                        this.sendMessage(new BGPNotificationMessage(BGPError.CEASE));
-                       channel.close();
-                       closed = true;
+                       this.channel.close();
+                       this.closed = true;
                }
        }
 
@@ -156,7 +159,7 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
        @Override
        public synchronized void endOfInput() {
-               if (!closed) {
+               if (!this.closed) {
                        this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
                }
        }
@@ -173,8 +176,8 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
        private synchronized void closeWithoutMessage() {
                logger.debug("Closing session: {}", this);
-               channel.close();
-               closed = true;
+               this.channel.close();
+               this.closed = true;
        }
 
        /**
@@ -200,7 +203,7 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
                final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(HOLD_TIMER_VALUE);
 
-               if (!closed) {
+               if (!this.closed) {
                        if (ct >= nextHold) {
                                logger.debug("HoldTimer expired. " + new Date());
                                this.terminate(BGPError.HOLD_TIMER_EXPIRED);
@@ -226,7 +229,7 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
                long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
 
-               if (!closed) {
+               if (!this.closed) {
                        if (ct >= nextKeepalive) {
                                this.sendMessage(new BGPKeepAliveMessage());
                                nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
@@ -246,8 +249,8 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
        }
 
        protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-               toStringHelper.add("channel", channel);
-               toStringHelper.add("closed", closed);
+               toStringHelper.add("channel", this.channel);
+               toStringHelper.add("closed", this.closed);
                return toStringHelper;
        }
 
@@ -258,6 +261,6 @@ class BGPSessionImpl extends AbstractProtocolSession<BGPMessage> implements BGPS
 
        @Override
        protected void sessionUp() {
-               listener.onSessionUp(this);
+               this.listener.onSessionUp(this);
        }
 }
index 970729429af9fce8ac06aac8faa39c16b03f02d4..718a0d4b8fbaf722e4cb94f6180ab19632b94004 100644 (file)
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, BGPSessionImpl> {
+public final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, BGPSessionImpl> {
        // 4 minutes recommended in http://tools.ietf.org/html/rfc4271#section-8.2.2
        private static final int INITIAL_HOLDTIMER = 4;
 
@@ -39,13 +39,12 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, B
                 */
                Idle,
                /**
-                * We have sent our Open message, and are waiting for the peer's Open
-                * message.
+                * We have sent our Open message, and are waiting for the peer's Open message.
                 */
                OpenSent,
                /**
-                * We have received the peer's Open message, which is acceptable, and
-                * we're waiting the acknowledgement of our Open message.
+                * We have received the peer's Open message, which is acceptable, and we're waiting the acknowledgement of our
+                * Open message.
                 */
                OpenConfirm,
                /**
@@ -62,7 +61,7 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, B
        private State state = State.Idle;
        private final short keepAlive = 15;
 
-       BGPSessionNegotiator(final Timer timer, final Promise<BGPSessionImpl> promise, final Channel channel,
+       public BGPSessionNegotiator(final Timer timer, final Promise<BGPSessionImpl> promise, final Channel channel,
                        final BGPSessionPreferences initialPrefs, final BGPSessionListener listener) {
                super(promise, channel);
                this.listener = Preconditions.checkNotNull(listener);
@@ -72,18 +71,18 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, B
 
        @Override
        protected void startNegotiation() {
-               Preconditions.checkState(state == State.Idle);
-               channel.writeAndFlush(new BGPOpenMessage(localPref.getMyAs(), (short) localPref.getHoldTime(), localPref.getBgpId(), localPref.getParams()));
-               state = State.OpenSent;
+               Preconditions.checkState(this.state == State.Idle);
+               this.channel.writeAndFlush(new BGPOpenMessage(this.localPref.getMyAs(), (short) this.localPref.getHoldTime(), this.localPref.getBgpId(), this.localPref.getParams()));
+               this.state = State.OpenSent;
 
                final Object lock = this;
-               timer.newTimeout(new TimerTask() {
+               this.timer.newTimeout(new TimerTask() {
                        @Override
                        public void run(final Timeout timeout) throws Exception {
                                synchronized (lock) {
-                                       if (state != State.Finished) {
+                                       if (BGPSessionNegotiator.this.state != State.Finished) {
                                                negotiationFailed(new BGPDocumentedException("HoldTimer expired", BGPError.FSM_ERROR));
-                                               state = State.Finished;
+                                               BGPSessionNegotiator.this.state = State.Finished;
                                        }
                                }
                        }
@@ -92,25 +91,25 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, B
 
        @Override
        protected synchronized void handleMessage(final BGPMessage msg) {
-               logger.debug("Channel {} handling message in state {}", channel, state);
+               logger.debug("Channel {} handling message in state {}", this.channel, this.state);
 
-               switch (state) {
+               switch (this.state) {
                case Finished:
                case Idle:
-                       throw new IllegalStateException("Unexpected state " + state);
+                       throw new IllegalStateException("Unexpected state " + this.state);
                case OpenConfirm:
                        if (msg instanceof BGPKeepAliveMessage) {
                                final BGPKeepAliveMessage ka = (BGPKeepAliveMessage) msg;
 
                                // FIXME: we miss some stuff over here
 
-                               negotiationSuccessful(new BGPSessionImpl(timer, listener, channel, keepAlive, remotePref));
-                               state = State.Finished;
+                               negotiationSuccessful(new BGPSessionImpl(this.timer, this.listener, this.channel, this.keepAlive, this.remotePref));
+                               this.state = State.Finished;
                                return;
                        } else if (msg instanceof BGPNotificationMessage) {
                                final BGPNotificationMessage ntf = (BGPNotificationMessage) msg;
                                negotiationFailed(new BGPDocumentedException("Peer refusal", ntf.getError()));
-                               state = State.Finished;
+                               this.state = State.Finished;
                                return;
                        }
 
@@ -121,19 +120,19 @@ final class BGPSessionNegotiator extends AbstractSessionNegotiator<BGPMessage, B
 
                                // TODO: validate the open message
 
-                               remotePref = open;
-                               channel.writeAndFlush(new BGPKeepAliveMessage());
-                               state = State.OpenConfirm;
-                               logger.debug("Channel {} moved to OpenConfirm state with remote proposal {}", channel, remotePref);
+                               this.remotePref = open;
+                               this.channel.writeAndFlush(new BGPKeepAliveMessage());
+                               this.state = State.OpenConfirm;
+                               logger.debug("Channel {} moved to OpenConfirm state with remote proposal {}", this.channel, this.remotePref);
                                return;
                        }
                        break;
                }
 
                // Catch-all for unexpected message
-               logger.warn("Channel {} state {} unexpected message {}", channel, state, msg);
-               channel.writeAndFlush(new BGPNotificationMessage(BGPError.FSM_ERROR));
+               logger.warn("Channel {} state {} unexpected message {}", this.channel, this.state, msg);
+               this.channel.writeAndFlush(new BGPNotificationMessage(BGPError.FSM_ERROR));
                negotiationFailed(new BGPDocumentedException("Unexpected message", BGPError.FSM_ERROR));
-               state = State.Finished;
+               this.state = State.Finished;
        }
 }
index e41c45f9db762cdfe1026f7e94946cc472f4329e..2d57ee04eeb8901fbc1430d70e432c878e280a12 100644 (file)
@@ -22,10 +22,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.protocol.bgp.parser.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.parser.BGPParameter;
 import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
 import org.opendaylight.protocol.bgp.rib.impl.BGPImpl.BGPListenerRegistration;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
@@ -55,7 +55,7 @@ public class BGPImplTest {
                doReturn("").when(this.parser).toString();
 
                doReturn(null).when(this.future).get();
-               doReturn(future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
+               doReturn(this.future).when(this.disp).createClient(any(InetSocketAddress.class), any(BGPSessionPreferences.class),
                                any(BGPSessionListener.class), any(ReconnectStrategy.class));
        }
 
@@ -63,7 +63,8 @@ public class BGPImplTest {
        public void testBgpImpl() throws Exception {
                doReturn(new BGPSessionPreferences(null, 0, null, Collections.<BGPParameter> emptyList())).when(this.prop).getProposal();
                this.bgp = new BGPImpl(this.disp, new InetSocketAddress(InetAddress.getLoopbackAddress(), 2000), this.prop);
-               final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
+               final BGPListenerRegistration reg = this.bgp.registerUpdateListener(new SimpleSessionListener(),
+                               new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000));
                assertEquals(SimpleSessionListener.class, reg.getListener().getClass());
        }
 
index be851629c07bc1f869774ae5c434d71f38d47475..4d083faa537f3c40f33348ce5fead8bed225f642 100644 (file)
@@ -28,7 +28,7 @@ import org.opendaylight.protocol.bgp.parser.BGPDocumentedException;
 import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.parser.BGPMessage;
 import org.opendaylight.protocol.bgp.parser.BGPParameter;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.parser.message.BGPKeepAliveMessage;
 import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
 import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
@@ -59,7 +59,7 @@ public class ParserTest {
                (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
                (byte) 0xff, (byte) 0xff, (byte) 0x00, (byte) 0x17, (byte) 0x03, (byte) 0x02, (byte) 0x04, (byte) 0x04, (byte) 0x09 };
 
-       final ProtocolMessageFactory<BGPMessage> factory = new BGPMessageFactory();
+       final ProtocolMessageFactory<BGPMessage> factory = new BGPMessageFactoryImpl();
 
        @Test
        public void testHeaderErrors() throws DeserializerException, DocumentedException {
@@ -71,7 +71,7 @@ public class ParserTest {
                        fail("Exception should have occcured.");
                } catch (final IllegalArgumentException e) {
                        assertEquals("Too few bytes in passed array. Passed: " + wrong.length + ". Expected: >= "
-                                       + BGPMessageFactory.COMMON_HEADER_LENGTH + ".", e.getMessage());
+                                       + BGPMessageFactoryImpl.COMMON_HEADER_LENGTH + ".", e.getMessage());
                        return;
                }
                fail();
index 135df29de1c6d0a1ceb7d5ac552028272664fb46..d1122d524508848aa0c44b90b01124d5bc6e7643 100644 (file)
@@ -18,7 +18,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.protocol.bgp.parser.BGPError;
 import org.opendaylight.protocol.bgp.parser.BGPMessage;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
 import org.opendaylight.protocol.bgp.rib.impl.BGP;
 import org.opendaylight.protocol.concepts.ListenerRegistration;
@@ -55,7 +55,7 @@ public final class BGPMock implements BGP, Closeable {
 
        private List<BGPMessage> parsePrevious(final List<byte[]> msgs) {
                final List<BGPMessage> messages = Lists.newArrayList();
-               final ProtocolMessageFactory<BGPMessage> parser = new BGPMessageFactory();
+               final ProtocolMessageFactory<BGPMessage> parser = new BGPMessageFactoryImpl();
                try {
                        for (final byte[] b : msgs) {
 
index 1d90e240c0f78d59b998501da8745e91677b22bd..44ebe77f1c201c747787aeeea33ff76e1df2caaf 100644 (file)
@@ -14,7 +14,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
 import org.opendaylight.protocol.bgp.rib.impl.BGPDispatcherImpl;
 import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
@@ -49,7 +49,7 @@ public class Main {
        BGPDispatcherImpl dispatcher;
 
        public Main() throws IOException {
-               this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactory());
+               this.dispatcher = new BGPDispatcherImpl(new BGPMessageFactoryImpl());
        }
 
        public static void main(final String[] args) throws NumberFormatException, IOException {
index d0c021def3cf069037b2a83d8653369f4e38cbd8..105f130593bd8d72fae75439d9357add4cbbcedf 100644 (file)
@@ -7,37 +7,69 @@
  */
 package org.opendaylight.protocol.bgp.testtool;
 
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.opendaylight.protocol.bgp.parser.BGPMessage;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
-import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactory;
+import org.opendaylight.protocol.bgp.parser.impl.BGPMessageFactoryImpl;
+import org.opendaylight.protocol.bgp.rib.impl.BGPHandlerFactory;
+import org.opendaylight.protocol.bgp.rib.impl.BGPSessionImpl;
 import org.opendaylight.protocol.bgp.rib.impl.BGPSessionNegotiatorFactory;
 import org.opendaylight.protocol.bgp.rib.impl.BGPSessionProposalImpl;
 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionPreferences;
 import org.opendaylight.protocol.concepts.ASNumber;
 import org.opendaylight.protocol.concepts.IPv4;
 import org.opendaylight.protocol.framework.AbstractDispatcher;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessage;
+import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.SessionListener;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
 
-public class BGPSpeakerMock extends AbstractDispatcher {
+import com.google.common.base.Preconditions;
 
-       public static void main(final String[] args) throws IOException {
+public class BGPSpeakerMock<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+               AbstractDispatcher<S, L> {
 
-               final BGPSpeakerMock m = new BGPSpeakerMock();
+       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+       private final ProtocolHandlerFactory<?> factory;
 
-               final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal();
+       public BGPSpeakerMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+                       final DefaultPromise<BGPSessionImpl> defaultPromise) {
+               this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+               this.factory = Preconditions.checkNotNull(factory);
+       }
+
+       @Override
+       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
+               ch.pipeline().addLast(this.factory.getDecoders());
+               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+               ch.pipeline().addLast(this.factory.getEncoders());
+       }
+
+       public static void main(final String[] args) throws IOException {
 
                final SessionListenerFactory<BGPSessionListener> f = new SessionListenerFactory<BGPSessionListener>() {
                        @Override
                        public BGPSessionListener getSessionListener() {
-                               return new SpeakerSessionListener(m);
+                               return new SpeakerSessionListener();
                        }
                };
 
-               m.createServer(new InetSocketAddress("127.0.0.2", 12345), f,
-                               new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs), new BGPMessageFactory());
+               final BGPSessionPreferences prefs = new BGPSessionProposalImpl((short) 90, new ASNumber(25), IPv4.FAMILY.addressForString("127.0.0.2")).getProposal();
+
+               final SessionNegotiatorFactory<BGPMessage, BGPSessionImpl, BGPSessionListener> snf = new BGPSessionNegotiatorFactory(new HashedWheelTimer(), prefs);
+
+               final BGPSpeakerMock<BGPMessage, BGPSessionImpl, BGPSessionListener> mock = new BGPSpeakerMock<BGPMessage, BGPSessionImpl, BGPSessionListener>(snf, new BGPHandlerFactory(new BGPMessageFactoryImpl()), new DefaultPromise<BGPSessionImpl>(GlobalEventExecutor.INSTANCE));
+
+               mock.createServer(new InetSocketAddress("127.0.0.2", 12345), f);
        }
 }
index 07b401efe0e4d9aee8ae976fa9a2af0aa38aff48..22852c049d67191a04236c15f80589ace7dc8563 100644 (file)
@@ -11,19 +11,12 @@ import org.opendaylight.protocol.bgp.parser.BGPMessage;
 import org.opendaylight.protocol.bgp.parser.BGPSession;
 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
 import org.opendaylight.protocol.bgp.parser.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SpeakerSessionListener implements BGPSessionListener {
        private static final Logger logger = LoggerFactory.getLogger(SpeakerSessionListener.class);
 
-       AbstractDispatcher d;
-
-       SpeakerSessionListener(final AbstractDispatcher d) {
-               this.d = d;
-       }
-
        @Override
        public void onSessionUp(final BGPSession session) {
                logger.info("Server: Session is up.");
index 2179671fde1d6b320311007abffcd50be367ff59..98021be1d77077b0bc60b5104479e993ea7cbcce 100644 (file)
@@ -7,15 +7,20 @@
  */
 package org.opendaylight.protocol.framework;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
@@ -23,13 +28,13 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
  */
-public abstract class AbstractDispatcher implements Closeable {
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
 
        private static final Logger logger = LoggerFactory.getLogger(AbstractDispatcher.class);
 
@@ -43,25 +48,35 @@ public abstract class AbstractDispatcher implements Closeable {
                this.workerGroup = new NioEventLoopGroup();
        }
 
+       /**
+        * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+        * method needs to be implemented in protocol specific Dispatchers.
+        * 
+        * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+        * @param promise to be passed to {@link SessionNegotiatorFactory}
+        */
+       public abstract void initializeChannel(SocketChannel channel, Promise<S> promise, final SessionListenerFactory<L> lfactory);
+
        /**
         * Creates server. Each server needs factories to pass their instances to client sessions.
         * 
         * @param address address to which the server should be bound
-        * @param listenerFactory factory for creating protocol listeners, passed to the negotiator
-        * @param negotiatorFactory protocol session negotiator factory
-        * @param messageFactory message parser
         * 
         * @return ChannelFuture representing the binding process
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> ChannelFuture createServer(
-                       final InetSocketAddress address, final SessionListenerFactory<L> listenerFactory,
-                       final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolMessageFactory<M> messageFactory) {
+       @VisibleForTesting
+       public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<L> lfactory) {
                final ServerBootstrap b = new ServerBootstrap();
                b.group(this.bossGroup, this.workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.option(ChannelOption.SO_BACKLOG, 128);
-               b.childHandler(new ChannelInitializerImpl<M, S, L>(negotiatorFactory,
-                               listenerFactory, new ProtocolHandlerFactory<M>(messageFactory), new DefaultPromise<S>(GlobalEventExecutor.INSTANCE)));
+               b.childHandler(new ChannelInitializer<SocketChannel>() {
+
+                       @Override
+                       protected void initChannel(final SocketChannel ch) throws Exception {
+                               initializeChannel(ch, new DefaultPromise<S>(GlobalEventExecutor.INSTANCE), lfactory);
+                       }
+               });
                b.childOption(ChannelOption.SO_KEEPALIVE, true);
 
                // Bind and start to accept incoming connections.
@@ -75,30 +90,24 @@ public abstract class AbstractDispatcher implements Closeable {
         * Creates a client.
         * 
         * @param address remote address
-        * @param listener session listener
-        * @param negotiatorFactory session negotiator factory
-        * @param messageFactory message parser
         * @param connectStrategy Reconnection strategy to be used when initial connection fails
         * 
-        * @return Future representing the connection process. Its result represents
-        *         the combined success of TCP connection as well as session negotiation.
+        * @return Future representing the connection process. Its result represents the combined success of TCP connection
+        *         as well as session negotiation.
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<S> createClient(
-                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategy strategy) {
-               final ProtocolSessionPromise<M, S, L> p = new ProtocolSessionPromise<M, S, L>(workerGroup, address, negotiatorFactory,
-                               new SessionListenerFactory<L>() {
-                       private boolean created = false;
-
-                       @Override
-                       public synchronized L getSessionListener() {
-                               Preconditions.checkState(created == false);
-                               created = true;
-                               return listener;
-                       }
-
-               }, new ProtocolHandlerFactory<M>(messageFactory), strategy);
-
+       @VisibleForTesting
+       public Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
+                       final SessionListenerFactory<L> lfactory) {
+               final Bootstrap b = new Bootstrap();
+               final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(address, strategy, b);
+               b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+                               new ChannelInitializer<SocketChannel>() {
+
+                                       @Override
+                                       protected void initChannel(final SocketChannel ch) throws Exception {
+                                               initializeChannel(ch, p, lfactory);
+                                       }
+                               });
                p.connect();
                logger.debug("Client created.");
                return p;
@@ -108,25 +117,16 @@ public abstract class AbstractDispatcher implements Closeable {
         * Creates a client.
         * 
         * @param address remote address
-        * @param listener session listener
-        * @param negotiatorFactory session negotiator factory
-        * @param messageFactory message parser
         * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
         * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
         * 
-        * @return Future representing the reconnection task. It will report
-        *         completion based on reestablishStrategy, e.g. success if
-        *         it indicates no further attempts should be made and failure
-        *         if it reports an error
+        * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+        *         success if it indicates no further attempts should be made and failure if it reports an error
         */
-       protected <M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> Future<Void> createReconnectingClient(
-                       final InetSocketAddress address, final L listener, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final ProtocolMessageFactory<M> messageFactory, final ReconnectStrategyFactory connectStrategyFactory,
-                       final ReconnectStrategy reestablishStrategy) {
-
-               final ReconnectPromise<M, S, L> p = new ReconnectPromise<M, S, L>(this, address, listener, negotiatorFactory,
-                               messageFactory, connectStrategyFactory, reestablishStrategy);
+       protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+                       final ReconnectStrategy reestablishStrategy, final SessionListenerFactory<L> lfactory) {
 
+               final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(this, address, connectStrategyFactory, reestablishStrategy, lfactory);
                p.connect();
 
                return p;
diff --git a/framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java b/framework/src/main/java/org/opendaylight/protocol/framework/ChannelInitializerImpl.java
deleted file mode 100644 (file)
index 20ea3a5..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.framework;
-
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Promise;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-final class ChannelInitializerImpl<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends ChannelInitializer<SocketChannel> {
-       private static final Logger logger = LoggerFactory.getLogger(ChannelInitializerImpl.class);
-       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
-       private final SessionListenerFactory<L> listenerFactory;
-       private final ProtocolHandlerFactory<?> factory;
-       private final Promise<S> promise;
-
-       ChannelInitializerImpl(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final SessionListenerFactory<L> listenerFactory,
-                       final ProtocolHandlerFactory<?> factory, final Promise<S> promise) {
-               this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
-               this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
-               this.promise = Preconditions.checkNotNull(promise);
-               this.factory = Preconditions.checkNotNull(factory);
-       }
-
-       @Override
-       protected void initChannel(final SocketChannel ch) {
-               logger.debug("Initializing channel {}", ch);
-               ch.pipeline().addLast("decoder", factory.getDecoder());
-               ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
-               ch.pipeline().addLast("encoder", factory.getEncoder());
-               logger.debug("Channel {} initialized", ch);
-       }
-}
\ No newline at end of file
index 2f8364dd9774d55d0cb0ca2c083182d4d799e2b1..381436dddc3ed787ba2bdae3886d3f8e28ea64da 100644 (file)
@@ -13,18 +13,18 @@ import com.google.common.base.Preconditions;
 
 public class ProtocolHandlerFactory<T extends ProtocolMessage> {
        private final ProtocolMessageEncoder<T> encoder;
-       final ProtocolMessageFactory<T> msgFactory;
+       protected final ProtocolMessageFactory<T> msgFactory;
 
        public ProtocolHandlerFactory(final ProtocolMessageFactory<T> msgFactory) {
                this.msgFactory = Preconditions.checkNotNull(msgFactory);
                this.encoder = new ProtocolMessageEncoder<T>(msgFactory);
        }
 
-       public ChannelHandler getEncoder() {
-               return this.encoder;
+       public ChannelHandler[] getEncoders() {
+               return new ChannelHandler[] { this.encoder };
        }
 
-       public ChannelHandler getDecoder() {
-               return new ProtocolMessageDecoder<T>(msgFactory);
+       public ChannelHandler[] getDecoders() {
+               return new ChannelHandler[] { new ProtocolMessageDecoder<T>(this.msgFactory) };
        }
 }
index 4ad53dcdd5cc39ce8b2ef47e05ddb25c4cd74ca6..a0ec7953170c57374ff3b2ad3a6a2475f8588aab 100644 (file)
@@ -17,7 +17,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class ProtocolMessageDecoder<T extends ProtocolMessage> extends ByteToMessageDecoder {
+public final class ProtocolMessageDecoder<T extends ProtocolMessage> extends ByteToMessageDecoder {
 
        private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageDecoder.class);
 
index c54c780185c1bcf62e058cb59ccd4bf3b44dd031..ce19c0d930cec3d735e4efcbb5630d6e0925869c 100644 (file)
@@ -16,7 +16,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Sharable
-final class ProtocolMessageEncoder<T extends ProtocolMessage> extends MessageToByteEncoder<Object> {
+public final class ProtocolMessageEncoder<T extends ProtocolMessage> extends MessageToByteEncoder<Object> {
 
        private final static Logger logger = LoggerFactory.getLogger(ProtocolMessageEncoder.class);
 
@@ -29,6 +29,7 @@ final class ProtocolMessageEncoder<T extends ProtocolMessage> extends MessageToB
        @Override
        protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
                logger.debug("Sent to encode : {}", msg);
-               out.writeBytes(this.factory.put((T)msg));
+               final byte[] bytes = this.factory.put((T) msg);
+               out.writeBytes(bytes);
        }
 }
index b32859b018bfe394b1bdfb4fd22942d771e5fb97..abc253ef78d014851f051485ae9278bc1619fd18 100644 (file)
@@ -11,8 +11,6 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -29,9 +27,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 @ThreadSafe
-final class ProtocolSessionPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<S> {
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
        private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class);
-       private final ChannelInitializerImpl<M, S, L> init;
        private final ReconnectStrategy strategy;
        private final InetSocketAddress address;
        private final Bootstrap b;
@@ -39,27 +36,22 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @GuardedBy("this")
        private Future<?> pending;
 
-       ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final SessionListenerFactory<L> listenerFactory,
-                       final ProtocolHandlerFactory<?> protocolFactory, final ReconnectStrategy strategy) {
+       ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
                this.strategy = Preconditions.checkNotNull(strategy);
                this.address = Preconditions.checkNotNull(address);
-
-               init = new ChannelInitializerImpl<M, S, L>(negotiatorFactory, listenerFactory, protocolFactory, this);
-               b = new Bootstrap();
-               b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init);
+               this.b = Preconditions.checkNotNull(b);
        }
 
        synchronized void connect() {
                final Object lock = this;
 
                try {
-                       final int timeout = strategy.getConnectTimeout();
+                       final int timeout = this.strategy.getConnectTimeout();
 
                        logger.debug("Promise {} attempting connect for {}ms", lock, timeout);
 
-                       b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
-                       pending = b.connect(address).addListener(new ChannelFutureListener() {
+                       this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+                       this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(final ChannelFuture cf) throws Exception {
                                        synchronized (lock) {
@@ -67,7 +59,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                logger.debug("Promise {} connection resolved", lock);
 
                                                // Triggered when a connection attempt is resolved.
-                                               Preconditions.checkState(pending == cf);
+                                               Preconditions.checkState(ProtocolSessionPromise.this.pending == cf);
 
                                                /*
                                                 * The promise we gave out could have been cancelled,
@@ -87,13 +79,13 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                }
 
                                                if (!cf.isSuccess()) {
-                                                       final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
+                                                       final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
                                                        rf.addListener(new FutureListener<Void>() {
                                                                @Override
                                                                public void operationComplete(final Future<Void> sf) {
                                                                        synchronized (lock) {
                                                                                // Triggered when a connection attempt is to be made.
-                                                                               Preconditions.checkState(pending == sf);
+                                                                               Preconditions.checkState(ProtocolSessionPromise.this.pending == sf);
 
                                                                                /*
                                                                                 * The promise we gave out could have been cancelled,
@@ -114,14 +106,14 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
                                                                }
                                                        });
 
-                                                       pending = rf;
+                                                       ProtocolSessionPromise.this.pending = rf;
                                                } else {
                                                        logger.debug("Promise {} connection successful", lock);
                                                }
                                        }
                                }
                        });
-               } catch (Exception e) {
+               } catch (final Exception e) {
                        setFailure(e);
                }
        }
@@ -129,7 +121,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @Override
        public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
                if (super.cancel(mayInterruptIfRunning)) {
-                       pending.cancel(mayInterruptIfRunning);
+                       this.pending.cancel(mayInterruptIfRunning);
                        return true;
                }
 
@@ -139,7 +131,7 @@ final class ProtocolSessionPromise<M extends ProtocolMessage, S extends Protocol
        @Override
        public synchronized Promise<S> setSuccess(final S result) {
                logger.debug("Promise {} completed", this);
-               strategy.reconnectSuccessful();
+               this.strategy.reconnectSuccessful();
                return super.setSuccess(result);
        }
-}
\ No newline at end of file
+}
index a37572cb2078464c247ef8fe2d5ed0519e972273..508737866a34665d8f7527b579fd52d91ed2ca84 100644 (file)
@@ -15,34 +15,27 @@ import java.net.InetSocketAddress;
 
 import com.google.common.base.Preconditions;
 
-final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<Void> {
-       private final AbstractDispatcher dispatcher;
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+       private final AbstractDispatcher<S, L> dispatcher;
        private final InetSocketAddress address;
-       private final L listener;
-       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
-       private final ProtocolMessageFactory<M> messageFactory;
        private final ReconnectStrategyFactory strategyFactory;
        private final ReconnectStrategy strategy;
        private Future<?> pending;
+       private final SessionListenerFactory<L> lfactory;
 
-       public ReconnectPromise(final AbstractDispatcher dispatcher,
-                       final InetSocketAddress address, final L listener,
-                       final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
-                       final ProtocolMessageFactory<M> messageFactory,
-                       final ReconnectStrategyFactory connectStrategyFactory,
-                       final ReconnectStrategy reestablishStrategy) {
+       public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+                       final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
+                       final SessionListenerFactory<L> lfactory) {
 
                this.dispatcher = Preconditions.checkNotNull(dispatcher);
                this.address = Preconditions.checkNotNull(address);
-               this.listener = Preconditions.checkNotNull(listener);
-               this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
-               this.messageFactory =  Preconditions.checkNotNull(messageFactory);
                this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
                this.strategy = Preconditions.checkNotNull(reestablishStrategy);
+               this.lfactory = Preconditions.checkNotNull(lfactory);
        }
 
        synchronized void connect() {
-               final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
+               final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
                final ReconnectStrategy rs = new ReconnectStrategy() {
                        @Override
                        public Future<Void> scheduleReconnect(final Throwable cause) {
@@ -57,7 +50,7 @@ final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSessio
                        @Override
                        public int getConnectTimeout() throws Exception {
                                final int cst = cs.getConnectTimeout();
-                               final int rst = strategy.getConnectTimeout();
+                               final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
 
                                if (cst == 0) {
                                        return rst;
@@ -69,19 +62,18 @@ final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSessio
                        }
                };
 
-               final Future<S> cf = dispatcher.createClient(address,
-                               listener, negotiatorFactory, messageFactory, rs);
+               final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
 
                final Object lock = this;
-               pending = cf;
+               this.pending = cf;
 
                cf.addListener(new FutureListener<S>() {
                        @Override
                        public void operationComplete(final Future<S> future) {
                                synchronized (lock) {
                                        if (!future.isSuccess()) {
-                                               final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
-                                               pending = rf;
+                                               final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+                                               ReconnectPromise.this.pending = rf;
 
                                                rf.addListener(new FutureListener<Void>() {
                                                        @Override
@@ -110,7 +102,7 @@ final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSessio
                                                 *  FIXME: we have a slight race window with cancellation
                                                 *         here. Analyze and define its semantics.
                                                 */
-                                               strategy.reconnectSuccessful();
+                                               ReconnectPromise.this.strategy.reconnectSuccessful();
                                                setSuccess(null);
                                        }
                                }
@@ -121,7 +113,7 @@ final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSessio
        @Override
        public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
                if (super.cancel(mayInterruptIfRunning)) {
-                       pending.cancel(mayInterruptIfRunning);
+                       this.pending.cancel(mayInterruptIfRunning);
                        return true;
                }
 
index 3eaf923d7b393b21897b5ad04ede50aa2d12bd2e..76acac8c911a58c45e3e7dc36ae45a4d536e99cf 100644 (file)
@@ -8,19 +8,19 @@
 package org.opendaylight.protocol.framework;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertFalse;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.After;
 import org.junit.Test;
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class ServerTest {
        public static final int PORT = 18080;
 
-       AbstractDispatcher clientDispatcher, dispatcher;
+       AbstractDispatcher<?, SimpleSessionListener> clientDispatcher, dispatcher;
 
        final SimpleSessionListener pce = new SimpleSessionListener();
 
@@ -40,63 +40,93 @@ public class ServerTest {
 
        @Test
        public void testConnectionEstablished() throws Exception {
-               this.dispatcher = new AbstractDispatcher() { };
-
                final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
+               this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+                       @Override
+                       public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                       final Channel channel, final Promise<SimpleSession> promise) {
+                               p.setSuccess(true);
+                               return new SimpleSessionNegotiator(promise, channel);
+                       }
+               }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
                this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
                        @Override
                        public SimpleSessionListener getSessionListener() {
                                return new SimpleSessionListener();
                        }
-               }, new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               });
 
+               this.server.get();
+
+               this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
                        @Override
                        public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
                                        final Channel channel, final Promise<SimpleSession> promise) {
-                               p.setSuccess(true);
                                return new SimpleSessionNegotiator(promise, channel);
                        }
-               }, new MessageFactory());
+               }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
 
-               this.server.get();
-
-               this.clientDispatcher = new AbstractDispatcher() { };
-
-               this.session = this.clientDispatcher.createClient(this.serverAddress, new SimpleSessionListener(),
-                               new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+               this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
+                               new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
                                        @Override
-                                       public SessionNegotiator<SimpleSession> getSessionNegotiator(
-                                                       final SessionListenerFactory<SimpleSessionListener> factory, final Channel channel,
-                                                       final Promise<SimpleSession> promise) {
-                                               return new SimpleSessionNegotiator(promise, channel);
+                                       public SimpleSessionListener getSessionListener() {
+                                               return new SimpleSessionListener();
                                        }
-                               }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get();
+                               }).get(6, TimeUnit.SECONDS);
 
                assertEquals(true, p.get(3, TimeUnit.SECONDS));
        }
 
-       public void testConnectionFailed() throws IOException, InterruptedException {
-               this.dispatcher = new AbstractDispatcher() { };
-               this.clientDispatcher = new AbstractDispatcher() { };
-               final SimpleSessionListener listener = new SimpleSessionListener();
+       @Test
+       public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+               final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
-               try {
-                       this.clientDispatcher.createClient(this.serverAddress, listener,
-                                       new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-                                               @Override
-                                               public SessionNegotiator<SimpleSession> getSessionNegotiator(
-                                                               final SessionListenerFactory<SimpleSessionListener> factory, final Channel channel,
-                                                               final Promise<SimpleSession> promise) {
-                                                       return null;
-                                               }
-                                       }, new MessageFactory(), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000)).get();
-
-                       fail("Connection succeeded unexpectedly");
-               } catch (final ExecutionException e) {
-                       assertTrue(listener.failed);
-                       assertTrue(e.getCause() instanceof ConnectException);
-               }
+               this.dispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+                       @Override
+                       public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                       final Channel channel, final Promise<SimpleSession> promise) {
+                               p.setSuccess(true);
+                               return new SimpleSessionNegotiator(promise, channel);
+                       }
+               }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
+               this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+                       @Override
+                       public SimpleSessionListener getSessionListener() {
+                               return new SimpleSessionListener();
+                       }
+               });
+
+               this.server.get();
+
+               this.clientDispatcher = new SimpleDispatcher<>(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+                       @Override
+                       public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                       final Channel channel, final Promise<SimpleSession> promise) {
+                               return new SimpleSessionNegotiator(promise, channel);
+                       }
+               }, new ProtocolHandlerFactory<>(new MessageFactory()), new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE));
+
+               this.session = (SimpleSession) this.clientDispatcher.createClient(this.serverAddress,
+                               new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+                                       @Override
+                                       public SimpleSessionListener getSessionListener() {
+                                               return new SimpleSessionListener();
+                                       }
+                               }).get(6, TimeUnit.SECONDS);
+
+               final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
+                               new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+                                       @Override
+                                       public SimpleSessionListener getSessionListener() {
+                                               return new SimpleSessionListener();
+                                       }
+                               });
+               assertFalse(session.isSuccess());
        }
 
        @After
diff --git a/framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java b/framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java
new file mode 100644 (file)
index 0000000..0925f65
--- /dev/null
@@ -0,0 +1,32 @@
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Promise;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleDispatcher<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+               AbstractDispatcher<S, L> {
+
+       private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class);
+
+       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+       private final ProtocolHandlerFactory<?> factory;
+
+       public SimpleDispatcher(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+                       final Promise<S> promise) {
+               this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+               this.factory = Preconditions.checkNotNull(factory);
+       }
+
+       @Override
+       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> lfactory) {
+               ch.pipeline().addLast(this.factory.getDecoders());
+               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(lfactory, ch, promise));
+               ch.pipeline().addLast(this.factory.getEncoders());
+               logger.debug("initialization completed for channel {}", ch);
+       }
+}
index 127279d8eea830b11b79cd57d8e9228c0b04d4e8..40a6509cf2577ab5f13ddb0bc82307593a9318be 100644 (file)
@@ -31,13 +31,6 @@ public class SimpleSessionListener implements SessionListener<SimpleMessage, Sim
                this.messages.add(message);
        }
 
-       public synchronized void onConnectionFailed(final ProtocolSession<?> session, final Exception e) {
-               logger.debug("Connection Failed: {}", e.getMessage(), e);
-               this.failed = true;
-               this.notifyAll();
-               session.close();
-       }
-
        @Override
        public void onSessionUp(final SimpleSession session) {
                this.up = true;
@@ -45,12 +38,13 @@ public class SimpleSessionListener implements SessionListener<SimpleMessage, Sim
 
        @Override
        public void onSessionDown(final SimpleSession session, final Exception e) {
-               this.up = false;
+               this.failed = true;
+               this.notifyAll();
        }
 
        @Override
-       public void onSessionTerminated(final SimpleSession session,
-                       final TerminationReason reason) {
-               this.up = false;
+       public void onSessionTerminated(final SimpleSession session, final TerminationReason reason) {
+               this.failed = true;
+               this.notifyAll();
        }
 }
index 1eeb58a91af9b2876b2051bfa3249418472f9931..8f2bcf4b2a0296f295dbe8578d3471b7f7dc7fc9 100644 (file)
@@ -8,12 +8,10 @@
 package org.opendaylight.protocol.pcep;
 
 import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
 /**
@@ -22,21 +20,11 @@ import org.opendaylight.protocol.framework.SessionListenerFactory;
 public interface PCEPDispatcher {
        /**
         * Creates server. Each server needs three factories to pass their instances to client sessions.
+        * 
         * @param address to be bound with the server
         * @param listenerFactory to create listeners for clients
-        * @param proposalFactory to create proposed open objects for clients
-        * @param checkerFactory to create session characteristics checker for clients
         * @return instance of PCEPServer
         * @throws IOException if some IO error occurred
         */
        public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory);
-
-       /**
-        * Creates a client. Needs to be started via the start method.
-        * @param connection PCEP connection settings
-        * @param strategy Reconnection strategy to be used for TCP-level connection
-        * @throws IOException if some IO error occurred
-        */
-       public Future<? extends PCEPSession> createClient(InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy);
 }
-
index f11a76c6f1d686010fdd3981f700b366157f6c2f..d5b674c92f209b704020973eb0c0d0c0ead1bacb 100644 (file)
@@ -12,9 +12,9 @@ import java.util.List;
 import org.opendaylight.protocol.framework.ProtocolMessage;
 
 /**
- * Basic structure for PCEP Message. Cannot be instantiated directly. Current
- * PCEP version is 1. Each message contains a list of PCEP objects.
- *
+ * Basic structure for PCEP Message. Cannot be instantiated directly. Current PCEP version is 1. Each message contains a
+ * list of PCEP objects.
+ * 
  */
 public abstract class PCEPMessage implements ProtocolMessage {
 
@@ -28,18 +28,20 @@ public abstract class PCEPMessage implements ProtocolMessage {
        private final List<PCEPObject> objects;
 
        /**
-        * Constructor is protected to prevent direct instantiation, but to allow to
-        * call this constructor via super().
-        *
+        * Constructor is protected to prevent direct instantiation, but to allow to call this constructor via super().
+        * 
         * @param objects
         */
-       protected PCEPMessage(List<PCEPObject> objects) {
+       protected PCEPMessage(final List<PCEPObject> objects) {
+               if (objects.contains(null))
+                       throw new IllegalArgumentException("Object list contains null element at offset " + objects.indexOf(null));
+
                this.objects = objects;
        }
 
        /**
         * Returns list of all objects that the message contains
-        *
+        * 
         * @return list of all objects that the message contains
         */
        public List<PCEPObject> getAllObjects() {
@@ -55,7 +57,7 @@ public abstract class PCEPMessage implements ProtocolMessage {
        }
 
        @Override
-       public boolean equals(Object obj) {
+       public boolean equals(final Object obj) {
                if (this == obj)
                        return true;
                if (obj == null)
index e2eb50e84f06eb803a5fd7fbd7df96b30dd5efd2..83c9615ea55aca516a91e558e0d25d363385bd30 100644 (file)
@@ -12,9 +12,11 @@ import java.util.List;
 
 import org.opendaylight.protocol.pcep.PCEPObject;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Structure that combines set of related objects.
- *
+ * 
  * @see <a href="http://www.ietf.org/id/draft-crabbe-pce-pce-initiated-lsp-00.txt">PCCreate Message</a>
  */
 public class CompositeInstantiationObject {
@@ -31,33 +33,27 @@ public class CompositeInstantiationObject {
 
        /**
         * Constructs basic composite object only with mandatory objects.
-        *
-        * @param endPoints
-        *            PCEPEndPointsObject<?>. Can't be null.
-        * @param lspa
-        *            PCEPLspaObject. Can't be null.
+        * 
+        * @param endPoints PCEPEndPointsObject<?>. Can't be null.
+        * @param lspa PCEPLspaObject. Can't be null.
         */
-       public CompositeInstantiationObject(PCEPEndPointsObject<?> endPoints, PCEPLspaObject lspa) {
+       public CompositeInstantiationObject(final PCEPEndPointsObject<?> endPoints, final PCEPLspaObject lspa) {
                this(endPoints, lspa, null, null, null);
        }
 
        /**
         * Constructs composite object with optional objects.
-        *
-        * @param endPoints
-        *                      PCEPEndPointsObject<?>. Can't be null.
-        * @param lspa
-        *                      PCEPLspaObject. Can't be null.
-        * @param ero
-        *                      PCEPExplicitRouteObject
-        * @param bandwidth
-        *                      PCEPRequestedPathBandwidthObject
-        * @param metrics
-        *                      List<PCEPMetricObject>
+        * 
+        * @param endPoints PCEPEndPointsObject<?>. Can't be null.
+        * @param lspa PCEPLspaObject. Can't be null.
+        * @param ero PCEPExplicitRouteObject
+        * @param bandwidth PCEPRequestedPathBandwidthObject
+        * @param metrics List<PCEPMetricObject>
         */
-       public CompositeInstantiationObject(PCEPEndPointsObject<?> endPoints, PCEPLspaObject lspa, PCEPExplicitRouteObject ero, PCEPRequestedPathBandwidthObject bandwidth, List<PCEPMetricObject> metrics) {
-               this.endPoints = endPoints;
-               this.lspa = lspa;
+       public CompositeInstantiationObject(final PCEPEndPointsObject<?> endPoints, final PCEPLspaObject lspa,
+                       final PCEPExplicitRouteObject ero, final PCEPRequestedPathBandwidthObject bandwidth, final List<PCEPMetricObject> metrics) {
+               this.endPoints = Preconditions.checkNotNull(endPoints);
+               this.lspa = Preconditions.checkNotNull(lspa);
                this.ero = ero;
                this.bandwidth = bandwidth;
                this.metrics = metrics;
@@ -65,7 +61,7 @@ public class CompositeInstantiationObject {
 
        /**
         * Gets list of all objects, which are in appropriate order.
-        *
+        * 
         * @return List<PCEPObject>. Can't be null or empty.
         */
        public List<PCEPObject> getCompositeAsList() {
@@ -83,13 +79,11 @@ public class CompositeInstantiationObject {
 
        /**
         * Creates this object from a list of PCEPObjects.
-        *
-        * @param objects
-        *            List<PCEPObject> list of PCEPObjects from whose this object
-        *            should be created.
+        * 
+        * @param objects List<PCEPObject> list of PCEPObjects from whose this object should be created.
         * @return CompositeInstantiationObject
         */
-       public static CompositeInstantiationObject getCompositeFromList(List<PCEPObject> objects) {
+       public static CompositeInstantiationObject getCompositeFromList(final List<PCEPObject> objects) {
                if (objects == null || objects.isEmpty()) {
                        throw new IllegalArgumentException("List cannot be null or empty.");
                }
@@ -116,26 +110,26 @@ public class CompositeInstantiationObject {
                while (!objects.isEmpty()) {
                        final PCEPObject obj = objects.get(0);
                        switch (state) {
-                               case 1:
-                                       state = 2;
-                                       if (obj instanceof PCEPExplicitRouteObject) {
-                                               ero = (PCEPExplicitRouteObject) obj;
-                                               break;
-                                       }
-                               case 2:
+                       case 1:
+                               state = 2;
+                               if (obj instanceof PCEPExplicitRouteObject) {
+                                       ero = (PCEPExplicitRouteObject) obj;
+                                       break;
+                               }
+                       case 2:
+                               state = 3;
+                               if (obj instanceof PCEPRequestedPathBandwidthObject) {
+                                       bandwidth = (PCEPRequestedPathBandwidthObject) obj;
+                                       break;
+                               }
+                       case 3:
+                               state = 4;
+                               if (obj instanceof PCEPMetricObject) {
+                                       metrics.add((PCEPMetricObject) obj);
                                        state = 3;
-                                       if (obj instanceof PCEPRequestedPathBandwidthObject) {
-                                               bandwidth = (PCEPRequestedPathBandwidthObject) obj;
-                                               break;
-                                       }
-                               case 3:
-                                       state = 4;
-                                       if (obj instanceof PCEPMetricObject) {
-                                               metrics.add((PCEPMetricObject) obj);
-                                               state = 3;
 
-                                               break;
-                                       }
+                                       break;
+                               }
                        }
 
                        if (state == 4) {
@@ -190,10 +184,8 @@ public class CompositeInstantiationObject {
        public int hashCode() {
                final int prime = 31;
                int result = 1;
-               result = prime * result
-                               + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode());
-               result = prime * result
-                               + ((this.endPoints == null) ? 0 : this.endPoints.hashCode());
+               result = prime * result + ((this.bandwidth == null) ? 0 : this.bandwidth.hashCode());
+               result = prime * result + ((this.endPoints == null) ? 0 : this.endPoints.hashCode());
                result = prime * result + ((this.ero == null) ? 0 : this.ero.hashCode());
                result = prime * result + ((this.lspa == null) ? 0 : this.lspa.hashCode());
                result = prime * result + ((this.metrics == null) ? 0 : this.metrics.hashCode());
@@ -204,7 +196,7 @@ public class CompositeInstantiationObject {
         * @see java.lang.Object#equals(java.lang.Object)
         */
        @Override
-       public boolean equals(Object obj) {
+       public boolean equals(final Object obj) {
                if (this == obj)
                        return true;
                if (obj == null)
index 2feaed9b83085335f5c2d34f126de180eeef9365..52cc2df3c642eb88725037036d4bc23ee14d988b 100644 (file)
@@ -8,51 +8,50 @@
 package org.opendaylight.protocol.pcep.impl;
 
 import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
 
 import org.opendaylight.protocol.framework.AbstractDispatcher;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.PCEPDispatcher;
 import org.opendaylight.protocol.pcep.PCEPMessage;
-import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Implementation of PCEPDispatcher.
  */
-public class PCEPDispatcherImpl extends AbstractDispatcher implements PCEPDispatcher {
-       private static final PCEPMessageFactory msgFactory = new PCEPMessageFactory();
+public class PCEPDispatcherImpl extends AbstractDispatcher<PCEPSessionImpl, PCEPSessionListener> implements PCEPDispatcher {
+
        private final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf;
 
+       private final PCEPHandlerFactory hf = new PCEPHandlerFactory();
+
        /**
         * Creates an instance of PCEPDispatcherImpl, gets the default selector and opens it.
         * 
         * @throws IOException if some error occurred during opening the selector
         */
-       public PCEPDispatcherImpl(final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf) {
+       public PCEPDispatcherImpl(final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> negotiatorFactory) {
                super();
-               this.snf = snf;
+               this.snf = Preconditions.checkNotNull(negotiatorFactory);
        }
 
        @Override
        public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
-               return this.createServer(address, listenerFactory, snf, msgFactory);
+               return super.createServer(address, listenerFactory);
        }
 
-       /**
-        * Create client is used for mock purposes only.
-        * 
-        * @throws ExecutionException
-        * @throws InterruptedException
-        */
        @Override
-       public Future<? extends PCEPSession> createClient(final InetSocketAddress address, final PCEPSessionListener listener, final ReconnectStrategy strategy) {
-               return this.createClient(address, listener, snf, msgFactory, strategy);
+       public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise,
+                       final SessionListenerFactory<PCEPSessionListener> listenerFactory) {
+               ch.pipeline().addLast(this.hf.getDecoders());
+               ch.pipeline().addLast("negotiator", this.snf.getSessionNegotiator(listenerFactory, ch, promise));
+               ch.pipeline().addLast(this.hf.getEncoders());
        }
 }
diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPHandlerFactory.java
new file mode 100644 (file)
index 0000000..5eadc1e
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import io.netty.channel.ChannelHandler;
+
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
+import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
+import org.opendaylight.protocol.pcep.PCEPMessage;
+
+/**
+ * PCEP specific factory for protocol inbound/outbound handlers.
+ */
+public class PCEPHandlerFactory extends ProtocolHandlerFactory<PCEPMessage> {
+       private final ProtocolMessageEncoder<PCEPMessage> encoder;
+
+       public PCEPHandlerFactory() {
+               super(new PCEPMessageFactory());
+               this.encoder = new ProtocolMessageEncoder<PCEPMessage>(this.msgFactory);
+       }
+
+       @Override
+       public ChannelHandler[] getEncoders() {
+               return new ChannelHandler[] { this.encoder };
+       }
+
+       @Override
+       public ChannelHandler[] getDecoders() {
+               return new ChannelHandler[] { new PCEPMessageHeaderDecoder(), new ProtocolMessageDecoder<PCEPMessage>(this.msgFactory) };
+       }
+}
diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPMessageHeaderDecoder.java
new file mode 100644 (file)
index 0000000..1a151ef
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.pcep.impl;
+
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * @see <a href="http://tools.ietf.org/html/rfc5440#section-6.1">Common Message Header</a>
+ */
+public class PCEPMessageHeaderDecoder extends LengthFieldBasedFrameDecoder {
+
+       private static final int MAX_FRAME_SIZE = 65528; // min 4, max 4096
+
+       private static final int VERSION_FLAGS_SIZE = 1;
+
+       private static final int LENGTH_SIZE = 2; // the length field represents the length of the whole message including
+                                                                                               // the header
+
+       private static final int MESSAGE_TYPE_SIZE = 1;
+
+       /* 
+       
+       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       | Ver |  Flags  |  Message-Type |       Message-Length          |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       
+        */
+
+       public PCEPMessageHeaderDecoder() {
+               super(MAX_FRAME_SIZE, VERSION_FLAGS_SIZE + MESSAGE_TYPE_SIZE, LENGTH_SIZE, -LENGTH_SIZE - MESSAGE_TYPE_SIZE - VERSION_FLAGS_SIZE, 0);
+       }
+}
index 075a9702af2e64838c95d7dd75d5d1946507bc8c..5a2079244abb5eb3b96a6ad0e3439690a3cc9a75 100644 (file)
@@ -48,7 +48,8 @@ import com.google.common.base.Preconditions;
 /**
  * Implementation of PCEPSession. (Not final for testing.)
  */
-class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PCEPSession, PCEPSessionRuntimeMXBean {
+@VisibleForTesting
+public class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PCEPSession, PCEPSessionRuntimeMXBean {
        /**
         * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
         */
@@ -94,8 +95,8 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
 
        private final Channel channel;
 
-       PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages,
-                       final Channel channel, final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) {
+       PCEPSessionImpl(final Timer timer, final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
+                       final PCEPOpenObject localOpen, final PCEPOpenObject remoteOpen) {
                this.listener = Preconditions.checkNotNull(listener);
                this.stateTimer = Preconditions.checkNotNull(timer);
                this.channel = Preconditions.checkNotNull(channel);
@@ -108,7 +109,7 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
                }
 
                if (getDeadTimerValue() != 0) {
-                       stateTimer.newTimeout(new TimerTask() {
+                       this.stateTimer.newTimeout(new TimerTask() {
                                @Override
                                public void run(final Timeout timeout) throws Exception {
                                        handleDeadTimer();
@@ -117,7 +118,7 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
                }
 
                if (getKeepAliveTimerValue() != 0) {
-                       stateTimer.newTimeout(new TimerTask() {
+                       this.stateTimer.newTimeout(new TimerTask() {
                                @Override
                                public void run(final Timeout timeout) throws Exception {
                                        handleKeepaliveTimer();
@@ -144,7 +145,7 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
                                logger.debug("DeadTimer expired. " + new Date());
                                this.terminate(Reason.EXP_DEADTIMER);
                        } else {
-                               stateTimer.newTimeout(new TimerTask() {
+                               this.stateTimer.newTimeout(new TimerTask() {
                                        @Override
                                        public void run(final Timeout timeout) throws Exception {
                                                handleDeadTimer();
@@ -165,7 +166,7 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
 
                long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
 
-               if (channel.isActive()) {
+               if (this.channel.isActive()) {
                        if (ct >= nextKeepalive) {
                                this.sendMessage(new PCEPKeepAliveMessage());
                                nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
@@ -326,17 +327,17 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
 
        @Override
        public Integer getDeadTimerValue() {
-               return remoteOpen.getDeadTimerValue();
+               return this.remoteOpen.getDeadTimerValue();
        }
 
        @Override
        public Integer getKeepAliveTimerValue() {
-               return localOpen.getKeepAliveTimerValue();
+               return this.localOpen.getKeepAliveTimerValue();
        }
 
        @Override
        public String getPeerAddress() {
-               InetSocketAddress a = (InetSocketAddress) channel.remoteAddress();
+               final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress();
                return a.getHostName();
        }
 
@@ -347,7 +348,7 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
 
        @Override
        public String getNodeIdentifier() {
-               for (PCEPTlv tlv : this.remoteOpen.getTlvs()) {
+               for (final PCEPTlv tlv : this.remoteOpen.getTlvs()) {
                        if (tlv instanceof NodeIdentifierTlv) {
                                return tlv.toString();
                        }
@@ -361,13 +362,13 @@ class PCEPSessionImpl extends AbstractProtocolSession<PCEPMessage> implements PC
        }
 
        protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-               toStringHelper.add("localOpen", localOpen);
-               toStringHelper.add("remoteOpen", remoteOpen);
+               toStringHelper.add("localOpen", this.localOpen);
+               toStringHelper.add("remoteOpen", this.remoteOpen);
                return toStringHelper;
        }
 
        @Override
        protected void sessionUp() {
-               listener.onSessionUp(this);
+               this.listener.onSessionUp(this);
        }
 }
index 5234fc7a4f8ddc8d6afece324ea1b9f5e2549de1..ee09a743e531c7c38f49e6414a8287106956180f 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.protocol.pcep.message.PCCreateMessage;
 import org.opendaylight.protocol.pcep.object.CompositeInstantiationObject;
 import org.opendaylight.protocol.pcep.object.PCEPEndPointsObject;
 import org.opendaylight.protocol.pcep.object.PCEPExplicitRouteObject;
+import org.opendaylight.protocol.pcep.object.PCEPLspaObject;
 import org.opendaylight.protocol.pcep.subobject.EROIPPrefixSubobject;
 import org.opendaylight.protocol.pcep.subobject.ExplicitRouteSubobject;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class SimpleSessionListener implements PCEPSessionListener {
                subs.add(new EROIPPrefixSubobject<Prefix<?>>(new IPv4Prefix(new IPv4Address(new byte[] { 10, 1, 1, 2 }), 32), false));
                subs.add(new EROIPPrefixSubobject<Prefix<?>>(new IPv4Prefix(new IPv4Address(new byte[] { 2, 2, 2, 2 }), 32), false));
                final CompositeInstantiationObject cpo = new CompositeInstantiationObject(new PCEPEndPointsObject<IPv4Address>(IPv4.FAMILY.addressForBytes(new byte[] {
-                               1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), null, new PCEPExplicitRouteObject(subs, false), null, null);
+                               1, 1, 1, 1 }), IPv4.FAMILY.addressForBytes(new byte[] { 2, 2, 2, 2 })), new PCEPLspaObject(0, 0, 0, (short) 0, (short) 0, false, false, false, false), new PCEPExplicitRouteObject(subs, false), null, null);
 
                session.sendMessage(new PCCreateMessage(Lists.newArrayList(cpo)));
        }
diff --git a/pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy b/pcep/testtool/src/main/resources/GroovyReplyMessageGenerator.groovy
deleted file mode 100644 (file)
index 25be4c6..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-import java.util.Queue
-
-import org.opendaylight.protocol.pcep.PCEPMessage
-import org.opendaylight.protocol.pcep.object.PCEPRequestParameterObject
-import org.opendaylight.protocol.pcep.message.PCEPReplyMessage
-import org.opendaylight.protocol.pcep.object.CompositeResponseObject
-import org.opendaylight.protocol.pcep.tool.MessageGeneratorService
-
-class GroovyReplyMessageGenerator implements MessageGeneratorService {
-       
-       public GroovyReplyMessageGenerator() {
-               }
-       
-       @Override
-       public Queue<PCEPMessage> generateMessages() {
-               def queue = new LinkedList<PCEPMessage>()
-               queue.push(
-                       new PCEPReplyMessage(
-                               [
-                                       new CompositeResponseObject(
-                                               new PCEPRequestParameterObject(true, false, true, false, true, 7 as Short, 6565 as Long, true, false)
-                                       )
-                               ]
-                       )
-               )
-
-               queue.push(
-                       new PCEPReplyMessage(
-                               [
-                                       new CompositeResponseObject(
-                                               new PCEPRequestParameterObject(true, false, true, false, true, 5 as Short, 235568 as Long, true, false)
-                                       )
-                               ]
-                       )
-               )
-                               
-               return queue
-       }
-}
\ No newline at end of file
index 8b0b76448c3d70682c650505584520cdb641f9fd..f19f563b5d627c2310a1c7c8319081fe377de89c 100644 (file)
@@ -7,47 +7,69 @@
  */
 package org.opendaylight.protocol.pcep.testtool;
 
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
 
 import java.net.InetSocketAddress;
 import java.util.List;
 
+import org.opendaylight.protocol.framework.AbstractDispatcher;
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
+import org.opendaylight.protocol.framework.ProtocolMessage;
+import org.opendaylight.protocol.framework.ProtocolSession;
+import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.protocol.framework.SessionListenerFactory;
+import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
+import org.opendaylight.protocol.pcep.PCEPMessage;
+import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.PCEPTlv;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl;
+import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.pcep.object.PCEPOpenObject;
 import org.opendaylight.protocol.pcep.tlv.NodeIdentifierTlv;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class PCCMock {
+public class PCCMock<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends
+               AbstractDispatcher<S, L> {
+
+       private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
+       private final ProtocolHandlerFactory<?> factory;
+
+       public PCCMock(final SessionNegotiatorFactory<M, S, L> negotiatorFactory, final ProtocolHandlerFactory<?> factory,
+                       final DefaultPromise<PCEPSessionImpl> defaultPromise) {
+               this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+               this.factory = Preconditions.checkNotNull(factory);
+       }
+
+       @Override
+       public void initializeChannel(final SocketChannel ch, final Promise<S> promise, final SessionListenerFactory<L> listenerFactory) {
+               ch.pipeline().addLast(this.factory.getDecoders());
+               ch.pipeline().addLast("negotiator", this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
+               ch.pipeline().addLast(this.factory.getEncoders());
+       }
 
        public static void main(final String[] args) throws Exception {
                final List<PCEPTlv> tlvs = Lists.newArrayList();
                tlvs.add(new NodeIdentifierTlv(new byte[] { (byte) 127, (byte) 2, (byte) 3, (byte) 7 }));
 
-               final PCEPDispatcherImpl d = new PCEPDispatcherImpl(new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0));
-
-               try {
-                       d.createClient(new InetSocketAddress("127.0.0.3", 12345), new SimpleSessionListener(),
-                                       new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000)).get();
-
-                       // Thread.sleep(5000);
-                       // final List<CompositeRequestObject> cro = new ArrayList<CompositeRequestObject>();
-                       // cro.add(new CompositeRequestObject(new PCEPRequestParameterObject(false, true, true, true, true, (short)
-                       // 4, 123, false, false),
-                       // new PCEPEndPointsObject<IPv4Address>(new IPv4Address(InetAddress.getByName("10.0.0.3")), new
-                       // IPv4Address(InetAddress.getByName("10.0.0.5")))));
-                       // for (int i = 0; i < 3; i++) {
-                       // Thread.sleep(1000);
-                       // session.sendMessage(new PCEPRequestMessage(cro));
-                       // }
-                       // Thread.sleep(5000);
-                       // Thread.sleep(1000);
-               } finally {
-                       // di.stop();
-               }
+               final SessionNegotiatorFactory<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> snf = new DefaultPCEPSessionNegotiatorFactory(new HashedWheelTimer(), new PCEPOpenObject(30, 120, 0, tlvs), 0);
+
+               final PCCMock<PCEPMessage, PCEPSessionImpl, PCEPSessionListener> pcc = new PCCMock<>(snf, new PCEPHandlerFactory(), new DefaultPromise<PCEPSessionImpl>(GlobalEventExecutor.INSTANCE));
+
+               pcc.createClient(new InetSocketAddress("127.0.0.3", 12345), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000),
+                               new SessionListenerFactory<PCEPSessionListener>() {
+
+                                       @Override
+                                       public PCEPSessionListener getSessionListener() {
+                                               return new SimpleSessionListener();
+                                       }
+                               }).get();
        }
 }