BUG-2208: pcc-mock dispatcher refactored 67/24867/2
authorMilos Fabian <milfabia@cisco.com>
Mon, 3 Aug 2015 12:56:44 +0000 (14:56 +0200)
committerMilos Fabian <milfabia@cisco.com>
Thu, 6 Aug 2015 14:20:21 +0000 (16:20 +0200)
-reworked pcc dispatcher
-creating client - new API defined
-reconnect promise for pcc
-junit tests
-manualy tested againts ODL PCEP

Change-Id: I068ba4f894564fa237a733781d6f293199ddbb96
Signed-off-by: Milos Fabian <milfabia@cisco.com>
13 files changed:
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/AbstractPCCDispatcher.java [deleted file]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/Main.java
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCDispatcher.java [deleted file]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCPReconnectPromise.java [deleted file]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCProtocolSessionPromise.java [deleted file]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImpl.java [new file with mode: 0644]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccReconnectPromise.java [new file with mode: 0644]
pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/api/PccDispatcher.java [new file with mode: 0644]
pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PCCMockTest.java
pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImplTest.java [new file with mode: 0644]
pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/TestingSessionListenerFactory.java
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCCMock.java
pcep/testtool/src/test/java/org/opendaylight/protocol/pcep/testtool/PCEPTestingToolTest.java

diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/AbstractPCCDispatcher.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/AbstractPCCDispatcher.java
deleted file mode 100644 (file)
index af07159..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.pcep.pcc.mock;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractPCCDispatcher implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractPCCDispatcher.class);
-    private static final Integer SOCKET_BACKLOG_SIZE = 128;
-    private final EventLoopGroup bossGroup;
-    private final EventLoopGroup workerGroup;
-    private final EventExecutor executor;
-
-    protected AbstractPCCDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup) {
-        this.bossGroup = Preconditions.checkNotNull(bossGroup);
-        this.workerGroup = Preconditions.checkNotNull(workerGroup);
-        this.executor = Preconditions.checkNotNull(GlobalEventExecutor.INSTANCE);
-    }
-
-    protected ChannelFuture createServer(final InetSocketAddress address, final ChannelPipelineInitializer initializer) {
-        return this.createServer(address, NioServerSocketChannel.class, initializer);
-    }
-
-    private ChannelFuture createServer(final SocketAddress address, Class<? extends ServerChannel> channelClass,
-                                       final AbstractPCCDispatcher.ChannelPipelineInitializer initializer) {
-        final ServerBootstrap b = new ServerBootstrap();
-        b.childHandler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(final SocketChannel ch) {
-                initializer.initializeChannel(ch, new DefaultPromise(AbstractPCCDispatcher.this.executor));
-            }
-        });
-        b.option(ChannelOption.SO_BACKLOG, SOCKET_BACKLOG_SIZE);
-
-        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-        this.customizeBootstrap(b);
-        if (b.group() == null) {
-            b.group(this.bossGroup, this.workerGroup);
-        }
-
-        try {
-            b.channel(channelClass);
-        } catch (IllegalStateException e) {
-            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
-        }
-
-        final ChannelFuture f = b.bind(address);
-        LOG.debug("Initiated server {} at {}.", f, address);
-        return f;
-    }
-
-    public void customizeBootstrap(final ServerBootstrap b) {
-    }
-
-    protected Future<PCEPSessionImpl> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final ChannelPipelineInitializer initializer) {
-        final Bootstrap b = new Bootstrap();
-        final PCCProtocolSessionPromise p = new PCCProtocolSessionPromise(this.executor, address, strategy, b);
-        (b.option(ChannelOption.SO_KEEPALIVE, true)).handler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(final SocketChannel ch) {
-                initializer.initializeChannel(ch, p);
-            }
-        });
-        this.customizeBootstrap(b);
-        this.setWorkerGroup(b);
-        this.setChannelFactory(b);
-        p.connect();
-        LOG.debug("Client created.");
-        return p;
-    }
-
-    private void setWorkerGroup(final Bootstrap b) {
-        if (b.group() == null) {
-            b.group(this.workerGroup);
-        }
-    }
-
-    protected Future<PCEPSessionImpl> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
-                                                   final Bootstrap bootstrap, final ChannelPipelineInitializer initializer) {
-        final PCCProtocolSessionPromise p = new PCCProtocolSessionPromise(this.executor, address, strategy, bootstrap);
-        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(final SocketChannel ch) {
-                initializer.initializeChannel(ch, p);
-            }
-        });
-        p.connect();
-        LOG.debug("Client created.");
-        return p;
-    }
-
-    protected void customizeBootstrap(final Bootstrap b) {
-    }
-
-    protected Future<Void> createReconnectingClient(final InetSocketAddress address,final  ReconnectStrategyFactory connectStrategyFactory, final ChannelPipelineInitializer initializer) {
-        final Bootstrap b = new Bootstrap();
-        final PCCPReconnectPromise p = new PCCPReconnectPromise(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer);
-        b.option(ChannelOption.SO_KEEPALIVE, true);
-        this.customizeBootstrap(b);
-        this.setWorkerGroup(b);
-        this.setChannelFactory(b);
-        p.connect();
-        return p;
-    }
-
-    private void setChannelFactory(final Bootstrap b) {
-        try {
-            b.channel(NioSocketChannel.class);
-        } catch (IllegalStateException e) {
-            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
-        }
-
-    }
-
-    @Override
-    public void close() {
-    }
-
-    protected interface ChannelPipelineInitializer {
-        void initializeChannel(SocketChannel socketChannel, Promise<PCEPSessionImpl> promise);
-    }
-}
-
index b24f34a5b1d96e1b33a4cb5f7cf4653a19f85760..7cff740196f40ed04d664411402a4d499840a16e 100644 (file)
@@ -19,39 +19,27 @@ import com.google.common.net.HostAndPort;
 import com.google.common.net.InetAddresses;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.GlobalEventExecutor;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.protocol.pcep.PCEPCapability;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
