Move protocol framework from BGPCEP project 15/4315/5
authorRobert Varga <rovarga@cisco.com>
Thu, 16 Jan 2014 10:58:29 +0000 (11:58 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 18 Jan 2014 19:10:18 +0000 (19:10 +0000)
This moves the current parts of BGPCEP protocol framework into the
controller project, eliminating the circular dependency forced by
netconf being resident in this project.

Change-Id: I7c8457ebaec5c261217587d60a95a7b35e473d1e
Signed-off-by: Robert Varga <rovarga@cisco.com>
47 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/.gitignore [new file with mode: 0644]
opendaylight/commons/protocol-framework/pom.xml [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DeserializerException.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DocumentedException.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/NeverReconnectStrategy.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSession.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectImmediatelyStrategy.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategy.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategyFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListenerFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiator.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiatorFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TerminationReason.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ComplementaryTest.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/Session.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleByteToMessageDecoder.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessage.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessageToByteEncoder.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSession.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListenerFactory.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionNegotiator.java [new file with mode: 0644]
opendaylight/commons/protocol-framework/src/test/resources/logback-test.xml [new file with mode: 0644]
opendaylight/config/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/sal-binding-it/src/main/java/org/opendaylight/controller/test/sal/binding/it/TestHelper.java
opendaylight/md-sal/test/sal-rest-connector-it/src/test/java/org/opendaylight/controller/test/restconf/it/ServiceProviderController.java
opendaylight/netconf/netconf-api/pom.xml
opendaylight/netconf/netconf-client/pom.xml
opendaylight/netconf/netconf-impl/pom.xml
opendaylight/netconf/netconf-util/pom.xml
opendaylight/netconf/pom.xml
pom.xml

index d49437f..06c1d8d 100644 (file)
@@ -93,6 +93,7 @@
     <networkconfig.bridgedomain.northbound.version>0.0.2-SNAPSHOT</networkconfig.bridgedomain.northbound.version>
     <commons.httpclient.version>0.1.1-SNAPSHOT</commons.httpclient.version>
     <concepts.version>0.5.1-SNAPSHOT</concepts.version>
+    <protocol-framework.version>0.4.0-SNAPSHOT</protocol-framework.version>
     <jolokia.bridge.version>0.0.1-SNAPSHOT</jolokia.bridge.version>
     <netty.version>4.0.10.Final</netty.version>
     <commons.io.version>2.4</commons.io.version>
        <artifactId>yang-ext</artifactId>
        <version>${yang-ext.version}</version>
       </dependency>
-      <!-- bgppcep dependencies -->
-    <dependency>
-        <groupId>org.opendaylight.bgpcep</groupId>
-        <artifactId>framework</artifactId>
-        <version>${bgpcep.version}</version>
-     </dependency>
+
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>protocol-framework</artifactId>
+        <version>${protocol-framework.version}</version>
+      </dependency>
+
         <!--Netty-->
         <dependency>
             <groupId>io.netty</groupId>
diff --git a/opendaylight/commons/protocol-framework/.gitignore b/opendaylight/commons/protocol-framework/.gitignore
new file mode 100644 (file)
index 0000000..fc1d35e
--- /dev/null
@@ -0,0 +1,3 @@
+target
+.classpath
+.settings
diff --git a/opendaylight/commons/protocol-framework/pom.xml b/opendaylight/commons/protocol-framework/pom.xml
new file mode 100644 (file)
index 0000000..0ea865f
--- /dev/null
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>commons.opendaylight</artifactId>
+        <version>1.4.1-SNAPSHOT</version>
+        <relativePath>../../commons/opendaylight</relativePath>
+    </parent>
+    <scm>
+        <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:Main</url>
+        <tag>HEAD</tag>
+    </scm>
+
+    <artifactId>protocol-framework</artifactId>
+    <version>${protocol-framework.version}</version>
+    <description>Common protocol framework</description>
+    <packaging>bundle</packaging>
+    <name>${project.artifactId}</name>
+    <prerequisites>
+        <maven>3.0.4</maven>
+    </prerequisites>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <!-- Testing dependencies -->
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>mockito-configuration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                    </instructions>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
new file mode 100644 (file)
index 0000000..e90af73
--- /dev/null
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
+ * start method that will handle sockets in different thread.
+ */
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
+
+    protected interface PipelineInitializer<S extends ProtocolSession<?>> {
+        /**
+         * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+         * method needs to be implemented in protocol specific Dispatchers.
+         *
+         * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+         * @param promise to be passed to {@link SessionNegotiatorFactory}
+         */
+        void initializeChannel(SocketChannel channel, Promise<S> promise);
+    }
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractDispatcher.class);
+
+    private final EventLoopGroup bossGroup;
+
+    private final EventLoopGroup workerGroup;
+
+    private final EventExecutor executor;
+
+    protected AbstractDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+        this(GlobalEventExecutor.INSTANCE, bossGroup, workerGroup);
+    }
+
+    protected AbstractDispatcher(final EventExecutor executor, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+        this.bossGroup = Preconditions.checkNotNull(bossGroup);
+        this.workerGroup = Preconditions.checkNotNull(workerGroup);
+        this.executor = Preconditions.checkNotNull(executor);
+    }
+
+
+    /**
+     * Creates server. Each server needs factories to pass their instances to client sessions.
+     *
+     * @param address address to which the server should be bound
+     * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
+     *
+     * @return ChannelFuture representing the binding process
+     */
+    protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
+        final ServerBootstrap b = new ServerBootstrap();
+        b.group(this.bossGroup, this.workerGroup);
+        b.channel(NioServerSocketChannel.class);
+        b.option(ChannelOption.SO_BACKLOG, 128);
+        b.childHandler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(final SocketChannel ch) {
+                initializer.initializeChannel(ch, new DefaultPromise<S>(executor));
+            }
+        });
+        b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        // Bind and start to accept incoming connections.
+        final ChannelFuture f = b.bind(address);
+        LOG.debug("Initiated server {} at {}.", f, address);
+        return f;
+
+    }
+
+    /**
+     * Creates a client.
+     *
+     * @param address remote address
+     * @param connectStrategy Reconnection strategy to be used when initial connection fails
+     *
+     * @return Future representing the connection process. Its result represents the combined success of TCP connection
+     *         as well as session negotiation.
+     */
+    protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
+        final Bootstrap b = new Bootstrap();
+        final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(executor, address, strategy, b);
+        b.group(this.workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(
+                new ChannelInitializer<SocketChannel>() {
+
+                    @Override
+                    protected void initChannel(final SocketChannel ch) {
+                        initializer.initializeChannel(ch, p);
+                    }
+                });
+        p.connect();
+        LOG.debug("Client created.");
+        return p;
+    }
+
+    /**
+     * Creates a client.
+     *
+     * @param address remote address
+     * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
+     * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
+     *
+     * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+     *         success if it indicates no further attempts should be made and failure if it reports an error
+     */
+    protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+            final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
+
+        final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer);
+        p.connect();
+
+        return p;
+
+    }
+
+    /**
+     * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()}
+     */
+    @Deprecated
+    @Override
+    public void close() {
+        try {
+            this.workerGroup.shutdownGracefully();
+        } finally {
+            this.bossGroup.shutdownGracefully();
+        }
+    }
+
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractProtocolSession.java
new file mode 100644 (file)
index 0000000..e7bd665
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractProtocolSession<M> extends SimpleChannelInboundHandler<Object> implements ProtocolSession<M> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractProtocolSession.class);
+
+    /**
+     * Handles incoming message (parsing, reacting if necessary).
+     *
+     * @param msg incoming message
+     */
+    protected abstract void handleMessage(final M msg);
+
+    /**
+     * Called when reached the end of input stream while reading.
+     */
+    protected abstract void endOfInput();
+
+    /**
+     * Called when the session is added to the pipeline.
+     */
+    protected abstract void sessionUp();
+
+    @Override
+    public final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        endOfInput();
+    }
+
+    @Override
+    protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+        LOG.debug("Message was received: {}", msg);
+        handleMessage((M) msg);
+    }
+
+    @Override
+    public final void handlerAdded(final ChannelHandlerContext ctx) {
+        sessionUp();
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractSessionNegotiator.java
new file mode 100644 (file)
index 0000000..9ecfb1b
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Promise;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Abstract base class for session negotiators. It implements the basic
+ * substrate to implement SessionNegotiator API specification, with subclasses
+ * needing to provide only
+ *
+ * @param <M> Protocol message type
+ * @param <S> Protocol session type, has to extend ProtocolSession<M>
+ */
+public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSession<?>> extends ChannelInboundHandlerAdapter implements SessionNegotiator<S> {
+    private final Logger logger = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
+    private final Promise<S> promise;
+    protected final Channel channel;
+
+    public AbstractSessionNegotiator(final Promise<S> promise, final Channel channel) {
+        this.promise = Preconditions.checkNotNull(promise);
+        this.channel = Preconditions.checkNotNull(channel);
+    }
+
+    protected abstract void startNegotiation() throws Exception;
+    protected abstract void handleMessage(M msg) throws Exception;
+
+    protected final void negotiationSuccessful(final S session) {
+        logger.debug("Negotiation on channel {} successful with session {}", channel, session);
+        channel.pipeline().replace(this, "session", session);
+        promise.setSuccess(session);
+    }
+
+    protected final void negotiationFailed(final Throwable cause) {
+        logger.debug("Negotiation on channel {} failed", channel, cause);
+        channel.close();
+        promise.setFailure(cause);
+    }
+
+    @Override
+    public final void channelActive(final ChannelHandlerContext ctx) {
+        logger.debug("Starting session negotiation on channel {}", channel);
+        try {
+            startNegotiation();
+        } catch (Exception e) {
+            logger.info("Unexpected negotiation failure", e);
+            negotiationFailed(e);
+        }
+    }
+
+    @Override
+    public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        logger.debug("Negotiation read invoked on channel {}", channel);
+        try {
+            handleMessage((M)msg);
+        } catch (Exception e) {
+            logger.debug("Unexpected exception during negotiation", e);
+            negotiationFailed(e);
+        }
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DeserializerException.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DeserializerException.java
new file mode 100644 (file)
index 0000000..608e949
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Used when something occurs during parsing bytes to java objects.
+ *
+ * @deprecated This exception no longer carries any special meaning. Users
+ * are advised to stop using it and define their own replacement.
+ */
+@Deprecated
+public class DeserializerException extends Exception {
+
+    private static final long serialVersionUID = -2247000673438452870L;
+
+    /**
+     * Creates a deserializer exception.
+     * @param err string
+     */
+    public DeserializerException(final String err) {
+        super(err);
+    }
+
+    /**
+     * Creates a deserializer exception.
+     * @param err string
+     * @param e underlying exception
+     */
+    public DeserializerException(final String err, final Throwable e) {
+        super(err, e);
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DocumentedException.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/DocumentedException.java
new file mode 100644 (file)
index 0000000..5e5f29e
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Documented exception occurrs when an error is thrown that is documented
+ * in any RFC or draft for the specific protocol.
+ *
+ * @deprecated This exception no longer carries any special meaning. Users
+ * are advised to stop using it and define their own replacement.
+ */
+@Deprecated
+public class DocumentedException extends Exception  {
+
+    private static final long serialVersionUID = -3727963789710833704L;
+
+    /**
+     * Creates a documented exception
+     * @param message string
+     */
+    public DocumentedException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a documented exception
+     * @param err string
+     * @param cause the cause (which is saved for later retrieval by the
+     * Throwable.getCause() method). (A null value is permitted, and indicates
+     * that the cause is nonexistent or unknown.)
+     */
+    public DocumentedException(final String err, final Exception cause) {
+        super(err, cause);
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/NeverReconnectStrategy.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/NeverReconnectStrategy.java
new file mode 100644 (file)
index 0000000..c480294
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility ReconnectStrategy singleton, which will cause the reconnect process
+ * to always fail.
+ */
+@ThreadSafe
+public final class NeverReconnectStrategy implements ReconnectStrategy {
+    private final EventExecutor executor;
+    private final int timeout;
+
+    public NeverReconnectStrategy(final EventExecutor executor, final int timeout) {
+        Preconditions.checkArgument(timeout >= 0);
+        this.executor = Preconditions.checkNotNull(executor);
+        this.timeout = timeout;
+    }
+
+    @Override
+    public Future<Void> scheduleReconnect(final Throwable cause) {
+        return executor.newFailedFuture(new Throwable());
+    }
+
+    @Override
+    public void reconnectSuccessful() {
+        // Nothing to do
+    }
+
+    @Override
+    public int getConnectTimeout() {
+        return timeout;
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolHandlerFactory.java
new file mode 100644 (file)
index 0000000..5c1377d
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelHandler;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * @deprecated This is an adaptor class for turning ProtocolMessageFactory into
+ * Netty encoder/decoder. Use Netty-provided classes directly, by subclassing
+ * {@link io.netty.handler.codec.ByteToMessageDecoder} or similar instead.
+ */
+@Deprecated
+public class ProtocolHandlerFactory<T> {
+    private final ProtocolMessageEncoder<T> encoder;
+    protected final ProtocolMessageFactory<T> msgFactory;
+
+    public ProtocolHandlerFactory(final ProtocolMessageFactory<T> msgFactory) {
+        this.msgFactory = Preconditions.checkNotNull(msgFactory);
+        this.encoder = new ProtocolMessageEncoder<T>(msgFactory);
+    }
+
+    public ChannelHandler[] getEncoders() {
+        return new ChannelHandler[] { this.encoder };
+    }
+
+    public ChannelHandler[] getDecoders() {
+        return new ChannelHandler[] { new ProtocolMessageDecoder<T>(this.msgFactory) };
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageDecoder.java
new file mode 100644 (file)
index 0000000..725e0a2
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * @deprecated This is an adaptor class for turning ProtocolMessageFactory into Netty decoder. Use Netty-provided
+ *             classes directly, by subclassing {@link io.netty.handler.codec.ByteToMessageDecoder} or similar instead.
+ */
+@Deprecated
+public final class ProtocolMessageDecoder<T> extends ByteToMessageDecoder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProtocolMessageDecoder.class);
+
+    private final ProtocolMessageFactory<T> factory;
+
+    public ProtocolMessageDecoder(final ProtocolMessageFactory<T> factory) {
+        this.factory = Preconditions.checkNotNull(factory);
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        if (in.readableBytes() == 0) {
+            LOG.debug("No more content in incoming buffer.");
+            return;
+        }
+        in.markReaderIndex();
+        try {
+            LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
+            final byte[] bytes = new byte[in.readableBytes()];
+            in.readBytes(bytes);
+            out.add(this.factory.parse(bytes));
+        } catch (DeserializerException | DocumentedException e) {
+            LOG.debug("Failed to decode protocol message", e);
+            this.exceptionCaught(ctx, e);
+        }
+        in.discardReadBytes();
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageEncoder.java
new file mode 100644 (file)
index 0000000..66378e7
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @deprecated This is an adaptor class for turning ProtocolMessageFactory into Netty encoder. Use Netty-provided
+ *             classes directly, by subclassing {@link io.netty.handler.codec.MessageToByteDecoder} or similar instead.
+ */
+@Deprecated
+@Sharable
+public final class ProtocolMessageEncoder<T> extends MessageToByteEncoder<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProtocolMessageEncoder.class);
+
+    private final ProtocolMessageFactory<T> factory;
+
+    public ProtocolMessageEncoder(final ProtocolMessageFactory<T> factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
+        LOG.debug("Sent to encode : {}", msg);
+        final byte[] bytes = this.factory.put((T) msg);
+        out.writeBytes(bytes);
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageFactory.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolMessageFactory.java
new file mode 100644 (file)
index 0000000..9b89dc3
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+
+/**
+ * Interface for factory for parsing and serializing protocol specific messages. Needs to be implemented by a protocol
+ * specific message factory. The methods put/parse should delegate parsing to specific message parsers, e.g.
+ * OpenMessageParser etc.
+ *
+ * @param <T> type of messages created by this factory
+ *
+ * @deprecated Interact with Netty 4.0 directly, by subclassing {@link io.netty.handler.codec.ByteToMessageCodec} or
+ * similar.
+ */
+@Deprecated
+public interface ProtocolMessageFactory<T> {
+
+    /**
+     * Parses message from byte array. Requires specific protocol message header object to parse the header.
+     *
+     * @param bytes byte array from which the message will be parsed
+     * @return List of specific protocol messages
+     * @throws DeserializerException if some parsing error occurs
+     * @throws DocumentedException if some documented error occurs
+     */
+    T parse(byte[] bytes) throws DeserializerException, DocumentedException;
+
+    /**
+     * Serializes protocol specific message to byte array.
+     *
+     * @param msg message to be serialized.
+     * @return byte array resulting message
+     */
+    byte[] put(T msg);
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSession.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSession.java
new file mode 100644 (file)
index 0000000..6e79d67
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.io.Closeable;
+
+/**
+ * Protocol Session represents the finite state machine in underlying protocol, including timers and its purpose is to
+ * create a connection between server and client. Session is automatically started, when TCP connection is created, but
+ * can be stopped manually. If the session is up, it has to redirect messages to/from user. Handles also malformed
+ * messages and unknown requests.
+ *
+ * This interface should be implemented by a final class representing a protocol specific session.
+ */
+public interface ProtocolSession<T> extends Closeable {
+    @Override
+    void close();
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java
new file mode 100644 (file)
index 0000000..c54bf84
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+@ThreadSafe
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
+    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
+    private final ReconnectStrategy strategy;
+    private final InetSocketAddress address;
+    private final Bootstrap b;
+
+    @GuardedBy("this")
+    private Future<?> pending;
+
+    ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy,
+            final Bootstrap b) {
+        super(executor);
+        this.strategy = Preconditions.checkNotNull(strategy);
+        this.address = Preconditions.checkNotNull(address);
+        this.b = Preconditions.checkNotNull(b);
+    }
+
+    synchronized void connect() {
+        final Object lock = this;
+
+        try {
+            final int timeout = this.strategy.getConnectTimeout();
+
+            LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
+
+            this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+            this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(final ChannelFuture cf) throws Exception {
+                    synchronized (lock) {
+
+                        LOG.debug("Promise {} connection resolved", lock);
+
+                        // Triggered when a connection attempt is resolved.
+                        Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
+
+                        /*
+                         * The promise we gave out could have been cancelled,
+                         * which cascades to the connect getting cancelled,
+                         * but there is a slight race window, where the connect
+                         * is already resolved, but the listener has not yet
+                         * been notified -- cancellation at that point won't
+                         * stop the notification arriving, so we have to close
+                         * the race here.
+                         */
+                        if (isCancelled()) {
+                            if (cf.isSuccess()) {
+                                LOG.debug("Closing channel for cancelled promise {}", lock);
+                                cf.channel().close();
+                            }
+                            return;
+                        }
+
+                        if (!cf.isSuccess()) {
+                            LOG.info("Attempt to connect to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
+                            final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+                            rf.addListener(new FutureListener<Void>() {
+                                @Override
+                                public void operationComplete(final Future<Void> sf) {
+                                    synchronized (lock) {
+                                        // Triggered when a connection attempt is to be made.
+                                        Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
+
+                                        /*
+                                         * The promise we gave out could have been cancelled,
+                                         * which cascades to the reconnect attempt getting
+                                         * cancelled, but there is a slight race window, where
+                                         * the reconnect attempt is already enqueued, but the
+                                         * listener has not yet been notified -- if cancellation
+                                         * happens at that point, we need to catch it here.
+                                         */
+                                        if (!isCancelled()) {
+                                            if (sf.isSuccess()) {
+                                                connect();
+                                            } else {
+                                                setFailure(sf.cause());
+                                            }
+                                        }
+                                    }
+                                }
+                            });
+
+                            ProtocolSessionPromise.this.pending = rf;
+                        } else {
+                            LOG.debug("Promise {} connection successful", lock);
+                        }
+                    }
+                }
+            });
+        } catch (final Exception e) {
+            LOG.info("Failed to connect to {}", e);
+            setFailure(e);
+        }
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public synchronized Promise<S> setSuccess(final S result) {
+        LOG.debug("Promise {} completed", this);
+        this.strategy.reconnectSuccessful();
+        return super.setSuccess(result);
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectImmediatelyStrategy.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectImmediatelyStrategy.java
new file mode 100644 (file)
index 0000000..a567af1
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility ReconnectStrategy singleton, which will cause the reconnect process
+ * to immediately schedule a reconnection attempt.
+ */
+@ThreadSafe
+public final class ReconnectImmediatelyStrategy implements ReconnectStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectImmediatelyStrategy.class);
+    private final EventExecutor executor;
+    private final int timeout;
+
+    public ReconnectImmediatelyStrategy(final EventExecutor executor, final int timeout) {
+        Preconditions.checkArgument(timeout >= 0);
+        this.executor = Preconditions.checkNotNull(executor);
+        this.timeout = timeout;
+    }
+
+    @Override
+    public Future<Void> scheduleReconnect(final Throwable cause) {
+        LOG.debug("Connection attempt failed", cause);
+        return executor.newSucceededFuture(null);
+    }
+
+    @Override
+    public void reconnectSuccessful() {
+        // Nothing to do
+    }
+
+    @Override
+    public int getConnectTimeout() {
+        return timeout;
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
new file mode 100644 (file)
index 0000000..1fa6a81
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
+
+import com.google.common.base.Preconditions;
+
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+    private final AbstractDispatcher<S, L> dispatcher;
+    private final InetSocketAddress address;
+    private final ReconnectStrategyFactory strategyFactory;
+    private final ReconnectStrategy strategy;
+    private final PipelineInitializer<S> initializer;
+    private Future<?> pending;
+
+    private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
+
+    public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+            final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
+            final PipelineInitializer<S> initializer) {
+        super(executor);
+        this.dispatcher = Preconditions.checkNotNull(dispatcher);
+        this.address = Preconditions.checkNotNull(address);
+        this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
+        this.strategy = Preconditions.checkNotNull(reestablishStrategy);
+        this.initializer = Preconditions.checkNotNull(initializer);
+    }
+
+    // FIXME: BUG-190: refactor
+
+    synchronized void connect() {
+        negotiationFinished.set(false);
+
+        final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+        final ReconnectStrategy rs = new ReconnectStrategy() {
+            @Override
+            public Future<Void> scheduleReconnect(final Throwable cause) {
+                return cs.scheduleReconnect(cause);
+            }
+
+            @Override
+            public void reconnectSuccessful() {
+                cs.reconnectSuccessful();
+            }
+
+            @Override
+            public int getConnectTimeout() throws Exception {
+                final int cst = cs.getConnectTimeout();
+                final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
+
+                if (cst == 0) {
+                    return rst;
+                }
+                if (rst == 0) {
+                    return cst;
+                }
+                return Math.min(cst, rst);
+            }
+        };
+
+        final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+            @Override
+            public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+                addChannelClosedListener(channel.closeFuture());
+                initializer.initializeChannel(channel, promise);
+            }
+        });
+
+        final Object lock = this;
+        this.pending = cf;
+
+        cf.addListener(new FutureListener<S>() {
+
+            @Override
+            public void operationComplete(final Future<S> future) {
+                synchronized (lock) {
+                    if (!future.isSuccess()) {
+                        final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+
+                        if(rf == null) {
+                            // This should reflect: no more reconnecting strategies, enough
+                            // Currently all reconnect strategies fail with exception, should return null
+                            return;
+                        }
+
+                        ReconnectPromise.this.pending = rf;
+
+                        rf.addListener(new FutureListener<Void>() {
+                            @Override
+                            public void operationComplete(final Future<Void> sf) {
+                                synchronized (lock) {
+                                    /*
+                                     * The promise we gave out could have been cancelled,
+                                     * which cascades to the reconnect attempt getting
+                                     * cancelled, but there is a slight race window, where
+                                     * the reconnect attempt is already enqueued, but the
+                                     * listener has not yet been notified -- if cancellation
+                                     * happens at that point, we need to catch it here.
+                                     */
+                                    if (!isCancelled()) {
+                                        if (sf.isSuccess()) {
+                                            connect();
+                                        } else {
+                                            setFailure(sf.cause());
+                                        }
+                                    }
+                                }
+                            }
+                        });
+                    } else {
+                        /*
+                         *  FIXME: BUG-190: we have a slight race window with cancellation
+                         *         here. Analyze and define its semantics.
+                         */
+                        ReconnectPromise.this.strategy.reconnectSuccessful();
+                        negotiationFinished.set(true);
+                    }
+                }
+            }
+        });
+    }
+
+    private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
+
+    class ClosedChannelListener implements Closeable, FutureListener<Void> {
+
+        private final AtomicBoolean stop = new AtomicBoolean(false);
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (stop.get()) {
+                return;
+            }
+
+            // Start reconnecting crashed session after negotiation was successful
+            if (!negotiationFinished.get()) {
+                return;
+            }
+
+            connect();
+        }
+
+        @Override
+        public void close() {
+            this.stop.set(true);
+        }
+    }
+
+    private void addChannelClosedListener(final ChannelFuture channelFuture) {
+        channelFuture.addListener(closedChannelListener);
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        closedChannelListener.close();
+
+        if (super.cancel(mayInterruptIfRunning)) {
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
+
+        return false;
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategy.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategy.java
new file mode 100644 (file)
index 0000000..24ff84b
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.Future;
+
+/**
+ * Interface exposed by a reconnection strategy provider. A reconnection
+ * strategy decides whether to attempt reconnection and when to do that.
+ *
+ * The proper way of using this API is such that when a connection attempt
+ * has failed, the user will call scheduleReconnect() to obtain a future,
+ * which tracks schedule of the next connect attempt. The user should add its
+ * own listener to be get notified when the future is done. Once the
+ * the notification fires, user should examine the future to see whether
+ * it is successful or not. If it is successful, the user should immediately
+ * initiate a connection attempt. If it is unsuccessful, the user must
+ * not attempt any more connection attempts and should abort the reconnection
+ * process.
+ */
+public interface ReconnectStrategy {
+    /**
+     * Query the strategy for the connect timeout.
+     *
+     * @return connect try timeout in milliseconds, or
+     *         0 for infinite (or system-default) timeout
+     * @throws Exception if the connection should not be attempted
+     */
+    int getConnectTimeout() throws Exception;
+
+    /**
+     * Schedule a connection attempt. The precise time when the connection
+     * should be attempted is signaled by successful completion of returned
+     * future.
+     *
+     * @param cause Cause of previous failure
+     * @return a future tracking the schedule, may not be null
+     * @throws IllegalStateException when a connection attempt is currently
+     *         scheduled.
+     */
+    Future<Void> scheduleReconnect(Throwable cause);
+
+    /**
+     * Reset the strategy state. Users call this method once the reconnection
+     * process succeeds.
+     */
+    void reconnectSuccessful();
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategyFactory.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectStrategyFactory.java
new file mode 100644 (file)
index 0000000..3c61044
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Factory interface for creating new ReconnectStrategy instances. This is
+ * primarily useful for allowing injection of a specific type of strategy for
+ * on-demand use, pretty much like you would use a ThreadFactory.
+ */
+public interface ReconnectStrategyFactory {
+    /**
+     * Create a new ReconnectStrategy.
+     *
+     * @return a new reconnecty strategy
+     */
+    ReconnectStrategy createReconnectStrategy();
+}
+
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java
new file mode 100644 (file)
index 0000000..3c429fc
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.EventListener;
+
+/**
+ * Listener that receives session state informations. This interface should be
+ * implemented by a protocol specific abstract class, that is extended by
+ * a final class that implements the methods.
+ */
+public interface SessionListener<M, S extends ProtocolSession<?>, T extends TerminationReason> extends EventListener {
+    /**
+     * Fired when the session was established successfully.
+     *
+     * @param remoteParams Peer address families which we accepted
+     */
+    void onSessionUp(S session);
+
+    /**
+     * Fired when the session went down because of an IO error. Implementation should take care of closing underlying
+     * session.
+     *
+     * @param session that went down
+     * @param e Exception that was thrown as the cause of session being down
+     */
+    void onSessionDown(S session, Exception e);
+
+    /**
+     * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+     * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
+     *
+     * @param reason the cause why the session went down
+     */
+    void onSessionTerminated(S session, T reason);
+
+    /**
+     * Fired when a normal protocol message is received.
+     *
+     * @param message Protocol message
+     */
+    void onMessage(S session, M message);
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListenerFactory.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListenerFactory.java
new file mode 100644 (file)
index 0000000..1187128
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+
+/**
+ * Factory for generating Session Listeners. Used by a server. This interface should be
+ * implemented by a protocol specific abstract class, that is extended by
+ * a final class that implements the methods.
+ */
+public interface SessionListenerFactory<T extends SessionListener<?, ?, ?>> {
+    /**
+     * Returns one session listener
+     * @return specific session listener
+     */
+    T getSessionListener();
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiator.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiator.java
new file mode 100644 (file)
index 0000000..3de64b0
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelInboundHandler;
+
+/**
+ * Session negotiator concepts. A negotiator is responsible for message
+ * handling while the exact session parameters are not known. Once the
+ * session parameters are finalized, the negotiator replaces itself in
+ * the channel pipeline with the session.
+ *
+ * @param <T> Protocol session type.
+ */
+public interface SessionNegotiator<T extends ProtocolSession<?>> extends ChannelInboundHandler {
+
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiatorFactory.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionNegotiatorFactory.java
new file mode 100644 (file)
index 0000000..90844ca
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+/**
+ * A factory class creating SessionNegotiators.
+ *
+ * @param <S> session type
+ */
+public interface SessionNegotiatorFactory<M, S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> {
+    /**
+     * Create a new negotiator attached to a channel, which will notify
+     * a promise once the negotiation completes.
+     *
+     * @param channel Underlying channel
+     * @param promise Promise to be notified
+     * @return new negotiator instance
+     */
+    SessionNegotiator<S> getSessionNegotiator(SessionListenerFactory<L> factory, Channel channel, Promise<S> promise);
+}
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TerminationReason.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TerminationReason.java
new file mode 100644 (file)
index 0000000..1a6179d
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Marker interface for grouping session termination cause.
+ */
+public interface TerminationReason {
+
+    /**
+     * Get cause of session termination.
+     * @return human-readable cause.
+     */
+    String getErrorMessage();
+}
+
diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/TimedReconnectStrategy.java
new file mode 100644 (file)
index 0000000..8bb3268
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Swiss army knife equivalent for reconnect strategies.
+ *
+ * This strategy continues to schedule reconnect attempts, each having to complete in a fixed time (connectTime).
+ *
+ * Initial sleep time is specified as minSleep. Each subsequent unsuccessful attempt multiplies this time by a constant
+ * factor (sleepFactor) -- this allows for either constant reconnect times (sleepFactor = 1), or various degrees of
+ * exponential back-off (sleepFactor > 1). Maximum sleep time between attempts can be capped to a specific value
+ * (maxSleep).
+ *
+ * The strategy can optionally give up based on two criteria:
+ *
+ * A preset number of connection retries (maxAttempts) has been reached, or
+ *
+ * A preset absolute deadline is reached (deadline nanoseconds, as reported by System.nanoTime(). In this specific case,
+ * both connectTime and maxSleep will be controlled such that the connection attempt is resolved as closely to the
+ * deadline as possible.
+ *
+ * Both these caps can be combined, with the strategy giving up as soon as the first one is reached.
+ */
+@ThreadSafe
+public final class TimedReconnectStrategy implements ReconnectStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(TimedReconnectStrategy.class);
+    private final EventExecutor executor;
+    private final Long deadline, maxAttempts, maxSleep;
+    private final double sleepFactor;
+    private final int connectTime;
+    private final long minSleep;
+
+    @GuardedBy("this")
+    private long attempts;
+
+    @GuardedBy("this")
+    private long lastSleep;
+
+    @GuardedBy("this")
+    private boolean scheduled;
+
+    public TimedReconnectStrategy(final EventExecutor executor, final int connectTime, final long minSleep, final double sleepFactor,
+            final Long maxSleep, final Long maxAttempts, final Long deadline) {
+        Preconditions.checkArgument(maxSleep == null || minSleep <= maxSleep);
+        Preconditions.checkArgument(sleepFactor >= 1);
+        Preconditions.checkArgument(connectTime >= 0);
+        this.executor = Preconditions.checkNotNull(executor);
+        this.deadline = deadline;
+        this.maxAttempts = maxAttempts;
+        this.minSleep = minSleep;
+        this.maxSleep = maxSleep;
+        this.sleepFactor = sleepFactor;
+        this.connectTime = connectTime;
+    }
+
+    @Override
+    public synchronized Future<Void> scheduleReconnect(final Throwable cause) {
+        LOG.debug("Connection attempt failed", cause);
+
+        // Check if a reconnect attempt is scheduled
+        Preconditions.checkState(!this.scheduled);
+
+        // Get a stable 'now' time for deadline calculations
+        final long now = System.nanoTime();
+
+        // Obvious stop conditions
+        if (this.maxAttempts != null && this.attempts >= this.maxAttempts) {
+            return this.executor.newFailedFuture(new Throwable("Maximum reconnection attempts reached"));
+        }
+        if (this.deadline != null && this.deadline <= now) {
+            return this.executor.newFailedFuture(new TimeoutException("Reconnect deadline reached"));
+        }
+
+        /*
+         * First connection attempt gets initialized to minimum sleep,
+         * each subsequent is exponentially backed off by sleepFactor.
+         */
+        if (this.attempts != 0) {
+            this.lastSleep *= this.sleepFactor;
+        } else {
+            this.lastSleep = this.minSleep;
+        }
+
+        // Cap the sleep time to maxSleep
+        if (this.maxSleep != null && this.lastSleep > this.maxSleep) {
+            LOG.debug("Capped sleep time from {} to {}", this.lastSleep, this.maxSleep);
+            this.lastSleep = this.maxSleep;
+        }
+
+        this.attempts++;
+
+        // Check if the reconnect attempt is within the deadline
+        if (this.deadline != null && this.deadline <= now + TimeUnit.MILLISECONDS.toNanos(this.lastSleep)) {
+            return this.executor.newFailedFuture(new TimeoutException("Next reconnect would happen after deadline"));
+        }
+
+        LOG.debug("Connection attempt {} sleeping for {} milliseconds", this.attempts, this.lastSleep);
+
+        // If we are not sleeping at all, return an already-succeeded future
+        if (this.lastSleep == 0) {
+            return this.executor.newSucceededFuture(null);
+        }
+
+        // Need to retain a final reference to this for locking purposes,
+        // also set the scheduled flag.
+        final Object lock = this;
+        this.scheduled = true;
+
+        // Schedule a task for the right time. It will also clear the flag.
+        return this.executor.schedule(new Callable<Void>() {
+            @Override
+            public Void call() throws TimeoutException {
+                synchronized (lock) {
+                    Preconditions.checkState(TimedReconnectStrategy.this.scheduled);
+                    TimedReconnectStrategy.this.scheduled = false;
+                }
+
+                return null;
+            }
+        }, this.lastSleep, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public synchronized void reconnectSuccessful() {
+        Preconditions.checkState(!this.scheduled);
+        this.attempts = 0;
+    }
+
+    @Override
+    public int getConnectTimeout() throws TimeoutException {
+        int timeout = this.connectTime;
+
+        if (this.deadline != null) {
+
+            // If there is a deadline, we may need to cap the connect
+            // timeout to meet the deadline.
+            final long now = System.nanoTime();
+            if (now >= this.deadline) {
+                throw new TimeoutException("Reconnect deadline already passed");
+            }
+
+            final long left = TimeUnit.NANOSECONDS.toMillis(this.deadline - now);
+            if (left < 1) {
+                throw new TimeoutException("Connect timeout too close to deadline");
+            }
+
+            /*
+             * A bit of magic:
+             * - if time left is less than the timeout, set it directly
+             * - if there is no timeout, and time left is:
+             *      - less than maximum integer, set timeout to time left
+             *      - more than maximum integer, set timeout Integer.MAX_VALUE
+             */
+            if (timeout > left) {
+                timeout = (int) left;
+            } else if (timeout == 0) {
+                timeout = left <= Integer.MAX_VALUE ? (int) left : Integer.MAX_VALUE;
+            }
+        }
+        return timeout;
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ComplementaryTest.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ComplementaryTest.java
new file mode 100644 (file)
index 0000000..80e6ad9
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+@Deprecated
+public class ComplementaryTest {
+
+    @Test
+    public void testExceptions() {
+        final DeserializerException de = new DeserializerException("some error");
+        final DocumentedException ee = new DocumentedException("some error");
+
+        assertEquals(de.getMessage(), ee.getMessage());
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
new file mode 100644 (file)
index 0000000..bead1ee
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ServerTest {
+    SimpleDispatcher clientDispatcher, dispatcher;
+
+    final SimpleSessionListener pce = new SimpleSessionListener();
+
+    SimpleSession session = null;
+
+    ChannelFuture server = null;
+
+    InetSocketAddress serverAddress;
+    private NioEventLoopGroup eventLoopGroup;
+
+
+    @Before
+    public void setUp() {
+        final int port = 10000 + (int)(10000 * Math.random());
+        serverAddress = new InetSocketAddress("127.0.0.1", port);
+        eventLoopGroup = new NioEventLoopGroup();
+    }
+
+    @Test
+    public void testConnectionEstablished() throws Exception {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                    final Channel channel, final Promise<SimpleSession> promise) {
+                p.setSuccess(true);
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                    final Channel channel, final Promise<SimpleSession> promise) {
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        this.session = this.clientDispatcher.createClient(this.serverAddress,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        }).get(6, TimeUnit.SECONDS);
+
+        assertEquals(true, p.get(3, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                    final Channel channel, final Promise<SimpleSession> promise) {
+                p.setSuccess(true);
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                    final Channel channel, final Promise<SimpleSession> promise) {
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        this.session = this.clientDispatcher.createClient(this.serverAddress,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        }).get(6, TimeUnit.SECONDS);
+
+        final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
+                new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+        assertFalse(session.isSuccess());
+    }
+
+    @After
+    public void tearDown() throws IOException, InterruptedException {
+        this.server.channel().close();
+        this.eventLoopGroup.shutdownGracefully();
+        try {
+            Thread.sleep(500);
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/Session.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/Session.java
new file mode 100644 (file)
index 0000000..22ad930
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class Session extends AbstractProtocolSession<SimpleMessage> {
+
+    private static final Logger logger = LoggerFactory.getLogger(Session.class);
+
+    public final List<SimpleMessage> msgs = Lists.newArrayList();
+
+    public boolean up = false;
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void handleMessage(final SimpleMessage msg) {
+        logger.debug("Message received: {}", msg.getMessage());
+        this.up = true;
+        this.msgs.add(msg);
+        logger.debug(this.msgs.size() + "");
+    }
+
+    @Override
+    public void endOfInput() {
+        logger.debug("End of input reported.");
+    }
+
+    @Override
+    protected void sessionUp() {
+        logger.debug("Session up reported.");
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleByteToMessageDecoder.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleByteToMessageDecoder.java
new file mode 100644 (file)
index 0000000..cb9e180
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+import com.google.common.base.Charsets;
+
+/**
+ *
+ */
+public class SimpleByteToMessageDecoder extends ByteToMessageDecoder {
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
+        out.add(new SimpleMessage(Charsets.UTF_8.decode(in.nioBuffer()).toString()));
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java
new file mode 100644 (file)
index 0000000..12aac9e
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOutboundHandler;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSessionListener> {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleDispatcher.class);
+
+    private final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory;
+    private final ChannelOutboundHandler encoder = new SimpleMessageToByteEncoder();
+
+    private final class SimplePipelineInitializer implements PipelineInitializer<SimpleSession> {
+        final SessionListenerFactory<SimpleSessionListener> listenerFactory;
+
+        SimplePipelineInitializer(final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+            this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
+        }
+
+        @Override
+        public void initializeChannel(final SocketChannel channel, final Promise<SimpleSession> promise) {
+            channel.pipeline().addLast(new SimpleByteToMessageDecoder());
+            channel.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, channel, promise));
+            channel.pipeline().addLast(encoder);
+            logger.debug("initialization completed for channel {}", channel);
+        }
+
+    }
+
+    public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory,
+            final Promise<SimpleSession> promise, final EventLoopGroup eventLoopGroup) {
+        super(eventLoopGroup, eventLoopGroup);
+        this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+    }
+
+    public Future<SimpleSession> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+        return super.createClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+    }
+
+    public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+        return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessage.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessage.java
new file mode 100644 (file)
index 0000000..551d657
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public class SimpleMessage {
+
+    private final String s;
+
+    public SimpleMessage(final String s) {
+        this.s = s;
+    }
+
+    public String getMessage() {
+        return this.s;
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessageToByteEncoder.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleMessageToByteEncoder.java
new file mode 100644 (file)
index 0000000..b3ed3b7
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ *
+ */
+@Sharable
+public class SimpleMessageToByteEncoder extends MessageToByteEncoder<SimpleMessage> {
+    @Override
+    protected void encode(final ChannelHandlerContext ctx, final SimpleMessage msg, final ByteBuf out) {
+        out.writeBytes(msg.getMessage().getBytes());
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSession.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSession.java
new file mode 100644 (file)
index 0000000..9056f8d
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public final class SimpleSession extends AbstractProtocolSession<SimpleMessage> {
+
+    public SimpleSession() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void handleMessage(final SimpleMessage msg) {
+    }
+
+    @Override
+    public void endOfInput() {
+    }
+
+    @Override
+    protected void sessionUp() {
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListener.java
new file mode 100644 (file)
index 0000000..7004ee6
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Session Listener that is notified about messages and changes in the session.
+ */
+public class SimpleSessionListener implements SessionListener<SimpleMessage, SimpleSession, TerminationReason> {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSessionListener.class);
+
+    public List<SimpleMessage> messages = new ArrayList<SimpleMessage>();
+
+    public boolean up = false;
+
+    public boolean failed = false;
+
+    @Override
+    public void onMessage(final SimpleSession session, final SimpleMessage message) {
+        logger.debug("Received message: " + message.getClass() + " " + message);
+        this.messages.add(message);
+    }
+
+    @Override
+    public void onSessionUp(final SimpleSession session) {
+        this.up = true;
+    }
+
+    @Override
+    public void onSessionDown(final SimpleSession session, final Exception e) {
+        this.failed = true;
+        this.notifyAll();
+    }
+
+    @Override
+    public void onSessionTerminated(final SimpleSession session, final TerminationReason reason) {
+        this.failed = true;
+        this.notifyAll();
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListenerFactory.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionListenerFactory.java
new file mode 100644 (file)
index 0000000..3fabe3c
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public class SimpleSessionListenerFactory implements SessionListenerFactory<SimpleSessionListener> {
+
+    @Override
+    public SimpleSessionListener getSessionListener() {
+        return new SimpleSessionListener();
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionNegotiator.java b/opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleSessionNegotiator.java
new file mode 100644 (file)
index 0000000..39d5855
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+public class SimpleSessionNegotiator extends AbstractSessionNegotiator<SimpleMessage, SimpleSession> {
+
+    public SimpleSessionNegotiator(Promise<SimpleSession> promise, Channel channel) {
+        super(promise, channel);
+    }
+
+    @Override
+    protected void startNegotiation() throws Exception {
+        negotiationSuccessful(new SimpleSession());
+    }
+
+    @Override
+    protected void handleMessage(SimpleMessage msg) throws Exception {
+        throw new IllegalStateException("This method should never be invoked");
+    }
+}
diff --git a/opendaylight/commons/protocol-framework/src/test/resources/logback-test.xml b/opendaylight/commons/protocol-framework/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..7ace930
--- /dev/null
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
+    </encoder>
+  </appender>
+
+  <root level="TRACE">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
index 4a9d376..2ee7926 100644 (file)
                 <artifactId>org.osgi.core</artifactId>
                 <version>${osgi.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.google.code.findbugs</groupId>
-                <artifactId>jsr305</artifactId>
-                <version>2.0.1</version>
-            </dependency>
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
index 2279928..7aa2532 100644 (file)
           <artifactId>concepts</artifactId>
           <version>${concepts.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>protocol-framework</artifactId>
+        </dependency>
         <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
           <artifactId>concepts</artifactId>
      <artifactId>opendaylight-l2-types</artifactId>
     </dependency>
 
-    <!-- bgppcep dependencies -->
-    <dependency>
-      <groupId>org.opendaylight.bgpcep</groupId>
-      <artifactId>framework</artifactId>
-    </dependency>
-
     <!--Netty-->
     <dependency>
       <groupId>io.netty</groupId>
index 82ad7cf..c0dec7c 100644 (file)
@@ -38,10 +38,10 @@ public class TestHelper {
 
     public static Option configMinumumBundles() {
         return new DefaultCompositeOption(
-                mavenBundle("org.opendaylight.bgpcep", "framework").versionAsInProject(), //
                 mavenBundle("commons-codec", "commons-codec").versionAsInProject(),
 
                 mavenBundle(CONTROLLER, "config-api").versionAsInProject(), // //
+                mavenBundle(CONTROLLER, "protocol-framework").versionAsInProject(), //
                 mavenBundle(CONTROLLER, "config-manager").versionAsInProject(), // //
                 mavenBundle("commons-io", "commons-io").versionAsInProject(), //
                 mavenBundle(CONTROLLER, "config-api").versionAsInProject(), //
index a9637f8..8763724 100644 (file)
@@ -211,6 +211,7 @@ public class ServiceProviderController {
                 mavenBundle(ODL, "logback-config").versionAsInProject(),
                 mavenBundle(ODL, "config-persister-api").versionAsInProject(),
                 // mavenBundle(ODL,"config-persister-file-adapter").versionAsInProject(),
+                mavenBundle(ODL, "protocol-framework").versionAsInProject(),
                 mavenBundle(ODL, "netconf-api").versionAsInProject(),
                 mavenBundle(ODL, "netconf-impl").versionAsInProject(),
                 mavenBundle(ODL, "netconf-client").versionAsInProject(),
@@ -220,7 +221,6 @@ public class ServiceProviderController {
                 mavenBundle(ODL, "config-netconf-connector").versionAsInProject(),
                 mavenBundle(ODL, "config-persister-impl").versionAsInProject(),
 
-                mavenBundle("org.opendaylight.bgpcep", "framework").versionAsInProject(),
                 mavenBundle(YANG, "binding-generator-spi").versionAsInProject(), //
                 mavenBundle(YANG, "binding-model-api").versionAsInProject(), //
                 mavenBundle(YANG, "binding-generator-util").versionAsInProject(),
index 856bd77..d085986 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>config-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>protocol-framework</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.yangtools.model</groupId>
             <artifactId>ietf-inet-types</artifactId>
             <artifactId>ietf-netconf-monitoring-extension</artifactId>
             <version>${project.version}</version>
         </dependency>
-
-        <dependency>
-            <groupId>org.opendaylight.bgpcep</groupId>
-            <artifactId>framework</artifactId>
-            <version>${bgpcep.version}</version>
-        </dependency>
     </dependencies>
 
     <build>
index de4fb10..56982a1 100644 (file)
@@ -22,9 +22,8 @@
             <artifactId>netconf-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.opendaylight.bgpcep</groupId>
-            <artifactId>framework</artifactId>
-            <version>${bgpcep.version}</version>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>protocol-framework</artifactId>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
index 981a81d..766772b 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>config-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>protocol-framework</artifactId>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>netconf-mapping-api</artifactId>
             <artifactId>ietf-inet-types</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.opendaylight.bgpcep</groupId>
-            <artifactId>framework</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
index 2f6da9e..9078d7d 100644 (file)
@@ -27,8 +27,8 @@
             <artifactId>config-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.opendaylight.bgpcep</groupId>
-            <artifactId>framework</artifactId>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>protocol-framework</artifactId>
         </dependency>
 
         <dependency>
index 21a1ffe..3a96b42 100644 (file)
                 <artifactId>org.osgi.core</artifactId>
                 <version>${osgi.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.opendaylight.bgpcep</groupId>
-                <artifactId>mockito-configuration</artifactId>
-                <version>${bgpcep.version}</version>
-                <scope>test</scope>
-            </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>config-api</artifactId>
diff --git a/pom.xml b/pom.xml
index f7f9bc2..f5eb230 100644 (file)
--- a/pom.xml
+++ b/pom.xml
 
     <!-- Parents -->
     <module>opendaylight/commons/concepts</module>
+    <module>opendaylight/commons/protocol-framework</module>
     <module>opendaylight/commons/httpclient</module>
     <module>opendaylight/commons/checkstyle</module>
     <module>opendaylight/commons/opendaylight</module>

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.