2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.framework;
10 import io.netty.channel.ChannelFuture;
11 import io.netty.channel.socket.SocketChannel;
12 import io.netty.util.concurrent.DefaultPromise;
13 import io.netty.util.concurrent.EventExecutor;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.FutureListener;
16 import io.netty.util.concurrent.Promise;
18 import java.io.Closeable;
19 import java.net.InetSocketAddress;
20 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
24 import com.google.common.base.Preconditions;
/**
 * Promise (completing without a value) that drives connection attempts to a single remote address
 * through an {@link AbstractDispatcher} and asks a {@link ReconnectStrategy} to schedule a retry
 * when an attempt does not succeed.
 *
 * @param <S> protocol session type produced by the dispatcher
 * @param <L> session listener type the dispatcher is parameterized with
 */
26 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
// Dispatcher that connect() asks to create the client.
27 private final AbstractDispatcher<S, L> dispatcher;
// Remote address handed to the dispatcher on every attempt.
28 private final InetSocketAddress address;
// Produces a fresh ReconnectStrategy each time connect() runs.
29 private final ReconnectStrategyFactory strategyFactory;
// Reestablish strategy: schedules the retry after a failed attempt, contributes to the connect
// timeout, and is notified through reconnectSuccessful().
30 private final ReconnectStrategy strategy;
// Caller-supplied initializer that connect() delegates to for every new channel.
31 private final PipelineInitializer<S> initializer;
// Future of the scheduled retry; assigned in connect() and cancelled from cancel().
32 private Future<?> pending;
// Cleared at the start of connect(), set once reconnectSuccessful() has been reported, and read by
// ClosedChannelListener when a channel closes.
34 private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
/**
 * Captures the collaborators used by later connection attempts, rejecting {@code null} for every
 * argument that is stored.
 *
 * @param executor not referenced by the assignments below; NOTE(review): presumably handed to the
 *        {@code DefaultPromise} superclass -- confirm
 * @param dispatcher dispatcher used to create the client
 * @param address remote address to connect to
 * @param connectStrategyFactory factory queried for a new strategy on every connection attempt
 * @param reestablishStrategy strategy asked to schedule a retry when an attempt fails
 * @param initializer pipeline initializer applied to every new channel
 * @throws NullPointerException if any stored argument is {@code null}
 */
