--- /dev/null
+target
+.classpath
+.settings
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.opendaylight.odlparent</groupId>
+ <artifactId>bundle-parent</artifactId>
+ <version>2.0.2</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>protocol-framework</artifactId>
+ <version>0.10.0-SNAPSHOT</version>
+ <packaging>bundle</packaging>
+ <name>${project.artifactId}</name>
+ <description>Common protocol framework</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <!-- Testing dependencies -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <scm>
+ <connection>scm:git:http://git.opendaylight.org/gerrit/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:Main</url>
+ </scm>
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.local.LocalServerChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
+ * start method that will handle sockets in different thread.
+ */
+@Deprecated
+public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> implements Closeable {
+
+
+ protected interface ChannelPipelineInitializer<CH extends Channel, S extends ProtocolSession<?>> {
+ /**
+ * Initializes channel by specifying the handlers in its pipeline. Handlers are protocol specific, therefore this
+ * method needs to be implemented in protocol specific Dispatchers.
+ *
+ * @param channel whose pipeline should be defined, also to be passed to {@link SessionNegotiatorFactory}
+ * @param promise to be passed to {@link SessionNegotiatorFactory}
+ */
+ void initializeChannel(CH channel, Promise<S> promise);
+ }
+
+ protected interface PipelineInitializer<S extends ProtocolSession<?>> extends ChannelPipelineInitializer<SocketChannel, S> {
+
+ }
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDispatcher.class);
+
+ private final EventLoopGroup bossGroup;
+
+ private final EventLoopGroup workerGroup;
+
+ private final EventExecutor executor;
+
+ protected AbstractDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+ this(GlobalEventExecutor.INSTANCE, bossGroup, workerGroup);
+ }
+
+ protected AbstractDispatcher(final EventExecutor executor, final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
+ this.bossGroup = Preconditions.checkNotNull(bossGroup);
+ this.workerGroup = Preconditions.checkNotNull(workerGroup);
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+
+ /**
+ * Creates server. Each server needs factories to pass their instances to client sessions.
+ *
+ * @param address address to which the server should be bound
+ * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
+ *
+ * @return ChannelFuture representing the binding process
+ */
+ protected ChannelFuture createServer(final InetSocketAddress address, final PipelineInitializer<S> initializer) {
+ return createServer(address, NioServerSocketChannel.class, initializer);
+ }
+
+ /**
+ * Creates server. Each server needs factories to pass their instances to client sessions.
+ *
+ * @param address address to which the server should be bound
+ * @param channelClass The {@link Class} which is used to create {@link Channel} instances from.
+ * @param initializer instance of PipelineInitializer used to initialize the channel pipeline
+ *
+ * @return ChannelFuture representing the binding process
+ */
+ protected <CH extends Channel> ChannelFuture createServer(final SocketAddress address, final Class<? extends ServerChannel> channelClass,
+ final ChannelPipelineInitializer<CH, S> initializer) {
+ final ServerBootstrap b = new ServerBootstrap();
+ b.childHandler(new ChannelInitializer<CH>() {
+
+ @Override
+ protected void initChannel(final CH ch) {
+ initializer.initializeChannel(ch, new DefaultPromise<>(executor));
+ }
+ });
+
+ b.option(ChannelOption.SO_BACKLOG, 128);
+ if (LocalServerChannel.class.equals(channelClass) == false) {
+ // makes no sense for LocalServer and produces warning
+ b.childOption(ChannelOption.SO_KEEPALIVE, true);
+ b.childOption(ChannelOption.TCP_NODELAY , true);
+ }
+ b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ customizeBootstrap(b);
+
+ if (b.group() == null) {
+ b.group(bossGroup, workerGroup);
+ }
+ try {
+ b.channel(channelClass);
+ } catch (final IllegalStateException e) {
+ // FIXME: if this is ok, document why
+ LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ }
+
+ // Bind and start to accept incoming connections.
+ final ChannelFuture f = b.bind(address);
+ LOG.debug("Initiated server {} at {}.", f, address);
+ return f;
+ }
+
+ /**
+ * Customize a server bootstrap before the server is created. This allows
+ * subclasses to assign non-default server options before the server is
+ * created.
+ *
+ * @param b Server bootstrap
+ */
+ protected void customizeBootstrap(final ServerBootstrap b) {
+ // The default is a no-op
+ }
+
+ /**
+ * Creates a client.
+ *
+ * @param address remote address
+ * @param strategy Reconnection strategy to be used when initial connection fails
+ *
+ * @return Future representing the connection process. Its result represents the combined success of TCP connection
+ * as well as session negotiation.
+ */
+ protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
+ final Bootstrap b = new Bootstrap();
+ final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<>(executor, address, strategy, b);
+ b.option(ChannelOption.SO_KEEPALIVE, true).handler(
+ new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ initializer.initializeChannel(ch, p);
+ }
+ });
+
+ customizeBootstrap(b);
+ setWorkerGroup(b);
+ setChannelFactory(b);
+
+ p.connect();
+ LOG.debug("Client created.");
+ return p;
+ }
+
+ private void setWorkerGroup(final Bootstrap b) {
+ if (b.group() == null) {
+ b.group(workerGroup);
+ }
+ }
+
+ /**
+ * Create a client but use a pre-configured bootstrap.
+ * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved.
+ *
+ * @param address remote address
+ */
+ protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
+ final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<>(executor, address, strategy, bootstrap);
+
+ bootstrap.handler(
+ new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ initializer.initializeChannel(ch, p);
+ }
+ });
+
+ p.connect();
+ LOG.debug("Client created.");
+ return p;
+ }
+
+ /**
+ * Customize a client bootstrap before the connection is attempted. This
+ * allows subclasses to assign non-default options before the client is
+ * created.
+ *
+ * @param b Client bootstrap
+ */
+ protected void customizeBootstrap(final Bootstrap b) {
+ // The default is a no-op
+ }
+
+ /**
+ *
+ * @deprecated use {@link org.opendaylight.protocol.framework.AbstractDispatcher#createReconnectingClient(java.net.InetSocketAddress, ReconnectStrategyFactory, org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer)} with only one reconnectStrategyFactory instead.
+ *
+ * Creates a client.
+ *
+ * @param address remote address
+ * @param connectStrategyFactory Factory for creating reconnection strategy to be used when initial connection fails
+ * @param reestablishStrategy Reconnection strategy to be used when the already-established session fails
+ *
+ * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+ * success if it indicates no further attempts should be made and failure if it reports an error
+ */
+ @Deprecated
+ protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+ final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
+ return createReconnectingClient(address, connectStrategyFactory, initializer);
+ }
+
+ /**
+ * Creates a reconnecting client.
+ *
+ * @param address remote address
+ * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt
+ *
+ * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+ * success is never reported, only failure when it runs out of reconnection attempts.
+ */
+ protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+ final PipelineInitializer<S> initializer) {
+ final Bootstrap b = new Bootstrap();
+
+ final ReconnectPromise<S, L> p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer);
+
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+
+ customizeBootstrap(b);
+ setWorkerGroup(b);
+ setChannelFactory(b);
+
+ p.connect();
+ return p;
+ }
+
+ private void setChannelFactory(final Bootstrap b) {
+ // There is no way to detect if this was already set by
+ // customizeBootstrap()
+ try {
+ b.channel(NioSocketChannel.class);
+ } catch (final IllegalStateException e) {
+ LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+ }
+ }
+
+ /**
+ * @deprecated Should only be used with AbstractDispatcher#AbstractDispatcher()
+ */
+ @Deprecated
+ @Override
+ public void close() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public abstract class AbstractProtocolSession<M> extends SimpleChannelInboundHandler<Object> implements ProtocolSession<M> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractProtocolSession.class);
+
+ /**
+ * Handles incoming message (parsing, reacting if necessary).
+ *
+ * @param msg incoming message
+ */
+ protected abstract void handleMessage(final M msg);
+
+ /**
+ * Called when reached the end of input stream while reading.
+ */
+ protected abstract void endOfInput();
+
+ /**
+ * Called when the session is added to the pipeline.
+ */
+ protected abstract void sessionUp();
+
+ @Override
+ public final void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ endOfInput();
+ try {
+ // Forward channel inactive event, all handlers in pipeline might be interested in the event e.g. close channel handler of reconnect promise
+ super.channelInactive(ctx);
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected final void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Message was received: {}", msg);
+ handleMessage((M) msg);
+ }
+
+ @Override
+ public final void handlerAdded(final ChannelHandlerContext ctx) {
+ sessionUp();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Promise;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Abstract base class for session negotiators. It implements the basic
+ * substrate to implement SessionNegotiator API specification, with subclasses
+ * needing to provide only
+ *
+ * @param <M> Protocol message type
+ * @param <S> Protocol session type, has to extend {@code ProtocolSession<M>}
+ */
+@Deprecated
+public abstract class AbstractSessionNegotiator<M, S extends AbstractProtocolSession<?>> extends ChannelInboundHandlerAdapter implements SessionNegotiator<S> {
+ private final Logger LOG = LoggerFactory.getLogger(AbstractSessionNegotiator.class);
+ private final Promise<S> promise;
+ protected final Channel channel;
+
+ public AbstractSessionNegotiator(final Promise<S> promise, final Channel channel) {
+ this.promise = Preconditions.checkNotNull(promise);
+ this.channel = Preconditions.checkNotNull(channel);
+ }
+
+ protected abstract void startNegotiation() throws Exception;
+ protected abstract void handleMessage(M msg) throws Exception;
+
+ protected final void negotiationSuccessful(final S session) {
+ LOG.debug("Negotiation on channel {} successful with session {}", channel, session);
+ channel.pipeline().replace(this, "session", session);
+ promise.setSuccess(session);
+ }
+
+ protected void negotiationFailed(final Throwable cause) {
+ LOG.debug("Negotiation on channel {} failed", channel, cause);
+ channel.close();
+ promise.setFailure(cause);
+ }
+
+ /**
+ * Send a message to peer and fail negotiation if it does not reach
+ * the peer.
+ *
+ * @param msg Message which should be sent.
+ */
+ protected final void sendMessage(final M msg) {
+ this.channel.writeAndFlush(msg).addListener(
+ (ChannelFutureListener) f -> {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {}", msg, f.cause());
+ negotiationFailed(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket", msg);
+ }
+ });
+ }
+
+ @Override
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", channel);
+ try {
+ startNegotiation();
+ } catch (final Exception e) {
+ LOG.warn("Unexpected negotiation failure", e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", channel);
+ try {
+ handleMessage((M)msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {}", msg, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation", cause);
+ negotiationFailed(cause);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility ReconnectStrategy singleton, which will cause the reconnect process
+ * to always fail.
+ */
+@Deprecated
+@ThreadSafe
+public final class NeverReconnectStrategy implements ReconnectStrategy {
+ private final EventExecutor executor;
+ private final int timeout;
+
+ public NeverReconnectStrategy(final EventExecutor executor, final int timeout) {
+ Preconditions.checkArgument(timeout >= 0);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.timeout = timeout;
+ }
+
+ @Override
+ public Future<Void> scheduleReconnect(final Throwable cause) {
+ return executor.newFailedFuture(new Throwable("Reconnect failed", cause));
+ }
+
+ @Override
+ public void reconnectSuccessful() {
+ // Nothing to do
+ }
+
+ @Override
+ public int getConnectTimeout() {
+ return timeout;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.io.Closeable;
+
+/**
+ * Protocol Session represents the finite state machine in underlying protocol, including timers and its purpose is to
+ * create a connection between server and client. Session is automatically started, when TCP connection is created, but
+ * can be stopped manually. If the session is up, it has to redirect messages to/from user. Handles also malformed
+ * messages and unknown requests.
+ *
+ * This interface should be implemented by a final class representing a protocol specific session.
+ */
+@Deprecated
+public interface ProtocolSession<T> extends Closeable {
+ @Override
+ void close();
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+@ThreadSafe
+final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
+ private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
+ private final ReconnectStrategy strategy;
+ private InetSocketAddress address;
+ private final Bootstrap b;
+
+ @GuardedBy("this")
+ private Future<?> pending;
+
+ ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy,
+ final Bootstrap b) {
+ super(executor);
+ this.strategy = Preconditions.checkNotNull(strategy);
+ this.address = Preconditions.checkNotNull(address);
+ this.b = Preconditions.checkNotNull(b);
+ }
+
+ synchronized void connect() {
+ final Object lock = this;
+
+ try {
+ final int timeout = this.strategy.getConnectTimeout();
+
+ LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
+
+ if(this.address.isUnresolved()) {
+ this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
+ }
+ this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
+ final ChannelFuture connectFuture = this.b.connect(this.address);
+ // Add listener that attempts reconnect by invoking this method again.
+ connectFuture.addListener(new BootstrapConnectListener(lock));
+ this.pending = connectFuture;
+ } catch (final Exception e) {
+ LOG.info("Failed to connect to {}", address, e);
+ setFailure(e);
+ }
+ }
+
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ this.pending.cancel(mayInterruptIfRunning);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public synchronized Promise<S> setSuccess(final S result) {
+ LOG.debug("Promise {} completed", this);
+ this.strategy.reconnectSuccessful();
+ return super.setSuccess(result);
+ }
+
+ private class BootstrapConnectListener implements ChannelFutureListener {
+ private final Object lock;
+
+ public BootstrapConnectListener(final Object lock) {
+ this.lock = lock;
+ }
+
+ @Override
+ public void operationComplete(final ChannelFuture cf) throws Exception {
+ synchronized (lock) {
+
+ LOG.debug("Promise {} connection resolved", lock);
+
+ // Triggered when a connection attempt is resolved.
+ Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
+
+ /*
+ * The promise we gave out could have been cancelled,
+ * which cascades to the connect getting cancelled,
+ * but there is a slight race window, where the connect
+ * is already resolved, but the listener has not yet
+ * been notified -- cancellation at that point won't
+ * stop the notification arriving, so we have to close
+ * the race here.
+ */
+ if (isCancelled()) {
+ if (cf.isSuccess()) {
+ LOG.debug("Closing channel for cancelled promise {}", lock);
+ cf.channel().close();
+ }
+ return;
+ }
+
+ if(cf.isSuccess()) {
+ LOG.debug("Promise {} connection successful", lock);
+ return;
+ }
+
+ LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
+
+ final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+ rf.addListener(new ReconnectingStrategyListener());
+ ProtocolSessionPromise.this.pending = rf;
+ }
+ }
+
+ private class ReconnectingStrategyListener implements FutureListener<Void> {
+ @Override
+ public void operationComplete(final Future<Void> sf) {
+ synchronized (lock) {
+ // Triggered when a connection attempt is to be made.
+ Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
+
+ /*
+ * The promise we gave out could have been cancelled,
+ * which cascades to the reconnect attempt getting
+ * cancelled, but there is a slight race window, where
+ * the reconnect attempt is already enqueued, but the
+ * listener has not yet been notified -- if cancellation
+ * happens at that point, we need to catch it here.
+ */
+ if (!isCancelled()) {
+ if (sf.isSuccess()) {
+ connect();
+ } else {
+ setFailure(sf.cause());
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility ReconnectStrategy singleton, which will cause the reconnect process
+ * to immediately schedule a reconnection attempt.
+ */
+@Deprecated
+@ThreadSafe
+public final class ReconnectImmediatelyStrategy implements ReconnectStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconnectImmediatelyStrategy.class);
+ private final EventExecutor executor;
+ private final int timeout;
+
+ public ReconnectImmediatelyStrategy(final EventExecutor executor, final int timeout) {
+ Preconditions.checkArgument(timeout >= 0);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.timeout = timeout;
+ }
+
+ @Override
+ public Future<Void> scheduleReconnect(final Throwable cause) {
+ LOG.debug("Connection attempt failed", cause);
+ return executor.newSucceededFuture(null);
+ }
+
+ @Override
+ public void reconnectSuccessful() {
+ // Nothing to do
+ }
+
+ @Override
+ public int getConnectTimeout() {
+ return timeout;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.net.InetSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
+
+ private final AbstractDispatcher<S, L> dispatcher;
+ private final InetSocketAddress address;
+ private final ReconnectStrategyFactory strategyFactory;
+ private final Bootstrap b;
+ private final AbstractDispatcher.PipelineInitializer<S> initializer;
+ private Future<?> pending;
+
+ public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
+ final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer<S> initializer) {
+ super(executor);
+ this.b = b;
+ this.initializer = Preconditions.checkNotNull(initializer);
+ this.dispatcher = Preconditions.checkNotNull(dispatcher);
+ this.address = Preconditions.checkNotNull(address);
+ this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
+ }
+
+ synchronized void connect() {
+ final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
+
+ // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+ pending = this.dispatcher.createClient(this.address, cs, b, (channel, promise) -> {
+ initializer.initializeChannel(channel, promise);
+ // add closed channel handler
+ // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+ // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+ // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+ channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
+ });
+
+ pending.addListener((GenericFutureListener<Future<Object>>) future -> {
+ if (!future.isSuccess()) {
+ ReconnectPromise.this.setFailure(future.cause());
+ }
+ });
+ }
+
+ /**
+ *
+ * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+ */
+ private boolean isInitialConnectFinished() {
+ Preconditions.checkNotNull(pending);
+ return pending.isDone() && pending.isSuccess();
+ }
+
+ @Override
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (super.cancel(mayInterruptIfRunning)) {
+ Preconditions.checkNotNull(pending);
+ this.pending.cancel(mayInterruptIfRunning);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Channel handler that responds to channelInactive event and reconnects the session.
+ * Only if the promise was not canceled.
+ */
+ private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+ private final ReconnectPromise<?, ?> promise;
+
+ public ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
+ this.promise = promise;
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // This is the ultimate channel inactive handler, not forwarding
+ if (promise.isCancelled()) {
+ return;
+ }
+
+ if (promise.isInitialConnectFinished() == false) {
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
+ }
+
+ LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+ promise.connect();
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.Future;
+
+/**
+ * Interface exposed by a reconnection strategy provider. A reconnection
+ * strategy decides whether to attempt reconnection and when to do that.
+ *
+ * The proper way of using this API is such that when a connection attempt
+ * has failed, the user will call scheduleReconnect() to obtain a future,
+ * which tracks schedule of the next connect attempt. The user should add its
+ * own listener to be get notified when the future is done. Once the
+ * the notification fires, user should examine the future to see whether
+ * it is successful or not. If it is successful, the user should immediately
+ * initiate a connection attempt. If it is unsuccessful, the user must
+ * not attempt any more connection attempts and should abort the reconnection
+ * process.
+ */
+@Deprecated
+public interface ReconnectStrategy {
+ /**
+ * Query the strategy for the connect timeout.
+ *
+ * @return connect try timeout in milliseconds, or
+ * 0 for infinite (or system-default) timeout
+ * @throws Exception if the connection should not be attempted
+ */
+ int getConnectTimeout() throws Exception;
+
+ /**
+ * Schedule a connection attempt. The precise time when the connection
+ * should be attempted is signaled by successful completion of returned
+ * future.
+ *
+ * @param cause Cause of previous failure
+ * @return a future tracking the schedule, may not be null
+ * @throws IllegalStateException when a connection attempt is currently
+ * scheduled.
+ */
+ Future<Void> scheduleReconnect(Throwable cause);
+
+ /**
+ * Reset the strategy state. Users call this method once the reconnection
+ * process succeeds.
+ */
+ void reconnectSuccessful();
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Factory interface for creating new ReconnectStrategy instances. This is
+ * primarily useful for allowing injection of a specific type of strategy for
+ * on-demand use, pretty much like you would use a ThreadFactory.
+ */
+@Deprecated
+public interface ReconnectStrategyFactory {
+ /**
+ * Create a new ReconnectStrategy.
+ *
+ * @return a new reconnecty strategy
+ */
+ ReconnectStrategy createReconnectStrategy();
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.EventListener;
+
+/**
+ * Listener that receives session state information. This interface should be
+ * implemented by a protocol specific abstract class, that is extended by
+ * a final class that implements the methods.
+ */
+@Deprecated
+public interface SessionListener<M, S extends ProtocolSession<?>, T extends TerminationReason> extends EventListener {
+ /**
+ * Fired when the session was established successfully.
+ *
+ * @param session New session
+ */
+ void onSessionUp(S session);
+
+ /**
+ * Fired when the session went down because of an IO error. Implementation should take care of closing underlying
+ * session.
+ *
+ * @param session that went down
+ * @param e Exception that was thrown as the cause of session being down
+ */
+ void onSessionDown(S session, Exception e);
+
+ /**
+ * Fired when the session is terminated locally. The session has already been closed and transitioned to IDLE state.
+ * Any outstanding queued messages were not sent. The user should not attempt to make any use of the session.
+ *
+ * @param reason the cause why the session went down
+ */
+ void onSessionTerminated(S session, T reason);
+
+ /**
+ * Fired when a normal protocol message is received.
+ *
+ * @param message Protocol message
+ */
+ void onMessage(S session, M message);
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+
+/**
+ * Factory for generating Session Listeners. Used by a server. This interface should be
+ * implemented by a protocol specific abstract class, that is extended by
+ * a final class that implements the methods.
+ */
+@Deprecated
+public interface SessionListenerFactory<T extends SessionListener<?, ?, ?>> {
+ /**
+ * Returns one session listener
+ * @return specific session listener
+ */
+ T getSessionListener();
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelInboundHandler;
+
+/**
+ * Session negotiator concepts. A negotiator is responsible for message
+ * handling while the exact session parameters are not known. Once the
+ * session parameters are finalized, the negotiator replaces itself in
+ * the channel pipeline with the session.
+ *
+ * @param <T> Protocol session type.
+ */
+@Deprecated
+public interface SessionNegotiator<T extends ProtocolSession<?>> extends ChannelInboundHandler {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+/**
+ * A factory class creating SessionNegotiators.
+ *
+ * @param <S> session type
+ */
+@Deprecated
+public interface SessionNegotiatorFactory<M, S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> {
+ /**
+ * Create a new negotiator attached to a channel, which will notify
+ * a promise once the negotiation completes.
+ *
+ * @param channel Underlying channel
+ * @param promise Promise to be notified
+ * @return new negotiator instance
+ */
+ SessionNegotiator<S> getSessionNegotiator(SessionListenerFactory<L> factory, Channel channel, Promise<S> promise);
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+/**
+ * Marker interface for grouping session termination cause.
+ */
+@Deprecated
+public interface TerminationReason {
+
+ /**
+ * Get cause of session termination.
+ * @return human-readable cause.
+ */
+ String getErrorMessage();
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Swiss army knife equivalent for reconnect strategies.
+ *
+ * This strategy continues to schedule reconnect attempts, each having to complete in a fixed time (connectTime).
+ *
+ * Initial sleep time is specified as minSleep. Each subsequent unsuccessful attempt multiplies this time by a constant
+ * factor (sleepFactor) -- this allows for either constant reconnect times (sleepFactor = 1), or various degrees of
+ * exponential back-off (sleepFactor > 1). Maximum sleep time between attempts can be capped to a specific value
+ * (maxSleep).
+ *
+ * The strategy can optionally give up based on two criteria:
+ *
+ * A preset number of connection retries (maxAttempts) has been reached, or
+ *
+ * A preset absolute deadline is reached (deadline nanoseconds, as reported by System.nanoTime(). In this specific case,
+ * both connectTime and maxSleep will be controlled such that the connection attempt is resolved as closely to the
+ * deadline as possible.
+ *
+ * Both these caps can be combined, with the strategy giving up as soon as the first one is reached.
+ */
+@Deprecated
+@ThreadSafe
+public final class TimedReconnectStrategy implements ReconnectStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(TimedReconnectStrategy.class);
+ private final EventExecutor executor;
+ private final Long deadline, maxAttempts, maxSleep;
+ private final double sleepFactor;
+ private final int connectTime;
+ private final long minSleep;
+
+ @GuardedBy("this")
+ private long attempts;
+
+ @GuardedBy("this")
+ private long lastSleep;
+
+ @GuardedBy("this")
+ private boolean scheduled;
+
+ public TimedReconnectStrategy(final EventExecutor executor, final int connectTime, final long minSleep, final double sleepFactor,
+ final Long maxSleep, final Long maxAttempts, final Long deadline) {
+ Preconditions.checkArgument(maxSleep == null || minSleep <= maxSleep);
+ Preconditions.checkArgument(sleepFactor >= 1);
+ Preconditions.checkArgument(connectTime >= 0);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.deadline = deadline;
+ this.maxAttempts = maxAttempts;
+ this.minSleep = minSleep;
+ this.maxSleep = maxSleep;
+ this.sleepFactor = sleepFactor;
+ this.connectTime = connectTime;
+ }
+
+ @Override
+ public synchronized Future<Void> scheduleReconnect(final Throwable cause) {
+ LOG.debug("Connection attempt failed", cause);
+
+ // Check if a reconnect attempt is scheduled
+ Preconditions.checkState(!this.scheduled);
+
+ // Get a stable 'now' time for deadline calculations
+ final long now = System.nanoTime();
+
+ // Obvious stop conditions
+ if (this.maxAttempts != null && this.attempts >= this.maxAttempts) {
+ return this.executor.newFailedFuture(new Throwable("Maximum reconnection attempts reached"));
+ }
+ if (this.deadline != null && this.deadline <= now) {
+ return this.executor.newFailedFuture(new TimeoutException("Reconnect deadline reached"));
+ }
+
+ /*
+ * First connection attempt gets initialized to minimum sleep,
+ * each subsequent is exponentially backed off by sleepFactor.
+ */
+ if (this.attempts != 0) {
+ this.lastSleep *= this.sleepFactor;
+ } else {
+ this.lastSleep = this.minSleep;
+ }
+
+ // Cap the sleep time to maxSleep
+ if (this.maxSleep != null && this.lastSleep > this.maxSleep) {
+ LOG.debug("Capped sleep time from {} to {}", this.lastSleep, this.maxSleep);
+ this.lastSleep = this.maxSleep;
+ }
+
+ this.attempts++;
+
+ // Check if the reconnect attempt is within the deadline
+ if (this.deadline != null && this.deadline <= now + TimeUnit.MILLISECONDS.toNanos(this.lastSleep)) {
+ return this.executor.newFailedFuture(new TimeoutException("Next reconnect would happen after deadline"));
+ }
+
+ LOG.debug("Connection attempt {} sleeping for {} milliseconds", this.attempts, this.lastSleep);
+
+ // If we are not sleeping at all, return an already-succeeded future
+ if (this.lastSleep == 0) {
+ return this.executor.newSucceededFuture(null);
+ }
+
+ // Need to retain a final reference to this for locking purposes,
+ // also set the scheduled flag.
+ final Object lock = this;
+ this.scheduled = true;
+
+ // Schedule a task for the right time. It will also clear the flag.
+ return this.executor.schedule(() -> {
+ synchronized (lock) {
+ Preconditions.checkState(TimedReconnectStrategy.this.scheduled);
+ TimedReconnectStrategy.this.scheduled = false;
+ }
+
+ return null;
+ }, this.lastSleep, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public synchronized void reconnectSuccessful() {
+ Preconditions.checkState(!this.scheduled);
+ this.attempts = 0;
+ }
+
+ @Override
+ public int getConnectTimeout() throws TimeoutException {
+ int timeout = this.connectTime;
+
+ if (this.deadline != null) {
+
+ // If there is a deadline, we may need to cap the connect
+ // timeout to meet the deadline.
+ final long now = System.nanoTime();
+ if (now >= this.deadline) {
+ throw new TimeoutException("Reconnect deadline already passed");
+ }
+
+ final long left = TimeUnit.NANOSECONDS.toMillis(this.deadline - now);
+ if (left < 1) {
+ throw new TimeoutException("Connect timeout too close to deadline");
+ }
+
+ /*
+ * A bit of magic:
+ * - if time left is less than the timeout, set it directly
+ * - if there is no timeout, and time left is:
+ * - less than maximum integer, set timeout to time left
+ * - more than maximum integer, set timeout Integer.MAX_VALUE
+ */
+ if (timeout > left) {
+ timeout = (int) left;
+ } else if (timeout == 0) {
+ timeout = left <= Integer.MAX_VALUE ? (int) left : Integer.MAX_VALUE;
+ }
+ }
+ return timeout;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.SucceededFuture;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ServerTest {
+ SimpleDispatcher clientDispatcher, dispatcher;
+
+ SimpleSession session = null;
+
+ ChannelFuture server = null;
+
+ InetSocketAddress serverAddress;
+ private NioEventLoopGroup eventLoopGroup;
+ // Dedicated loop group for server, needed for testing reconnection client
+ // With dedicated server group we can simulate session drop by shutting only the server group down
+ private NioEventLoopGroup serverLoopGroup;
+
+ @Before
+ public void setUp() {
+ final int port = 10000 + (int)(10000 * Math.random());
+ serverAddress = new InetSocketAddress("127.0.0.1", port);
+ eventLoopGroup = new NioEventLoopGroup();
+ serverLoopGroup = new NioEventLoopGroup();
+ }
+
+ @After
+ public void tearDown() throws IOException, InterruptedException, ExecutionException {
+ if(server != null) {
+ this.server.channel().close();
+ }
+ this.eventLoopGroup.shutdownGracefully().get();
+ this.serverLoopGroup.shutdownGracefully().get();
+ try {
+ Thread.sleep(500);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testConnectionRefused() throws Exception {
+ this.clientDispatcher = getClientDispatcher();
+
+ final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
+
+ this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
+
+ Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
+ }
+
+ @Test
+ public void testConnectionReestablishInitial() throws Exception {
+ this.clientDispatcher = getClientDispatcher();
+
+ final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
+
+ this.clientDispatcher.createClient(this.serverAddress, mockReconnectStrategy, SimpleSessionListener::new);
+
+ Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
+
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ assertEquals(true, p.get(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testConnectionDrop() throws Exception {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = getClientDispatcher();
+
+ final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+ this.session = this.clientDispatcher.createClient(this.serverAddress,
+ reconnectStrategy, SimpleSessionListener::new).get(6, TimeUnit.SECONDS);
+
+ assertEquals(true, p.get(3, TimeUnit.SECONDS));
+
+ shutdownServer();
+
+ // No reconnect should be scheduled after server drops connection with not-reconnecting client
+ verify(reconnectStrategy, times(0)).scheduleReconnect(any(Throwable.class));
+ }
+
+ @Test
+ public void testConnectionReestablishAfterDrop() throws Exception {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = getClientDispatcher();
+
+ final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+ final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+ doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+ this.clientDispatcher.createReconnectingClient(this.serverAddress,
+ reconnectStrategyFactory, SimpleSessionListener::new);
+
+ assertEquals(true, p.get(3, TimeUnit.SECONDS));
+ shutdownServer();
+
+ verify(reconnectStrategyFactory, timeout(20000).atLeast(2)).createReconnectStrategy();
+ }
+
+ @Test
+ public void testConnectionEstablished() throws Exception {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = getClientDispatcher();
+
+ this.session = this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+ TimeUnit.SECONDS);
+
+ assertEquals(true, p.get(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = getClientDispatcher();
+
+ this.session = this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new).get(6,
+ TimeUnit.SECONDS);
+
+ final Future<?> session = this.clientDispatcher.createClient(this.serverAddress,
+ new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), SimpleSessionListener::new);
+ assertFalse(session.isSuccess());
+ }
+
+ @Test
+ public void testNegotiationFailedReconnect() throws Exception {
+ final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+ this.dispatcher = getServerDispatcher(p);
+
+ this.server = this.dispatcher.createServer(this.serverAddress, SimpleSessionListener::new);
+
+ this.server.get();
+
+ this.clientDispatcher = new SimpleDispatcher(
+ (factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel) {
+ @Override
+ protected void startNegotiation() throws Exception {
+ negotiationFailed(new IllegalStateException("Negotiation failed"));
+ }
+ }, new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+ final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+ final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+ doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+ this.clientDispatcher.createReconnectingClient(this.serverAddress,
+ reconnectStrategyFactory, SimpleSessionListener::new);
+
+
+ // Reconnect strategy should be consulted at least twice, for initial connect and reconnect attempts after drop
+ verify(reconnectStrategyFactory, timeout((int) TimeUnit.MINUTES.toMillis(3)).atLeast(2)).createReconnectStrategy();
+ }
+
+ private SimpleDispatcher getClientDispatcher() {
+ return new SimpleDispatcher((factory, channel, promise) -> new SimpleSessionNegotiator(promise, channel), new DefaultPromise<>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+ }
+
+ private ReconnectStrategy getMockedReconnectStrategy() throws Exception {
+ final ReconnectStrategy mockReconnectStrategy = mock(ReconnectStrategy.class);
+ final Future<Void> future = new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
+ doReturn(future).when(mockReconnectStrategy).scheduleReconnect(any(Throwable.class));
+ doReturn(5000).when(mockReconnectStrategy).getConnectTimeout();
+ doNothing().when(mockReconnectStrategy).reconnectSuccessful();
+ return mockReconnectStrategy;
+ }
+
+
+ private void shutdownServer() throws InterruptedException, ExecutionException {
+ // Shutdown server
+ server.channel().close().get();
+ // Closing server channel does not close established connections, eventLoop has to be closed as well to simulate dropped session
+ serverLoopGroup.shutdownGracefully().get();
+ }
+
+ private SimpleDispatcher getServerDispatcher(final Promise<Boolean> p) {
+ return new SimpleDispatcher((factory, channel, promise) -> {
+ p.setSuccess(true);
+ return new SimpleSessionNegotiator(promise, channel);
+ }, null, serverLoopGroup);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class Session extends AbstractProtocolSession<SimpleMessage> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Session.class);
+
+ public final List<SimpleMessage> msgs = Lists.newArrayList();
+
+ public boolean up = false;
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void handleMessage(final SimpleMessage msg) {
+ LOG.debug("Message received: {}", msg.getMessage());
+ this.up = true;
+ this.msgs.add(msg);
+ LOG.debug(this.msgs.size() + "");
+ }
+
+ @Override
+ public void endOfInput() {
+ LOG.debug("End of input reported.");
+ }
+
+ @Override
+ protected void sessionUp() {
+ LOG.debug("Session up reported.");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ *
+ */
+public class SimpleByteToMessageDecoder extends ByteToMessageDecoder {
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
+ out.add(new SimpleMessage(StandardCharsets.UTF_8.decode(in.nioBuffer()).toString()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOutboundHandler;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSessionListener> {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleDispatcher.class);
+
+ private final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory;
+ private final ChannelOutboundHandler encoder = new SimpleMessageToByteEncoder();
+
+ private final class SimplePipelineInitializer implements PipelineInitializer<SimpleSession> {
+ final SessionListenerFactory<SimpleSessionListener> listenerFactory;
+
+ SimplePipelineInitializer(final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
+ }
+
+ @Override
+ public void initializeChannel(final SocketChannel channel, final Promise<SimpleSession> promise) {
+ channel.pipeline().addLast(new SimpleByteToMessageDecoder());
+ channel.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, channel, promise));
+ channel.pipeline().addLast(encoder);
+ LOG.debug("initialization completed for channel {}", channel);
+ }
+
+ }
+
+ public SimpleDispatcher(final SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener> negotiatorFactory,
+ final Promise<SimpleSession> promise, final EventLoopGroup eventLoopGroup) {
+ super(eventLoopGroup, eventLoopGroup);
+ this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
+ }
+
+ public Future<SimpleSession> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ return super.createClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+ }
+
+ public Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ return super.createReconnectingClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+ }
+
+ public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+ return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
+ }
+
+ @Override
+ public void close() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public class SimpleMessage {
+
+ private final String s;
+
+ public SimpleMessage(final String s) {
+ this.s = s;
+ }
+
+ public String getMessage() {
+ return this.s;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ *
+ */
+@Sharable
+public class SimpleMessageToByteEncoder extends MessageToByteEncoder<SimpleMessage> {
+ @Override
+ protected void encode(final ChannelHandlerContext ctx, final SimpleMessage msg, final ByteBuf out) {
+ out.writeBytes(msg.getMessage().getBytes());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public final class SimpleSession extends AbstractProtocolSession<SimpleMessage> {
+
+ public SimpleSession() {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void handleMessage(final SimpleMessage msg) {
+ }
+
+ @Override
+ public void endOfInput() {
+ }
+
+ @Override
+ protected void sessionUp() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Session Listener that is notified about messages and changes in the session.
+ */
+public class SimpleSessionListener implements SessionListener<SimpleMessage, SimpleSession, TerminationReason> {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleSessionListener.class);
+
+ public List<SimpleMessage> messages = new ArrayList<>();
+
+ public boolean up = false;
+
+ public boolean failed = false;
+
+ @Override
+ public void onMessage(final SimpleSession session, final SimpleMessage message) {
+ LOG.debug("Received message: " + message.getClass() + " " + message);
+ this.messages.add(message);
+ }
+
+ @Override
+ public void onSessionUp(final SimpleSession session) {
+ this.up = true;
+ }
+
+ @Override
+ public void onSessionDown(final SimpleSession session, final Exception e) {
+ this.failed = true;
+ this.notifyAll();
+ }
+
+ @Override
+ public void onSessionTerminated(final SimpleSession session, final TerminationReason reason) {
+ this.failed = true;
+ this.notifyAll();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+public class SimpleSessionListenerFactory implements SessionListenerFactory<SimpleSessionListener> {
+
+ @Override
+ public SimpleSessionListener getSessionListener() {
+ return new SimpleSessionListener();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.protocol.framework;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Promise;
+
+public class SimpleSessionNegotiator extends AbstractSessionNegotiator<SimpleMessage, SimpleSession> {
+
+ public SimpleSessionNegotiator(final Promise<SimpleSession> promise, final Channel channel) {
+ super(promise, channel);
+ }
+
+ @Override
+ protected void startNegotiation() throws Exception {
+ negotiationSuccessful(new SimpleSession());
+ }
+
+ @Override
+ protected void handleMessage(final SimpleMessage msg) throws Exception {
+ throw new IllegalStateException("This method should never be invoked");
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
+ </encoder>
+ </appender>
+
+ <root level="TRACE">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>