added feature topology manager shell
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.channel.ChannelFuture;
11 import io.netty.channel.socket.SocketChannel;
12 import io.netty.util.concurrent.DefaultPromise;
13 import io.netty.util.concurrent.EventExecutor;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.FutureListener;
16 import io.netty.util.concurrent.Promise;
17
18 import java.io.Closeable;
19 import java.net.InetSocketAddress;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
23
24 import com.google.common.base.Preconditions;
25
26 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
27     private final AbstractDispatcher<S, L> dispatcher;
28     private final InetSocketAddress address;
29     private final ReconnectStrategyFactory strategyFactory;
30     private final ReconnectStrategy strategy;
31     private final PipelineInitializer<S> initializer;
32     private Future<?> pending;
33
34     private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
35
36     public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
37             final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
38             final PipelineInitializer<S> initializer) {
39         super(executor);
40         this.dispatcher = Preconditions.checkNotNull(dispatcher);
41         this.address = Preconditions.checkNotNull(address);
42         this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
43         this.strategy = Preconditions.checkNotNull(reestablishStrategy);
44         this.initializer = Preconditions.checkNotNull(initializer);
45     }
46
47     // FIXME: BUG-190: refactor
48
49     synchronized void connect() {
50         negotiationFinished.set(false);
51
52         final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
53         final ReconnectStrategy rs = new ReconnectStrategy() {
54             @Override
55             public Future<Void> scheduleReconnect(final Throwable cause) {
56                 return cs.scheduleReconnect(cause);
57             }
58
59             @Override
60             public void reconnectSuccessful() {
61                 cs.reconnectSuccessful();
62             }
63
64             @Override
65             public int getConnectTimeout() throws Exception {
66                 final int cst = cs.getConnectTimeout();
67                 final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
68
69                 if (cst == 0) {
70                     return rst;
71                 }
72                 if (rst == 0) {
73                     return cst;
74                 }
75                 return Math.min(cst, rst);
76             }
77         };
78
79         final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
80             @Override
81             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
82                 addChannelClosedListener(channel.closeFuture());
83                 initializer.initializeChannel(channel, promise);
84             }
85         });
86
87         final Object lock = this;
88         this.pending = cf;
89
90         cf.addListener(new FutureListener<S>() {
91
92             @Override
93             public void operationComplete(final Future<S> future) {
94                 synchronized (lock) {
95                     if (!future.isSuccess()) {
96                         final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
97
98                         if(rf == null) {
99                             // This should reflect: no more reconnecting strategies, enough
100                             // Currently all reconnect strategies fail with exception, should return null
101                             return;
102                         }
103
104                         ReconnectPromise.this.pending = rf;
105
106                         rf.addListener(new FutureListener<Void>() {
107                             @Override
108                             public void operationComplete(final Future<Void> sf) {
109                                 synchronized (lock) {
110                                     /*
111                                      * The promise we gave out could have been cancelled,
112                                      * which cascades to the reconnect attempt getting
113                                      * cancelled, but there is a slight race window, where
114                                      * the reconnect attempt is already enqueued, but the
115                                      * listener has not yet been notified -- if cancellation
116                                      * happens at that point, we need to catch it here.
117                                      */
118                                     if (!isCancelled()) {
119                                         if (sf.isSuccess()) {
120                                             connect();
121                                         } else {
122                                             setFailure(sf.cause());
123                                         }
124                                     }
125                                 }
126                             }
127                         });
128                     } else {
129                         /*
130                          *  FIXME: BUG-190: we have a slight race window with cancellation
131                          *         here. Analyze and define its semantics.
132                          */
133                         ReconnectPromise.this.strategy.reconnectSuccessful();
134                         negotiationFinished.set(true);
135                     }
136                 }
137             }
138         });
139     }
140
141     private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
142
143     class ClosedChannelListener implements Closeable, FutureListener<Void> {
144
145         private final AtomicBoolean stop = new AtomicBoolean(false);
146
147         @Override
148         public void operationComplete(final Future<Void> future) throws Exception {
149             if (stop.get()) {
150                 return;
151             }
152
153             // Start reconnecting crashed session after negotiation was successful
154             if (!negotiationFinished.get()) {
155                 return;
156             }
157
158             connect();
159         }
160
161         @Override
162         public void close() {
163             this.stop.set(true);
164         }
165     }
166
167     private void addChannelClosedListener(final ChannelFuture channelFuture) {
168         channelFuture.addListener(closedChannelListener);
169     }
170
171     @Override
172     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
173         closedChannelListener.close();
174
175         if (super.cancel(mayInterruptIfRunning)) {
176             this.pending.cancel(mayInterruptIfRunning);
177             return true;
178         }
179
180         return false;
181     }
182 }