/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.protocol.framework; import io.netty.channel.ChannelFuture; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import java.io.Closeable; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicBoolean; import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer; import com.google.common.base.Preconditions; final class ReconnectPromise, L extends SessionListener> extends DefaultPromise { private final AbstractDispatcher dispatcher; private final InetSocketAddress address; private final ReconnectStrategyFactory strategyFactory; private final ReconnectStrategy strategy; private final PipelineInitializer initializer; private Future pending; private final AtomicBoolean negotiationFinished = new AtomicBoolean(false); public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher dispatcher, final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy, final PipelineInitializer initializer) { super(executor); this.dispatcher = Preconditions.checkNotNull(dispatcher); this.address = Preconditions.checkNotNull(address); this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory); this.strategy = Preconditions.checkNotNull(reestablishStrategy); this.initializer = Preconditions.checkNotNull(initializer); } // FIXME: BUG-190: refactor synchronized void connect() { negotiationFinished.set(false); final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy(); final ReconnectStrategy rs = new ReconnectStrategy() { @Override public Future scheduleReconnect(final Throwable cause) { return cs.scheduleReconnect(cause); } @Override public void reconnectSuccessful() { cs.reconnectSuccessful(); } @Override public int getConnectTimeout() throws Exception { final int cst = cs.getConnectTimeout(); final int rst = ReconnectPromise.this.strategy.getConnectTimeout(); if (cst == 0) { return rst; } if (rst == 0) { return cst; } return Math.min(cst, rst); } }; final Future cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer() { @Override public void initializeChannel(final SocketChannel channel, final Promise promise) { addChannelClosedListener(channel.closeFuture()); initializer.initializeChannel(channel, promise); } }); final Object lock = this; this.pending = cf; cf.addListener(new FutureListener() { @Override public void operationComplete(final Future future) { synchronized (lock) { if (!future.isSuccess()) { final Future rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause()); if(rf == null) { // This should reflect: no more reconnecting strategies, enough // Currently all reconnect strategies fail with exception, should return null return; } ReconnectPromise.this.pending = rf; rf.addListener(new FutureListener() { @Override public void operationComplete(final Future sf) { synchronized (lock) { /* * The promise we gave out could have been cancelled, * which cascades to the reconnect attempt getting * cancelled, but there is a slight race window, where * the reconnect attempt is already enqueued, but the * listener has not yet been notified -- if cancellation * happens at that point, we need to catch it here. */ if (!isCancelled()) { if (sf.isSuccess()) { connect(); } else { setFailure(sf.cause()); } } } } }); } else { /* * FIXME: BUG-190: we have a slight race window with cancellation * here. Analyze and define its semantics. */ ReconnectPromise.this.strategy.reconnectSuccessful(); negotiationFinished.set(true); } } } }); } private final ClosedChannelListener closedChannelListener = new ClosedChannelListener(); class ClosedChannelListener implements Closeable, FutureListener { private final AtomicBoolean stop = new AtomicBoolean(false); @Override public void operationComplete(final Future future) throws Exception { if (stop.get()) { return; } // Start reconnecting crashed session after negotiation was successful if (!negotiationFinished.get()) { return; } connect(); } @Override public void close() { this.stop.set(true); } } private void addChannelClosedListener(final ChannelFuture channelFuture) { channelFuture.addListener(closedChannelListener); } @Override public synchronized boolean cancel(final boolean mayInterruptIfRunning) { closedChannelListener.close(); if (super.cancel(mayInterruptIfRunning)) { this.pending.cancel(mayInterruptIfRunning); return true; } return false; } }