/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.protocol.framework; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @ThreadSafe final class ProtocolSessionPromise, L extends SessionListener> extends DefaultPromise { private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class); private final ChannelInitializerImpl init; private final ReconnectStrategy strategy; private final InetSocketAddress address; private final Bootstrap b; @GuardedBy("this") private Future pending; ProtocolSessionPromise(final EventLoopGroup workerGroup, final InetSocketAddress address, final SessionNegotiatorFactory negotiatorFactory, final SessionListenerFactory listenerFactory, final ProtocolHandlerFactory protocolFactory, final ReconnectStrategy strategy) { this.strategy = Preconditions.checkNotNull(strategy); this.address = Preconditions.checkNotNull(address); init = new ChannelInitializerImpl(negotiatorFactory, listenerFactory, protocolFactory, this); b = new Bootstrap(); b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(init); } synchronized void connect() { final Object lock = this; try { final int timeout = strategy.getConnectTimeout(); logger.debug("Promise {} attempting connect for {}ms", lock, timeout); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); pending = b.connect(address).addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture cf) throws Exception { synchronized (lock) { logger.debug("Promise {} connection resolved", lock); // Triggered when a connection attempt is resolved. Preconditions.checkState(pending == cf); /* * The promise we gave out could have been cancelled, * which cascades to the connect getting cancelled, * but there is a slight race window, where the connect * is already resolved, but the listener has not yet * been notified -- cancellation at that point won't * stop the notification arriving, so we have to close * the race here. */ if (isCancelled()) { if (cf.isSuccess()) { logger.debug("Closing channel for cancelled promise {}", lock); cf.channel().close(); } return; } if (!cf.isSuccess()) { final Future rf = strategy.scheduleReconnect(cf.cause()); rf.addListener(new FutureListener() { @Override public void operationComplete(final Future sf) { synchronized (lock) { // Triggered when a connection attempt is to be made. Preconditions.checkState(pending == sf); /* * The promise we gave out could have been cancelled, * which cascades to the reconnect attempt getting * cancelled, but there is a slight race window, where * the reconnect attempt is already enqueued, but the * listener has not yet been notified -- if cancellation * happens at that point, we need to catch it here. */ if (!isCancelled()) { if (sf.isSuccess()) { connect(); } else { setFailure(sf.cause()); } } } } }); pending = rf; } else { logger.debug("Promise {} connection successful", lock); } } } }); } catch (Exception e) { setFailure(e); } } @Override public synchronized boolean cancel(final boolean mayInterruptIfRunning) { if (super.cancel(mayInterruptIfRunning)) { pending.cancel(mayInterruptIfRunning); return true; } return false; } @Override public synchronized Promise setSuccess(final S result) { logger.debug("Promise {} completed", this); strategy.reconnectSuccessful(); return super.setSuccess(result); } }