-import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.ietf.initiated00.CrabbeInitiatedActivator;
+import org.opendaylight.protocol.pcep.ietf.stateful07.PCEPStatefulCapability;
 import org.opendaylight.protocol.pcep.ietf.stateful07.StatefulActivator;
 import org.opendaylight.protocol.pcep.impl.BasePCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
+import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
 import org.opendaylight.protocol.pcep.pcc.mock.api.PccTunnelManager;
 import org.opendaylight.protocol.pcep.spi.PCEPExtensionProviderContext;
 import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
 import org.opendaylight.tcpmd5.api.KeyMapping;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.crabbe.initiated.rev131126.Stateful1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.crabbe.initiated.rev131126.Stateful1Builder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.Tlvs1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.Tlvs1Builder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.stateful.capability.tlv.StatefulBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.OpenBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.TlvsBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,9 +51,10 @@ public final class Main {
     private static final int DEFAULT_LOCAL_PORT = 0;
     private static final short DEFAULT_KEEP_ALIVE = 30;
     private static final short DEFAULT_DEAD_TIMER = 120;
-    private static final int RECONNECT_STRATEGY_TIMEOUT = 2000;
     private static final InetAddress LOCALHOST = InetAddresses.forString("127.0.0.1");
-    private static final int MILISECONDS = 1000;
+    private static final PCEPCapability STATEFUL_CAPABILITY = new PCEPStatefulCapability(true, true, true,
+            false, false, false, false);
+    private static final List<PCEPCapability> CAPABILITIES = Lists.newArrayList(STATEFUL_CAPABILITY);
 
     private Main() { }
 
@@ -79,7 +68,7 @@ public final class Main {
         short ka = DEFAULT_KEEP_ALIVE;
         short dt = DEFAULT_DEAD_TIMER;
         String password = null;
-        int reconnectTime = -1;
+        long reconnectTime = -1;
         int redelegationTimeout = 0;
         int stateTimeout = -1;
         final Timer timer = new HashedWheelTimer();
@@ -106,7 +95,7 @@ public final class Main {
             } else if (args[argIdx].equals("--password")) {
                 password = args[++argIdx];
             } else if (args[argIdx].equals("--reconnect")) {
-                reconnectTime = Integer.valueOf(args[++argIdx]).intValue() * MILISECONDS;
+                reconnectTime =  TimeUnit.SECONDS.toMillis(Integer.valueOf(args[++argIdx]).intValue());
             } else if (args[argIdx].equals("--redelegation-timeout")) {
                 redelegationTimeout = Integer.valueOf(args[++argIdx]);
             } else if (args[argIdx].equals("--state-timeout")) {
@@ -121,41 +110,36 @@ public final class Main {
 
     public static void createPCCs(final int lspsPerPcc, final boolean pcerr, final int pccCount,
             final InetSocketAddress localAddress, final List<InetSocketAddress> remoteAddress, final short keepalive, final short deadtimer,
-            final String password, final int reconnectTime, final int redelegationTimeout, final int stateTimeout, final Timer timer) throws InterruptedException, ExecutionException {
+            final String password, final long reconnectTime, final int redelegationTimeout, final int stateTimeout, final Timer timer) throws InterruptedException, ExecutionException {
         startActivators();
         InetAddress currentAddress = localAddress.getAddress();
-        final Open openMessage = getOpenMessage(keepalive, deadtimer);
-        final PCCDispatcher pccDispatcher = new PCCDispatcher(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
-                getSessionNegotiatorFactory(openMessage));
+        final PccDispatcherImpl pccDispatcher = new PccDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry());
         for (int i = 0; i < pccCount; i++) {
             final PccTunnelManager tunnelManager = new PccTunnelManagerImpl(lspsPerPcc, currentAddress,
                     redelegationTimeout, stateTimeout, timer);
             createPCC(pcerr, new InetSocketAddress(currentAddress, localAddress.getPort()),
-                    remoteAddress, openMessage, pccDispatcher, password, reconnectTime, tunnelManager);
+                    remoteAddress, getSessionNegotiatorFactory(keepalive, deadtimer), pccDispatcher, password, reconnectTime, tunnelManager);
             currentAddress = InetAddresses.increment(currentAddress);
         }
     }
 
     private static void createPCC(final boolean pcerr, final InetSocketAddress localAddress,
-            final List<InetSocketAddress> remoteAddress, final Open openMessage, final PCCDispatcher pccDispatcher,
-            final String password, final int reconnectTime, final PccTunnelManager tunnelManager) throws InterruptedException, ExecutionException {
-        final PCEPSessionNegotiatorFactory snf = getSessionNegotiatorFactory(openMessage);
+            final List<InetSocketAddress> remoteAddress, final PCEPSessionNegotiatorFactory<PCEPSessionImpl> snf, final PccDispatcherImpl pccDispatcher,
+            final String password, final long reconnectTime, final PccTunnelManager tunnelManager) throws InterruptedException, ExecutionException {
 
         for (final InetSocketAddress pceAddress : remoteAddress) {
-            pccDispatcher.createClient(localAddress, pceAddress, reconnectTime == -1 ? getNeverReconnectStrategyFactory() : getTimedReconnectStrategyFactory(reconnectTime),
+            pccDispatcher.createClient(pceAddress, reconnectTime,
                     new PCEPSessionListenerFactory() {
                         @Override
                         public PCEPSessionListener getSessionListener() {
                             return new PccSessionListener(remoteAddress.indexOf(pceAddress), tunnelManager, pcerr);
                         }
-                    } ,snf, getKeyMapping(pceAddress.getAddress(), password));
+                    }, snf, getKeyMapping(pceAddress.getAddress(), password), localAddress);
         }
     }
 
-    private static PCEPSessionNegotiatorFactory getSessionNegotiatorFactory(final Open openMessage) {
-        final List<PCEPCapability> capabilities = new ArrayList<>();
-        final PCEPSessionProposalFactory proposal = new BasePCEPSessionProposalFactory(openMessage.getDeadTimer(), openMessage.getKeepalive(), capabilities);
-        return new DefaultPCEPSessionNegotiatorFactory(proposal, 0);
+    private static PCEPSessionNegotiatorFactory<PCEPSessionImpl> getSessionNegotiatorFactory(final short keepAlive, final short deadTimer) {
+        return new DefaultPCEPSessionNegotiatorFactory(new BasePCEPSessionProposalFactory(deadTimer, keepAlive, CAPABILITIES), 0);
     }
 
     private static ch.qos.logback.classic.Logger getRootLogger(final LoggerContext lc) {
@@ -190,13 +174,6 @@ public final class Main {
         return null;
     }
 
-    private static Open getOpenMessage(final short keepalive, final short deadtimer) {
-        final Tlvs1 tlvs1 = new Tlvs1Builder().setStateful(new StatefulBuilder().addAugmentation(Stateful1.class,
-                new Stateful1Builder().setInitiation(true).build()).setLspUpdateCapability(true).build()).build();
-        return new OpenBuilder().setTlvs(new TlvsBuilder().addAugmentation(Tlvs1.class, tlvs1).build())
-                .setKeepalive(keepalive).setDeadTimer(deadtimer).setSessionId((short) 0).build();
-    }
-
     private static void startActivators() {
         final PCCActivator pccActivator = new PCCActivator();
         final StatefulActivator stateful = new StatefulActivator();
@@ -207,26 +184,4 @@ public final class Main {
         activator.start(ctx);
     }
 
-    @SuppressWarnings("deprecation")
-    private static ReconnectStrategyFactory getNeverReconnectStrategyFactory() {
-        return new ReconnectStrategyFactory() {
-
-            @Override
-            public ReconnectStrategy createReconnectStrategy() {
-                return new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT);
-            }
-        };
-    }
-
-    @SuppressWarnings("deprecation")
-    private static ReconnectStrategyFactory getTimedReconnectStrategyFactory(final int reconnectTime) {
-        return new ReconnectStrategyFactory() {
-
-            @Override
-            public ReconnectStrategy createReconnectStrategy() {
-                return new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT, reconnectTime, 1.0, null, null, null);
-            }
-        };
-    }
-
 }
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCDispatcher.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCDispatcher.java
deleted file mode 100644 (file)
index 8e48d89..0000000
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.protocol.pcep.pcc.mock;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ChannelFactory;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Promise;
-import java.net.InetSocketAddress;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.pcep.PCEPDispatcher;
-import org.opendaylight.protocol.pcep.PCEPPeerProposal;
-import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
-import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
-import org.opendaylight.protocol.pcep.spi.MessageRegistry;
-import org.opendaylight.tcpmd5.api.DummyKeyAccessFactory;
-import org.opendaylight.tcpmd5.api.KeyAccessFactory;
-import org.opendaylight.tcpmd5.api.KeyMapping;
-import org.opendaylight.tcpmd5.jni.NativeKeyAccessFactory;
-import org.opendaylight.tcpmd5.jni.NativeSupportUnavailableException;
-import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5ChannelOption;
-import org.opendaylight.tcpmd5.netty.MD5NioSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class PCCDispatcher extends AbstractPCCDispatcher implements PCEPDispatcher {
-    private static final Logger LOG = LoggerFactory.getLogger(PCCDispatcher.class);
-    private final PCEPHandlerFactory factory;
-    private final MD5ChannelFactory<?> cf;
-    private final PCEPSessionNegotiatorFactory snf;
-    private final PCEPHandlerFactory hf;
-    private final ChannelFactory<? extends ServerChannel> scf;
-    private InetSocketAddress localAddress;
-    private KeyMapping keys;
-
-    public PCCDispatcher(final MessageRegistry registry, final PCEPSessionNegotiatorFactory negotiatorFactory) {
-        super(new NioEventLoopGroup(), new NioEventLoopGroup());
-        this.snf = Preconditions.checkNotNull(negotiatorFactory);
-        this.factory = new PCEPHandlerFactory(registry);
-        this.cf = new MD5NioSocketChannelFactory(DeafultKeyAccessFactory.getKeyAccessFactory());
-        this.hf = new PCEPHandlerFactory(registry);
-        this.scf = null;
-    }
-
-
-    @Override
-    protected void customizeBootstrap(final Bootstrap b) {
-        if (this.keys != null && !this.keys.isEmpty()) {
-            if (this.cf == null) {
-                throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
-            }
-
-            b.channelFactory(this.cf);
-            b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
-        }
-        if (this.localAddress != null) {
-            b.localAddress(this.localAddress);
-        }
-    }
-
-    public synchronized void createClient(final InetSocketAddress localAddress, final InetSocketAddress remoteAddress,
-            final ReconnectStrategyFactory strategyFactory, final PCEPSessionListenerFactory listenerFactory,
-            final PCEPSessionNegotiatorFactory negotiatorFactory, final KeyMapping keys) {
-        this.localAddress = localAddress;
-        this.keys = keys;
-        super.createReconnectingClient(remoteAddress, strategyFactory, new AbstractPCCDispatcher.ChannelPipelineInitializer() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise) {
-                ch.pipeline().addLast(PCCDispatcher.this.factory.getDecoders());
-                ch.pipeline().addLast("negotiator",
-                        negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, null));
-                ch.pipeline().addLast(PCCDispatcher.this.factory.getEncoders());
-            }
-        });
-        this.localAddress = null;
-        this.keys = null;
-    }
-
-    @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address, final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
-        return createServer(address, null, listenerFactory, peerProposal);
-    }
-
-    @Override
-    public void customizeBootstrap(final ServerBootstrap b) {
-        if (this.keys != null && !this.keys.isEmpty()) {
-            if (this.scf == null) {
-                throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
-            }
-
-            LOG.debug("Adding MD5 keys {} to boostrap {}", this.keys, b);
-            b.channelFactory(this.scf);
-            b.option(MD5ChannelOption.TCP_MD5SIG, this.keys);
-        }
-
-        // Make sure we are doing round-robin processing
-        b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
-    }
-
-    @Override
-    public synchronized ChannelFuture createServer(final InetSocketAddress address, final KeyMapping keys,
-                                                   final PCEPSessionListenerFactory listenerFactory, final PCEPPeerProposal peerProposal) {
-        this.keys = keys;
-        final ChannelFuture ret = super.createServer(address, new ChannelPipelineInitializer() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise) {
-                ch.pipeline().addLast(PCCDispatcher.this.hf.getDecoders());
-                ch.pipeline().addLast("negotiator", PCCDispatcher.this.snf.getSessionNegotiator(listenerFactory, ch, promise, peerProposal));
-                ch.pipeline().addLast(PCCDispatcher.this.hf.getEncoders());
-            }
-        });
-
-        this.keys = null;
-        return ret;
-    }
-
-    private static final class DeafultKeyAccessFactory {
-        private static final Logger LOG = LoggerFactory.getLogger(DeafultKeyAccessFactory.class);
-        private static final KeyAccessFactory FACTORY;
-
-        static {
-            KeyAccessFactory factory;
-
-            try {
-                factory = NativeKeyAccessFactory.getInstance();
-            } catch (final NativeSupportUnavailableException e) {
-                LOG.debug("Native key access not available, using no-op fallback", e);
-                factory = DummyKeyAccessFactory.getInstance();
-            }
-
-            FACTORY = factory;
-        }
-
-        private DeafultKeyAccessFactory() {
-            throw new UnsupportedOperationException("Utility class should never be instantiated");
-        }
-
-        public static KeyAccessFactory getKeyAccessFactory() {
-            return FACTORY;
-        }
-    }
-}
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCPReconnectPromise.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCPReconnectPromise.java
deleted file mode 100644 (file)
index b7cc923..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.pcep.pcc.mock;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-import java.net.InetSocketAddress;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PCCPReconnectPromise extends DefaultPromise<Void> {
-    private static final Logger LOG = LoggerFactory.getLogger(PCCPReconnectPromise.class);
-    private final AbstractPCCDispatcher dispatcher;
-    private final InetSocketAddress address;
-    private final ReconnectStrategyFactory strategyFactory;
-    private final Bootstrap b;
-    private final AbstractPCCDispatcher.ChannelPipelineInitializer initializer;
-    private Future<?> pending;
-
-    public PCCPReconnectPromise(final EventExecutor executor, final AbstractPCCDispatcher dispatcher, final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
-                                final Bootstrap b, final AbstractPCCDispatcher.ChannelPipelineInitializer initializer) {
-        super(executor);
-        this.b = b;
-        this.initializer = Preconditions.checkNotNull(initializer);
-        this.dispatcher = Preconditions.checkNotNull(dispatcher);
-        this.address = Preconditions.checkNotNull(address);
-        this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
-    }
-
-    synchronized void connect() {
-        ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
-        this.pending = this.dispatcher.createClient(this.address, cs, this.b, new AbstractPCCDispatcher.ChannelPipelineInitializer() {
-            @Override
-            public void initializeChannel(final SocketChannel channel, final Promise<PCEPSessionImpl> promise) {
-                PCCPReconnectPromise.this.initializer.initializeChannel(channel, promise);
-                channel.pipeline().addLast(new ClosedChannelHandler(PCCPReconnectPromise.this));
-            }
-        });
-        this.pending.addListener(new GenericFutureListener<Future<Object>>() {
-            @Override
-            public void operationComplete(final Future<Object> future) throws Exception {
-                if (!future.isSuccess()) {
-                    PCCPReconnectPromise.this.setFailure(future.cause());
-                }
-
-            }
-        });
-    }
-
-    private boolean isInitialConnectFinished() {
-        Preconditions.checkNotNull(this.pending);
-        return this.pending.isDone() && this.pending.isSuccess();
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        if (super.cancel(mayInterruptIfRunning)) {
-            Preconditions.checkNotNull(this.pending);
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
-        private final PCCPReconnectPromise promise;
-
-        public ClosedChannelHandler(PCCPReconnectPromise promise) {
-            this.promise = promise;
-        }
-
-        @Override
-        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-            if (!this.promise.isCancelled()) {
-                if (!this.promise.isInitialConnectFinished()) {
-                    PCCPReconnectPromise.LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
-                }
-
-                PCCPReconnectPromise.LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
-                this.promise.connect();
-            }
-        }
-    }
-}
-
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCProtocolSessionPromise.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PCCProtocolSessionPromise.java
deleted file mode 100644 (file)
index ac573d9..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.protocol.pcep.pcc.mock;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelOption;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-import java.net.InetSocketAddress;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
-import org.opendaylight.protocol.pcep.PCEPSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ThreadSafe
-public class PCCProtocolSessionPromise<S extends PCEPSession> extends DefaultPromise<S> {
-    private static final Logger LOG = LoggerFactory.getLogger(PCCProtocolSessionPromise.class);
-    private final ReconnectStrategy strategy;
-    private final Bootstrap b;
-    private InetSocketAddress address;
-    @GuardedBy("this")
-    private Future<?> pending;
-
-    PCCProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address,
-                              final ReconnectStrategy strategy, final Bootstrap b) {
-        super(executor);
-        this.strategy = Preconditions.checkNotNull(strategy);
-        this.address = Preconditions.checkNotNull(address);
-        this.b = Preconditions.checkNotNull(b);
-    }
-
-    synchronized void connect() {
-        final PCCProtocolSessionPromise lock = this;
-
-        try {
-            final int e = this.strategy.getConnectTimeout();
-            LOG.debug("Promise {} attempting connect for {}ms", lock, Integer.valueOf(e));
-            if (this.address.isUnresolved()) {
-                this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
-            }
-
-            this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, e);
-            final ChannelFuture connectFuture = this.b.connect(this.address);
-            connectFuture.addListener(new PCCProtocolSessionPromise.BootstrapConnectListener(lock));
-            this.pending = connectFuture;
-        } catch (final Exception e) {
-            LOG.info("Failed to connect to {}", this.address, e);
-            this.setFailure(e);
-        }
-
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        if (super.cancel(mayInterruptIfRunning)) {
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public synchronized Promise<S> setSuccess(final S result) {
-        LOG.debug("Promise {} completed", this);
-        this.strategy.reconnectSuccessful();
-        return super.setSuccess(result);
-    }
-
-    private class BootstrapConnectListener implements ChannelFutureListener {
-        private final Object lock;
-
-        public BootstrapConnectListener(final Object lock) {
-            this.lock = lock;
-        }
-
-        @Override
-        public void operationComplete(final ChannelFuture cf) throws Exception {
-            synchronized (this.lock) {
-                PCCProtocolSessionPromise.LOG.debug("Promise {} connection resolved", this.lock);
-                Preconditions.checkState(PCCProtocolSessionPromise.this.pending.equals(cf));
-                if (PCCProtocolSessionPromise.this.isCancelled()) {
-                    if (cf.isSuccess()) {
-                        PCCProtocolSessionPromise.LOG.debug("Closing channel for cancelled promise {}", this.lock);
-                        cf.channel().close();
-                    }
-
-                } else if (cf.isSuccess()) {
-                    PCCProtocolSessionPromise.LOG.debug("Promise {} connection successful", this.lock);
-                } else {
-                    PCCProtocolSessionPromise.LOG.debug("Attempt to connect to {} failed", PCCProtocolSessionPromise.this.address, cf.cause());
-                    final Future rf = PCCProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
-                    rf.addListener(new PCCProtocolSessionPromise.BootstrapConnectListener.ReconnectingStrategyListener());
-                    PCCProtocolSessionPromise.this.pending = rf;
-                }
-            }
-        }
-
-        private final class ReconnectingStrategyListener implements FutureListener<Void> {
-            private ReconnectingStrategyListener() {
-            }
-
-            @Override
-            public void operationComplete(final Future<Void> sf) {
-                synchronized (BootstrapConnectListener.this.lock) {
-                    Preconditions.checkState(PCCProtocolSessionPromise.this.pending.equals(sf));
-                    if (!PCCProtocolSessionPromise.this.isCancelled()) {
-                        if (sf.isSuccess()) {
-                            PCCProtocolSessionPromise.this.connect();
-                        } else {
-                            PCCProtocolSessionPromise.this.setFailure(sf.cause());
-                        }
-                    }
-
-                }
-            }
-        }
-    }
-}
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImpl.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImpl.java
new file mode 100644 (file)
index 0000000..5f51934
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.pcc.mock;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.protocol.framework.NeverReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
+import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
+import org.opendaylight.protocol.pcep.pcc.mock.api.PccDispatcher;
+import org.opendaylight.protocol.pcep.spi.MessageRegistry;
+import org.opendaylight.tcpmd5.api.DummyKeyAccessFactory;
+import org.opendaylight.tcpmd5.api.KeyAccessFactory;
+import org.opendaylight.tcpmd5.api.KeyMapping;
+import org.opendaylight.tcpmd5.jni.NativeKeyAccessFactory;
+import org.opendaylight.tcpmd5.jni.NativeSupportUnavailableException;
+import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
+import org.opendaylight.tcpmd5.netty.MD5ChannelOption;
+import org.opendaylight.tcpmd5.netty.MD5NioSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class PccDispatcherImpl implements PccDispatcher, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PccDispatcherImpl.class);
+
+    private static final int RECONNECT_STRATEGY_TIMEOUT = 2000;
+
+    private final PCEPHandlerFactory factory;
+    private final MD5ChannelFactory<?> cf;
+    private final NioEventLoopGroup workerGroup;
+
+    public PccDispatcherImpl(final MessageRegistry registry) {
+        this.workerGroup = new NioEventLoopGroup();
+        this.factory = new PCEPHandlerFactory(registry);
+        this.cf = new MD5NioSocketChannelFactory(DeafultKeyAccessFactory.getKeyAccessFactory());
+    }
+
+    private static final class DeafultKeyAccessFactory {
+        private static final Logger LOG = LoggerFactory.getLogger(DeafultKeyAccessFactory.class);
+        private static final KeyAccessFactory FACTORY;
+
+        static {
+            KeyAccessFactory factory;
+
+            try {
+                factory = NativeKeyAccessFactory.getInstance();
+            } catch (final NativeSupportUnavailableException e) {
+                LOG.debug("Native key access not available, using no-op fallback", e);
+                factory = DummyKeyAccessFactory.getInstance();
+            }
+
+            FACTORY = factory;
+        }
+
+        private DeafultKeyAccessFactory() {
+            throw new UnsupportedOperationException("Utility class should never be instantiated");
+        }
+
+        public static KeyAccessFactory getKeyAccessFactory() {
+            return FACTORY;
+        }
+    }
+
+    @Override
+    public Future<PCEPSession> createClient(
+            final InetSocketAddress remoteAddress, final long reconnectTime, final PCEPSessionListenerFactory listenerFactory,
+            final PCEPSessionNegotiatorFactory negotiatorFactory, final KeyMapping keys, final InetSocketAddress localAddress) {
+        final Bootstrap b = new Bootstrap();
+        b.group(this.workerGroup);
+        b.localAddress(localAddress);
+        setChannelFactory(b, keys);
+        b.option(ChannelOption.SO_KEEPALIVE, true);
+        b.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
+        final PccReconnectPromise promise = new PccReconnectPromise(remoteAddress,
+                reconnectTime == -1 ? getNeverReconnectStrategyFactory() : getTimedReconnectStrategyFactory(reconnectTime), b);
+        final ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(final SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(PccDispatcherImpl.this.factory.getDecoders());
+                ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, null));
+                ch.pipeline().addLast(PccDispatcherImpl.this.factory.getEncoders());
+                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+                    @Override
+                    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+                        if (promise.isCancelled()) {
+                            return;
+                        }
+
+                        if (!promise.isInitialConnectFinished()) {
+                            LOG.debug("Connection to {} was dropped during negotiation, reattempting", remoteAddress);
+                        }
+                        LOG.debug("Reconnecting after connection to {} was dropped", remoteAddress);
+                        PccDispatcherImpl.this.createClient(remoteAddress, reconnectTime, listenerFactory, negotiatorFactory,
+                                keys, localAddress);
+                    }
+                });
+            }
+        };
+        b.handler(channelInitializer);
+        promise.connect();
+        return promise;
+    }
+
+    private void setChannelFactory(final Bootstrap bootstrap, final KeyMapping keys) {
+        if (keys != null && !keys.isEmpty()) {
+            if (this.cf == null) {
+                throw new UnsupportedOperationException("No key access instance available, cannot use key mapping");
+            }
+
+            LOG.debug("Adding MD5 keys {} to boostrap {}", keys, bootstrap);
+            bootstrap.channelFactory(this.cf);
+            bootstrap.option(MD5ChannelOption.TCP_MD5SIG, keys);
+        } else {
+            bootstrap.channel(NioSocketChannel.class);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.workerGroup.shutdownGracefully().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.warn("Failed to properly close dispatcher.", e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private static ReconnectStrategyFactory getNeverReconnectStrategyFactory() {
+        return new ReconnectStrategyFactory() {
+
+            @Override
+            public ReconnectStrategy createReconnectStrategy() {
+                return new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT);
+            }
+        };
+    }
+
+    @SuppressWarnings("deprecation")
+    private static ReconnectStrategyFactory getTimedReconnectStrategyFactory(final long reconnectTime) {
+        return new ReconnectStrategyFactory() {
+
+            @Override
+            public ReconnectStrategy createReconnectStrategy() {
+                return new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, RECONNECT_STRATEGY_TIMEOUT, reconnectTime, 1.0, null, null, null);
+            }
+        };
+    }
+}
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccReconnectPromise.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/PccReconnectPromise.java
new file mode 100644 (file)
index 0000000..9232156
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.pcc.mock;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import java.net.InetSocketAddress;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PccReconnectPromise extends DefaultPromise<PCEPSession> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PccReconnectPromise.class);
+
+    private final InetSocketAddress address;
+    private final Bootstrap b;
+    private final ReconnectStrategy reconnectStrategy;
+
+    @GuardedBy("this")
+    private Future<?> pending;
+
+    public PccReconnectPromise(final InetSocketAddress address, final ReconnectStrategyFactory rsf, final Bootstrap b) {
+        this.address = address;
+        this.b = b;
+        this.reconnectStrategy = rsf.createReconnectStrategy();
+    }
+
+    public synchronized void connect() {
+        try {
+            this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.reconnectStrategy.getConnectTimeout());
+            final ChannelFuture cf = this.b.connect(this.address);
+            cf.addListener(new BootstrapConnectListener(PccReconnectPromise.this));
+            this.pending = cf;
+        } catch (final Exception e) {
+            LOG.info("Failed to connect to {}", this.address, e);
+            this.setFailure(e);
+        }
+    }
+
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public synchronized Promise<PCEPSession> setSuccess(final PCEPSession result) {
+        LOG.debug("Promise {} completed", this);
+        this.reconnectStrategy.reconnectSuccessful();
+        return super.setSuccess(result);
+    }
+
+    protected boolean isInitialConnectFinished() {
+        Preconditions.checkNotNull(this.pending);
+        return this.pending.isDone() && this.pending.isSuccess();
+    }
+
+    private final class BootstrapConnectListener implements ChannelFutureListener {
+
+        private final Object lock;
+
+        public BootstrapConnectListener(final Object lock) {
+            this.lock = lock;
+        }
+
+        @Override
+        public void operationComplete(final ChannelFuture cf) throws Exception {
+
+            synchronized (this.lock) {
+                if (PccReconnectPromise.this.isCancelled()) {
+                    if (cf.isSuccess()) {
+                        PccReconnectPromise.LOG.debug("Closing channels for cancelled promise {}");
+                        cf.channel().close();
+                    }
+                } else if (cf.isSuccess()) {
+                    PccReconnectPromise.LOG.debug("Promise connection is successful.");
+                } else {
+                    PccReconnectPromise.LOG.debug("Attempt to reconnect using reconnect strategy ...");
+                    final Future<Void> rf = PccReconnectPromise.this.reconnectStrategy.scheduleReconnect(cf.cause());
+                    rf.addListener(new PccReconnectPromise.BootstrapConnectListener.ReconnectStrategyListener());
+                }
+            }
+        }
+
+        private final class ReconnectStrategyListener implements FutureListener<Void> {
+
+            @Override
+            public void operationComplete(final Future<Void> f ) {
+                synchronized (BootstrapConnectListener.this.lock) {
+                    if (!PccReconnectPromise.this.isCancelled()) {
+                        if (f.isSuccess()) {
+                            PccReconnectPromise.LOG.debug("ReconnectStrategy has scheduled a retry.");
+                            PccReconnectPromise.this.connect();
+                        } else {
+                            PccReconnectPromise.LOG.debug("ReconnectStrategy has failed. No attempts will be made.");
+                            PccReconnectPromise.this.setFailure(f.cause());
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/api/PccDispatcher.java b/pcep/pcc-mock/src/main/java/org/opendaylight/protocol/pcep/pcc/mock/api/PccDispatcher.java
new file mode 100644 (file)
index 0000000..263b21e
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.pcc.mock.api;
+
+import io.netty.util.concurrent.Future;
+import java.net.InetSocketAddress;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
+import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
+import org.opendaylight.tcpmd5.api.KeyMapping;
+
+public interface PccDispatcher {
+
+    Future<PCEPSession> createClient(final InetSocketAddress remoteAddress,
+            final long reconnectTime, final PCEPSessionListenerFactory listenerFactory,
+            final PCEPSessionNegotiatorFactory<? extends PCEPSession> negotiatorFactory, final KeyMapping keys,
+            final InetSocketAddress localAddress);
+}
index b7db74fa40a389d1e13b11672c3b2e934ce9ce44..faec6d6d6c2c57f6c678bfdd9ba59c0bb1fafd96 100644 (file)
@@ -15,8 +15,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -29,6 +28,8 @@ import org.opendaylight.protocol.pcep.impl.BasePCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl;
 import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.crabbe.initiated.rev131126.Stateful1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.ietf.stateful.rev131222.Tlvs1;
 
 public class PCCMockTest {
 
@@ -43,14 +44,15 @@ public class PCCMockTest {
     private static final InetSocketAddress SERVER_ADDRESS4 = new InetSocketAddress(REMOTE_ADDRESS4, 4189);
     private static final String LOCAL_ADDRESS = "127.0.0.1";
     private static final String LOCAL_ADDRESS2 = "127.0.0.2";
+    private static final PCEPSessionProposalFactory PROPOSAL = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE,
+            Collections.<PCEPCapability>emptyList());
 
     private PCEPDispatcher pceDispatcher;
-    private static final List<PCEPCapability> CAPS = new ArrayList<>();
-    private static final PCEPSessionProposalFactory PROPOSAL = new BasePCEPSessionProposalFactory(DEAD_TIMER, KEEP_ALIVE, CAPS);
 
     @Before
     public void setUp() {
-        final DefaultPCEPSessionNegotiatorFactory nf = new DefaultPCEPSessionNegotiatorFactory(PROPOSAL, 0);
+        final DefaultPCEPSessionNegotiatorFactory nf = new DefaultPCEPSessionNegotiatorFactory(
+                PROPOSAL, 0);
         this.pceDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
                 nf, new NioEventLoopGroup(), new NioEventLoopGroup());
     }
@@ -60,7 +62,7 @@ public class PCCMockTest {
         final TestingSessionListenerFactory factory = new TestingSessionListenerFactory();
         final Channel channel = this.pceDispatcher.createServer(new InetSocketAddress(REMOTE_ADDRESS, 4567), factory, null).channel();
         Main.main(new String[] {"--local-address", LOCAL_ADDRESS, "--remote-address", REMOTE_ADDRESS + ":4567", "--pcc", "1", "--lsp", "3",
-            "--log-level", "DEBUG", "-ka", "10", "-d", "40"});
+            "--log-level", "DEBUG", "-ka", "10", "-d", "40", "--reconnect", "-1", "--redelegation-timeout", "0", "--state-timeout", "-1"});
         Thread.sleep(1000);
         final TestingSessionListener sessionListener = factory.getSessionListenerByRemoteAddress(InetAddresses.forString(LOCAL_ADDRESS));
         Assert.assertTrue(sessionListener.isUp());
@@ -70,7 +72,7 @@ public class PCCMockTest {
         Assert.assertNotNull(session);
         Assert.assertEquals(40, session.getPeerPref().getDeadtimer().shortValue());
         Assert.assertEquals(10, session.getPeerPref().getKeepalive().shortValue());
-//        Assert.assertTrue(session.getRemoteTlvs().getAugmentation(Tlvs1.class).getStateful().getAugmentation(Stateful1.class).isInitiation());
+        Assert.assertTrue(session.getRemoteTlvs().getAugmentation(Tlvs1.class).getStateful().getAugmentation(Stateful1.class).isInitiation());
         channel.close().get();
     }
 
diff --git a/pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImplTest.java b/pcep/pcc-mock/src/test/java/org/opendaylight/protocol/pcep/pcc/mock/PccDispatcherImplTest.java
new file mode 100644 (file)
index 0000000..ee658f8
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.protocol.pcep.pcc.mock;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.protocol.pcep.PCEPCapability;
+import org.opendaylight.protocol.pcep.PCEPDispatcher;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
+import org.opendaylight.protocol.pcep.impl.BasePCEPSessionProposalFactory;
+import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
+import org.opendaylight.protocol.pcep.impl.PCEPDispatcherImpl;
+import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
+
+public class PccDispatcherImplTest {
+
+    private static final List<PCEPCapability> CAPS = new ArrayList<>();
+    private static final PCEPSessionProposalFactory PROPOSAL = new BasePCEPSessionProposalFactory(30, 120, CAPS);
+
+    private PccDispatcherImpl dispatcher;
+    private final DefaultPCEPSessionNegotiatorFactory nf = new DefaultPCEPSessionNegotiatorFactory(PROPOSAL, 0);
+    private PCEPDispatcher pcepDispatcher;
+    private InetSocketAddress serverAddress;
+    private InetSocketAddress clientAddress;
+    private final Random random = new Random();
+    private EventLoopGroup workerGroup;
+    private EventLoopGroup bossGroup;
+
+    @Before
+    public void setUp() {
+        this.workerGroup = new NioEventLoopGroup();
+        this.bossGroup = new NioEventLoopGroup();
+        this.dispatcher = new PccDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry());
+        this.pcepDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
+                this.nf, this.bossGroup, this.workerGroup);
+        this.serverAddress = new InetSocketAddress("127.0.5.0", getRandomPort());
+        this.clientAddress = new InetSocketAddress("127.0.4.0", getRandomPort());
+    }
+
+    @After
+    public void tearDown() throws InterruptedException, ExecutionException {
+        this.dispatcher.close();
+        this.workerGroup.shutdownGracefully().get();
+        this.bossGroup.shutdownGracefully().get();
+    }
+
+    @Test
+    public void testClientReconnect() throws Exception {
+        final Future<PCEPSession> futureSession = this.dispatcher.createClient(this.serverAddress, 500, new TestingSessionListenerFactory(),
+                this.nf, null, this.clientAddress);
+
+        final TestingSessionListenerFactory slf = new TestingSessionListenerFactory();
+        final Channel channel = this.pcepDispatcher.createServer(this.serverAddress, slf, null).channel();
+        Assert.assertNotNull(futureSession.get());
+        final TestingSessionListener sl = slf.getSessionListenerByRemoteAddress(this.clientAddress.getAddress());
+        Assert.assertNotNull(sl);
+        Assert.assertTrue(sl.isUp());
+
+        channel.close().get();
+        this.workerGroup.shutdownGracefully().get();
+        this.bossGroup.shutdownGracefully().get();
+
+        this.workerGroup = new NioEventLoopGroup();
+        this.bossGroup = new NioEventLoopGroup();
+        this.pcepDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(),
+                this.nf, this.bossGroup, this.workerGroup);
+
+        final TestingSessionListenerFactory slf2 = new TestingSessionListenerFactory();
+        this.pcepDispatcher.createServer(this.serverAddress, slf2, null).channel();
+        Thread.sleep(2000);
+
+        final TestingSessionListener sl2 = slf2.getSessionListenerByRemoteAddress(this.clientAddress.getAddress());
+        Assert.assertNotNull(sl2);
+        Assert.assertTrue(sl2.isUp());
+    }
+
+    private int getRandomPort() {
+        return this.random.nextInt(this.random.nextInt(60000) + 1024);
+    }
+
+}
index 2adf54a68e98877158cac06d3667702cc34ddde4..75c50f2354aa49d4dba0939c9c734f715a8e920b 100644 (file)
@@ -19,7 +19,7 @@ import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
 public class TestingSessionListenerFactory implements PCEPSessionListenerFactory {
 
     @GuardedBy("this")
-    private List<TestingSessionListener> sessionListeners = new ArrayList<>();
+    private final List<TestingSessionListener> sessionListeners = new ArrayList<>();
 
     @Override
     public PCEPSessionListener getSessionListener() {
index 2b8ef0ccdc8255e652ae2d0eeec37c0865553e7e..6989e1c11ee33ee547afe6d4747d85cbdf17f82d 100644 (file)
@@ -7,18 +7,10 @@
  */
 package org.opendaylight.protocol.pcep.testtool;
 
-import com.google.common.base.Preconditions;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import org.opendaylight.protocol.framework.NeverReconnectStrategy;
-import org.opendaylight.protocol.framework.ReconnectStrategy;
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.protocol.pcep.PCEPCapability;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.PCEPSessionListenerFactory;
@@ -26,48 +18,23 @@ import org.opendaylight.protocol.pcep.PCEPSessionNegotiatorFactory;
 import org.opendaylight.protocol.pcep.PCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.impl.BasePCEPSessionProposalFactory;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiatorFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPHandlerFactory;
-import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
-import org.opendaylight.protocol.pcep.pcc.mock.AbstractPCCDispatcher;
+import org.opendaylight.protocol.pcep.pcc.mock.PccDispatcherImpl;
 import org.opendaylight.protocol.pcep.spi.pojo.ServiceLoaderPCEPExtensionProviderContext;
 
-public class PCCMock extends AbstractPCCDispatcher {
+public class PCCMock {
 
-    private final PCEPSessionNegotiatorFactory negotiatorFactory;
-    private final PCEPHandlerFactory factory;
-
-    public PCCMock(final PCEPSessionNegotiatorFactory negotiatorFactory, final PCEPHandlerFactory factory,
-            final DefaultPromise<PCEPSessionImpl> defaultPromise) {
-        super(new NioEventLoopGroup(), new NioEventLoopGroup());
-        this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
-        this.factory = Preconditions.checkNotNull(factory);
-    }
-
-    public Future<PCEPSessionImpl> createClient(final InetSocketAddress address, final ReconnectStrategy strategy,
-            final PCEPSessionListenerFactory listenerFactory) {
-        return super.createClient(address, strategy, new ChannelPipelineInitializer() {
-            @Override
-            public void initializeChannel(final SocketChannel ch, final Promise<PCEPSessionImpl> promise) {
-                ch.pipeline().addLast(PCCMock.this.factory.getDecoders());
-                ch.pipeline().addLast("negotiator", PCCMock.this.negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise, null));
-                ch.pipeline().addLast(PCCMock.this.factory.getEncoders());
-            }
-        });
-    }
-
-    public static void main(final String[] args) throws Exception {
+    public static void main(final String[] args) throws InterruptedException, ExecutionException {
         final List<PCEPCapability> caps = new ArrayList<>();
         final PCEPSessionProposalFactory proposal = new BasePCEPSessionProposalFactory((short) 120, (short) 30, caps);
         final PCEPSessionNegotiatorFactory snf = new DefaultPCEPSessionNegotiatorFactory(proposal, 0);
 
-        final PCCMock pcc = new PCCMock(snf, new PCEPHandlerFactory(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry()), new DefaultPromise<PCEPSessionImpl>(GlobalEventExecutor.INSTANCE));
-
-        pcc.createClient(new InetSocketAddress("127.0.0.3", 12345), new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 2000),
-                new PCEPSessionListenerFactory() {
-                    @Override
-                    public PCEPSessionListener getSessionListener() {
-                        return new SimpleSessionListener();
-                    }
-                }).get();
+        try (final PccDispatcherImpl pccDispatcher = new PccDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry())) {
+            pccDispatcher.createClient(new InetSocketAddress("127.0.0.3", 12345), -1, new PCEPSessionListenerFactory() {
+                @Override
+                public PCEPSessionListener getSessionListener() {
+                    return new SimpleSessionListener();
+                }
+            }, snf, null, new InetSocketAddress("127.0.0.1", 12345)).get();
+        }
     }
 }
index fc8ecbe8c1416ac79f6088e7ae8efe6642123913..15a8651bae262f393b610df1b4eb002f0816b8f7 100644 (file)
@@ -24,7 +24,7 @@ public class PCEPTestingToolTest {
         try {
             Main.main(new String[]{"-a", "127.0.0.3:12345", "-ka", "10", "-d", "0", "--stateful", "--active", "--instant"});
             PCCMock.main(new String[0]);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             Assert.fail();
         }
     }