BUG-54 : switched channel pipeline to be protocol specific.
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.util.concurrent.DefaultPromise;
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.FutureListener;
13
14 import java.net.InetSocketAddress;
15
16 import com.google.common.base.Preconditions;
17
18 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
19         private final AbstractDispatcher<S, L> dispatcher;
20         private final InetSocketAddress address;
21         private final ReconnectStrategyFactory strategyFactory;
22         private final ReconnectStrategy strategy;
23         private Future<?> pending;
24         private final SessionListenerFactory<L> lfactory;
25
26         public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
27                         final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
28                         final SessionListenerFactory<L> lfactory) {
29
30                 this.dispatcher = Preconditions.checkNotNull(dispatcher);
31                 this.address = Preconditions.checkNotNull(address);
32                 this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
33                 this.strategy = Preconditions.checkNotNull(reestablishStrategy);
34                 this.lfactory = Preconditions.checkNotNull(lfactory);
35         }
36
37         synchronized void connect() {
38                 final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
39                 final ReconnectStrategy rs = new ReconnectStrategy() {
40                         @Override
41                         public Future<Void> scheduleReconnect(final Throwable cause) {
42                                 return cs.scheduleReconnect(cause);
43                         }
44
45                         @Override
46                         public void reconnectSuccessful() {
47                                 cs.reconnectSuccessful();
48                         }
49
50                         @Override
51                         public int getConnectTimeout() throws Exception {
52                                 final int cst = cs.getConnectTimeout();
53                                 final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
54
55                                 if (cst == 0) {
56                                         return rst;
57                                 }
58                                 if (rst == 0) {
59                                         return cst;
60                                 }
61                                 return Math.min(cst, rst);
62                         }
63                 };
64
65                 final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.lfactory);
66
67                 final Object lock = this;
68                 this.pending = cf;
69
70                 cf.addListener(new FutureListener<S>() {
71                         @Override
72                         public void operationComplete(final Future<S> future) {
73                                 synchronized (lock) {
74                                         if (!future.isSuccess()) {
75                                                 final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
76                                                 ReconnectPromise.this.pending = rf;
77
78                                                 rf.addListener(new FutureListener<Void>() {
79                                                         @Override
80                                                         public void operationComplete(final Future<Void> sf) {
81                                                                 synchronized (lock) {
82                                                                         /*
83                                                                          * The promise we gave out could have been cancelled,
84                                                                          * which cascades to the reconnect attempt getting
85                                                                          * cancelled, but there is a slight race window, where
86                                                                          * the reconnect attempt is already enqueued, but the
87                                                                          * listener has not yet been notified -- if cancellation
88                                                                          * happens at that point, we need to catch it here.
89                                                                          */
90                                                                         if (!isCancelled()) {
91                                                                                 if (sf.isSuccess()) {
92                                                                                         connect();
93                                                                                 } else {
94                                                                                         setFailure(sf.cause());
95                                                                                 }
96                                                                         }
97                                                                 }
98                                                         }
99                                                 });
100                                         } else {
101                                                 /*
102                                                  *  FIXME: we have a slight race window with cancellation
103                                                  *         here. Analyze and define its semantics.
104                                                  */
105                                                 ReconnectPromise.this.strategy.reconnectSuccessful();
106                                                 setSuccess(null);
107                                         }
108                                 }
109                         }
110                 });
111         }
112
113         @Override
114         public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
115                 if (super.cancel(mayInterruptIfRunning)) {
116                         this.pending.cancel(mayInterruptIfRunning);
117                         return true;
118                 }
119
120                 return false;
121         }
122 }