BUG-58: refactor to take advantage of netty
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / ReconnectPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.util.concurrent.DefaultPromise;
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.FutureListener;
13
14 import java.net.InetSocketAddress;
15
16 import com.google.common.base.Preconditions;
17
18 final class ReconnectPromise<M extends ProtocolMessage, S extends ProtocolSession<M>, L extends SessionListener<M, ?, ?>> extends DefaultPromise<Void> {
19         private final Dispatcher dispatcher;
20         private final InetSocketAddress address;
21         private final L listener;
22         private final SessionNegotiatorFactory<M, S, L> negotiatorFactory;
23         private final ProtocolMessageFactory<M> messageFactory;
24         private final ReconnectStrategyFactory strategyFactory;
25         private final ReconnectStrategy strategy;
26         private Future<?> pending;
27
28         public ReconnectPromise(final Dispatcher dispatcher,
29                         final InetSocketAddress address, final L listener,
30                         final SessionNegotiatorFactory<M, S, L> negotiatorFactory,
31                         final ProtocolMessageFactory<M> messageFactory,
32                         final ReconnectStrategyFactory connectStrategyFactory,
33                         final ReconnectStrategy reestablishStrategy) {
34
35                 this.dispatcher = Preconditions.checkNotNull(dispatcher);
36                 this.address = Preconditions.checkNotNull(address);
37                 this.listener = Preconditions.checkNotNull(listener);
38                 this.negotiatorFactory = Preconditions.checkNotNull(negotiatorFactory);
39                 this.messageFactory =  Preconditions.checkNotNull(messageFactory);
40                 this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
41                 this.strategy = Preconditions.checkNotNull(reestablishStrategy);
42         }
43
44         synchronized void connect() {
45                 final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
46                 final ReconnectStrategy rs = new ReconnectStrategy() {
47                         @Override
48                         public Future<Void> scheduleReconnect(final Throwable cause) {
49                                 return cs.scheduleReconnect(cause);
50                         }
51
52                         @Override
53                         public void reconnectSuccessful() {
54                                 cs.reconnectSuccessful();
55                         }
56
57                         @Override
58                         public int getConnectTimeout() throws Exception {
59                                 final int cst = cs.getConnectTimeout();
60                                 final int rst = strategy.getConnectTimeout();
61
62                                 if (cst == 0) {
63                                         return rst;
64                                 }
65                                 if (rst == 0) {
66                                         return cst;
67                                 }
68                                 return Math.min(cst, rst);
69                         }
70                 };
71
72                 final Future<S> cf = dispatcher.createClient(address,
73                                 listener, negotiatorFactory, messageFactory, rs);
74
75                 final Object lock = this;
76                 pending = cf;
77
78                 cf.addListener(new FutureListener<S>() {
79                         @Override
80                         public void operationComplete(final Future<S> future) {
81                                 synchronized (lock) {
82                                         if (!future.isSuccess()) {
83                                                 final Future<Void> rf = strategy.scheduleReconnect(cf.cause());
84                                                 pending = rf;
85
86                                                 rf.addListener(new FutureListener<Void>() {
87                                                         @Override
88                                                         public void operationComplete(final Future<Void> sf) {
89                                                                 synchronized (lock) {
90                                                                         /*
91                                                                          * The promise we gave out could have been cancelled,
92                                                                          * which cascades to the reconnect attempt getting
93                                                                          * cancelled, but there is a slight race window, where
94                                                                          * the reconnect attempt is already enqueued, but the
95                                                                          * listener has not yet been notified -- if cancellation
96                                                                          * happens at that point, we need to catch it here.
97                                                                          */
98                                                                         if (!isCancelled()) {
99                                                                                 if (sf.isSuccess()) {
100                                                                                         connect();
101                                                                                 } else {
102                                                                                         setFailure(sf.cause());
103                                                                                 }
104                                                                         }
105                                                                 }
106                                                         }
107                                                 });
108                                         } else {
109                                                 /*
110                                                  *  FIXME: we have a slight race window with cancellation
111                                                  *         here. Analyze and define its semantics.
112                                                  */
113                                                 strategy.reconnectSuccessful();
114                                                 setSuccess(null);
115                                         }
116                                 }
117                         }
118                 });
119         }
120
121         @Override
122         public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
123                 if (super.cancel(mayInterruptIfRunning)) {
124                         pending.cancel(mayInterruptIfRunning);
125                         return true;
126                 }
127
128                 return false;
129         }
130 }