494ccf8ec4cb5d305dad89d9cf54ed41ab3f3263
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.bootstrap.Bootstrap;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelFutureListener;
14 import io.netty.channel.ChannelOption;
15 import io.netty.util.concurrent.DefaultPromise;
16 import io.netty.util.concurrent.EventExecutor;
17 import io.netty.util.concurrent.Future;
18 import io.netty.util.concurrent.FutureListener;
19 import io.netty.util.concurrent.Promise;
20 import java.net.InetSocketAddress;
21 import javax.annotation.concurrent.GuardedBy;
22 import javax.annotation.concurrent.ThreadSafe;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 @Deprecated
27 @ThreadSafe
28 final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
29     private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
30     private final ReconnectStrategy strategy;
31     private InetSocketAddress address;
32     private final Bootstrap b;
33
34     @GuardedBy("this")
35     private Future<?> pending;
36
37     ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy,
38             final Bootstrap b) {
39         super(executor);
40         this.strategy = Preconditions.checkNotNull(strategy);
41         this.address = Preconditions.checkNotNull(address);
42         this.b = Preconditions.checkNotNull(b);
43     }
44
45     synchronized void connect() {
46         final Object lock = this;
47
48         try {
49             final int timeout = this.strategy.getConnectTimeout();
50
51             LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
52
53             if(this.address.isUnresolved()) {
54                 this.address = new InetSocketAddress(this.address.getHostName(), this.address.getPort());
55             }
56             this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
57             final ChannelFuture connectFuture = this.b.connect(this.address);
58             // Add listener that attempts reconnect by invoking this method again.
59             connectFuture.addListener(new BootstrapConnectListener(lock));
60             this.pending = connectFuture;
61         } catch (final Exception e) {
62             LOG.info("Failed to connect to {}", address, e);
63             setFailure(e);
64         }
65     }
66
67     @Override
68     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
69         if (super.cancel(mayInterruptIfRunning)) {
70             this.pending.cancel(mayInterruptIfRunning);
71             return true;
72         }
73
74         return false;
75     }
76
77     @Override
78     public synchronized Promise<S> setSuccess(final S result) {
79         LOG.debug("Promise {} completed", this);
80         this.strategy.reconnectSuccessful();
81         return super.setSuccess(result);
82     }
83
84     private class BootstrapConnectListener implements ChannelFutureListener {
85         private final Object lock;
86
87         public BootstrapConnectListener(final Object lock) {
88             this.lock = lock;
89         }
90
91         @Override
92         public void operationComplete(final ChannelFuture cf) throws Exception {
93             synchronized (lock) {
94
95                 LOG.debug("Promise {} connection resolved", lock);
96
97                 // Triggered when a connection attempt is resolved.
98                 Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
99
100                 /*
101                  * The promise we gave out could have been cancelled,
102                  * which cascades to the connect getting cancelled,
103                  * but there is a slight race window, where the connect
104                  * is already resolved, but the listener has not yet
105                  * been notified -- cancellation at that point won't
106                  * stop the notification arriving, so we have to close
107                  * the race here.
108                  */
109                 if (isCancelled()) {
110                     if (cf.isSuccess()) {
111                         LOG.debug("Closing channel for cancelled promise {}", lock);
112                         cf.channel().close();
113                     }
114                     return;
115                 }
116
117                 if(cf.isSuccess()) {
118                     LOG.debug("Promise {} connection successful", lock);
119                     return;
120                 }
121
122                 LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
123
124                 final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
125                 rf.addListener(new ReconnectingStrategyListener());
126                 ProtocolSessionPromise.this.pending = rf;
127             }
128         }
129
130         private class ReconnectingStrategyListener implements FutureListener<Void> {
131             @Override
132             public void operationComplete(final Future<Void> sf) {
133                 synchronized (lock) {
134                     // Triggered when a connection attempt is to be made.
135                     Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
136
137                     /*
138                      * The promise we gave out could have been cancelled,
139                      * which cascades to the reconnect attempt getting
140                      * cancelled, but there is a slight race window, where
141                      * the reconnect attempt is already enqueued, but the
142                      * listener has not yet been notified -- if cancellation
143                      * happens at that point, we need to catch it here.
144                      */
145                     if (!isCancelled()) {
146                         if (sf.isSuccess()) {
147                             connect();
148                         } else {
149                             setFailure(sf.cause());
150                         }
151                     }
152                 }
153             }
154         }
155
156     }
157
158 }