BUG-54 : switched channel pipeline to be protocol specific.
[bgpcep.git] / framework / src / main / java / org / opendaylight / protocol / framework / ProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.bootstrap.Bootstrap;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelFutureListener;
13 import io.netty.channel.ChannelOption;
14 import io.netty.util.concurrent.DefaultPromise;
15 import io.netty.util.concurrent.Future;
16 import io.netty.util.concurrent.FutureListener;
17 import io.netty.util.concurrent.Promise;
18
19 import java.net.InetSocketAddress;
20
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import com.google.common.base.Preconditions;
28
29 @ThreadSafe
30 final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
31         private static final Logger logger = LoggerFactory.getLogger(ProtocolSessionPromise.class);
32         private final ReconnectStrategy strategy;
33         private final InetSocketAddress address;
34         private final Bootstrap b;
35
36         @GuardedBy("this")
37         private Future<?> pending;
38
39         ProtocolSessionPromise(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap b) {
40                 this.strategy = Preconditions.checkNotNull(strategy);
41                 this.address = Preconditions.checkNotNull(address);
42                 this.b = Preconditions.checkNotNull(b);
43         }
44
45         synchronized void connect() {
46                 final Object lock = this;
47
48                 try {
49                         final int timeout = this.strategy.getConnectTimeout();
50
51                         logger.debug("Promise {} attempting connect for {}ms", lock, timeout);
52
53                         this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
54                         this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
55                                 @Override
56                                 public void operationComplete(final ChannelFuture cf) throws Exception {
57                                         synchronized (lock) {
58
59                                                 logger.debug("Promise {} connection resolved", lock);
60
61                                                 // Triggered when a connection attempt is resolved.
62                                                 Preconditions.checkState(ProtocolSessionPromise.this.pending == cf);
63
64                                                 /*
65                                                  * The promise we gave out could have been cancelled,
66                                                  * which cascades to the connect getting cancelled,
67                                                  * but there is a slight race window, where the connect
68                                                  * is already resolved, but the listener has not yet
69                                                  * been notified -- cancellation at that point won't
70                                                  * stop the notification arriving, so we have to close
71                                                  * the race here.
72                                                  */
73                                                 if (isCancelled()) {
74                                                         if (cf.isSuccess()) {
75                                                                 logger.debug("Closing channel for cancelled promise {}", lock);
76                                                                 cf.channel().close();
77                                                         }
78                                                         return;
79                                                 }
80
81                                                 if (!cf.isSuccess()) {
82                                                         final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
83                                                         rf.addListener(new FutureListener<Void>() {
84                                                                 @Override
85                                                                 public void operationComplete(final Future<Void> sf) {
86                                                                         synchronized (lock) {
87                                                                                 // Triggered when a connection attempt is to be made.
88                                                                                 Preconditions.checkState(ProtocolSessionPromise.this.pending == sf);
89
90                                                                                 /*
91                                                                                  * The promise we gave out could have been cancelled,
92                                                                                  * which cascades to the reconnect attempt getting
93                                                                                  * cancelled, but there is a slight race window, where
94                                                                                  * the reconnect attempt is already enqueued, but the
95                                                                                  * listener has not yet been notified -- if cancellation
96                                                                                  * happens at that point, we need to catch it here.
97                                                                                  */
98                                                                                 if (!isCancelled()) {
99                                                                                         if (sf.isSuccess()) {
100                                                                                                 connect();
101                                                                                         } else {
102                                                                                                 setFailure(sf.cause());
103                                                                                         }
104                                                                                 }
105                                                                         }
106                                                                 }
107                                                         });
108
109                                                         ProtocolSessionPromise.this.pending = rf;
110                                                 } else {
111                                                         logger.debug("Promise {} connection successful", lock);
112                                                 }
113                                         }
114                                 }
115                         });
116                 } catch (final Exception e) {
117                         setFailure(e);
118                 }
119         }
120
121         @Override
122         public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
123                 if (super.cancel(mayInterruptIfRunning)) {
124                         this.pending.cancel(mayInterruptIfRunning);
125                         return true;
126                 }
127
128                 return false;
129         }
130
131         @Override
132         public synchronized Promise<S> setSuccess(final S result) {
133                 logger.debug("Promise {} completed", this);
134                 this.strategy.reconnectSuccessful();
135                 return super.setSuccess(result);
136         }
137 }