36 public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
37 final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
38 final PipelineInitializer<S> initializer) {
40 this.dispatcher = Preconditions.checkNotNull(dispatcher);
41 this.address = Preconditions.checkNotNull(address);
// The connect-strategy factory is kept under the shorter field name strategyFactory.
42 this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
// The reestablish strategy is kept under the field name strategy.
43 this.strategy = Preconditions.checkNotNull(reestablishStrategy);
44 this.initializer = Preconditions.checkNotNull(initializer);
47 // FIXME: BUG-190: refactor
/**
 * Starts one connection attempt and registers the listener that reacts to its outcome.
 *
 * <p>The method clears {@code negotiationFinished}, obtains a fresh strategy from the factory and
 * asks the dispatcher for a client to {@code address}. If the attempt does not succeed, a retry is
 * requested from the reestablish strategy and remembered in {@code pending}. The method holds this
 * promise's monitor, the same object the nested retry listener synchronizes on.
 */
49 synchronized void connect() {
// Reset the negotiation flag at the start of every attempt.
50 negotiationFinished.set(false);
// Fresh strategy instance from the factory for this attempt.
52 final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
// Adapter handed to the dispatcher: scheduling and success notifications go to cs, while the
// connect timeout is combined from cs and the reestablish strategy.
53 final ReconnectStrategy rs = new ReconnectStrategy() {
55 public Future<Void> scheduleReconnect(final Throwable cause) {
56 return cs.scheduleReconnect(cause);
60 public void reconnectSuccessful() {
61 cs.reconnectSuccessful();
65 public int getConnectTimeout() throws Exception {
66 final int cst = cs.getConnectTimeout();
67 final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
// Use the smaller of the two timeouts.
75 return Math.min(cst, rst);
// Ask the dispatcher for a client, wrapping the caller's initializer so that every new channel
// also gets the closed-channel listener attached to its close future.
79 final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
81 public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
82 addChannelClosedListener(channel.closeFuture());
83 initializer.initializeChannel(channel, promise);
// The promise itself is the monitor used inside the nested retry listener below.
87 final Object lock = this;
// React to the outcome of the connection attempt.
90 cf.addListener(new FutureListener<S>() {
93 public void operationComplete(final Future<S> future) {
95 if (!future.isSuccess()) {
// Attempt failed: ask the reestablish strategy to schedule a retry, passing the failure cause.
96 final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
99 // This should reflect: no more reconnecting strategies, enough
100 // Currently all reconnect strategies fail with exception, should return null
// Remember the scheduled retry so that cancel() can cancel it.
104 ReconnectPromise.this.pending = rf;
// Act on the outcome of the scheduled retry.
106 rf.addListener(new FutureListener<Void>() {
108 public void operationComplete(final Future<Void> sf) {
109 synchronized (lock) {
111 * The promise we gave out could have been cancelled,
112 * which cascades to the reconnect attempt getting
113 * cancelled, but there is a slight race window, where
114 * the reconnect attempt is already enqueued, but the
115 * listener has not yet been notified -- if cancellation
116 * happens at that point, we need to catch it here.
118 if (!isCancelled()) {
119 if (sf.isSuccess()) {
// Fail this promise with the cause reported by the retry future.
// NOTE(review): presumably this belongs to the non-success branch of the check above -- confirm.
122 setFailure(sf.cause());
130 * FIXME: BUG-190: we have a slight race window with cancellation
131 * here. Analyze and define its semantics.
// Notify the reestablish strategy of a successful reconnect and record that negotiation finished.
// NOTE(review): presumably the success branch of the isSuccess() check on the attempt -- confirm.
133 ReconnectPromise.this.strategy.reconnectSuccessful();
134 negotiationFinished.set(true);
// Single listener instance owned by this promise: registered on channel close futures through
// addChannelClosedListener() and closed from cancel().
141 private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
/**
 * Listener for channel close futures that is also {@link Closeable}. When notified it consults
 * {@code negotiationFinished}; per the existing inline comment, a crashed session is meant to be
 * reconnected only after negotiation was successful.
 */
143 class ClosedChannelListener implements Closeable, FutureListener<Void> {
// Starts out false. NOTE(review): presumably raised by close() to suppress further reconnects -- confirm.
145 private final AtomicBoolean stop = new AtomicBoolean(false);
// Invoked when a future this listener was registered on completes (connect() registers it on
// each new channel's close future).
148 public void operationComplete(final Future<Void> future) throws Exception {
153 // Start reconnecting crashed session after negotiation was successful
154 if (!negotiationFinished.get()) {
// Closeable hook; cancel() calls it before cancelling the promise.
162 public void close() {
/**
 * Registers the shared {@code closedChannelListener} on the given future.
 *
 * @param channelFuture future to observe; connect() passes each new channel's close future
 */
167 private void addChannelClosedListener(final ChannelFuture channelFuture) {
168 channelFuture.addListener(closedChannelListener);
/**
 * Cancels this promise. The closed-channel listener is closed first; if the superclass then
 * accepts the cancellation, the pending retry future is cancelled with the same flag.
 *
 * @param mayInterruptIfRunning forwarded unchanged to the superclass and to the pending future
 */
172 public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
// Close the listener before cancelling the promise itself.
173 closedChannelListener.close();
175 if (super.cancel(mayInterruptIfRunning)) {
// NOTE(review): pending is dereferenced without a null check, and the only assignment visible in
// this file is on the failed-attempt path of connect(). Confirm it cannot still be null here.
176 this.pending.cancel(mayInterruptIfRunning);