Merge "Add XSQL into default compilation"
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.bootstrap.Bootstrap;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelFutureListener;
13 import io.netty.channel.ChannelOption;
14 import io.netty.util.concurrent.DefaultPromise;
15 import io.netty.util.concurrent.EventExecutor;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import io.netty.util.concurrent.Promise;
19
20 import java.net.InetSocketAddress;
21
22 import javax.annotation.concurrent.GuardedBy;
23 import javax.annotation.concurrent.ThreadSafe;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.google.common.base.Preconditions;
29
30 @ThreadSafe
31 final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
32     private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
33     private final ReconnectStrategy strategy;
34     private final InetSocketAddress address;
35     private final Bootstrap b;
36
37     @GuardedBy("this")
38     private Future<?> pending;
39
40     ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy,
41             final Bootstrap b) {
42         super(executor);
43         this.strategy = Preconditions.checkNotNull(strategy);
44         this.address = Preconditions.checkNotNull(address);
45         this.b = Preconditions.checkNotNull(b);
46     }
47
48     synchronized void connect() {
49         final Object lock = this;
50
51         try {
52             final int timeout = this.strategy.getConnectTimeout();
53
54             LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
55
56             this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
57             this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
58                 @Override
59                 public void operationComplete(final ChannelFuture cf) throws Exception {
60                     synchronized (lock) {
61
62                         LOG.debug("Promise {} connection resolved", lock);
63
64                         // Triggered when a connection attempt is resolved.
65                         Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
66
67                         /*
68                          * The promise we gave out could have been cancelled,
69                          * which cascades to the connect getting cancelled,
70                          * but there is a slight race window, where the connect
71                          * is already resolved, but the listener has not yet
72                          * been notified -- cancellation at that point won't
73                          * stop the notification arriving, so we have to close
74                          * the race here.
75                          */
76                         if (isCancelled()) {
77                             if (cf.isSuccess()) {
78                                 LOG.debug("Closing channel for cancelled promise {}", lock);
79                                 cf.channel().close();
80                             }
81                             return;
82                         }
83
84                         if (!cf.isSuccess()) {
85                             LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
86
87                             final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
88                             rf.addListener(new FutureListener<Void>() {
89                                 @Override
90                                 public void operationComplete(final Future<Void> sf) {
91                                     synchronized (lock) {
92                                         // Triggered when a connection attempt is to be made.
93                                         Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
94
95                                         /*
96                                          * The promise we gave out could have been cancelled,
97                                          * which cascades to the reconnect attempt getting
98                                          * cancelled, but there is a slight race window, where
99                                          * the reconnect attempt is already enqueued, but the
100                                          * listener has not yet been notified -- if cancellation
101                                          * happens at that point, we need to catch it here.
102                                          */
103                                         if (!isCancelled()) {
104                                             if (sf.isSuccess()) {
105                                                 connect();
106                                             } else {
107                                                 setFailure(sf.cause());
108                                             }
109                                         }
110                                     }
111                                 }
112                             });
113
114                             ProtocolSessionPromise.this.pending = rf;
115                         } else {
116                             LOG.debug("Promise {} connection successful", lock);
117                         }
118                     }
119                 }
120             });
121         } catch (final Exception e) {
122             LOG.info("Failed to connect to {}", e);
123             setFailure(e);
124         }
125     }
126
127     @Override
128     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
129         if (super.cancel(mayInterruptIfRunning)) {
130             this.pending.cancel(mayInterruptIfRunning);
131             return true;
132         }
133
134         return false;
135     }
136
137     @Override
138     public synchronized Promise<S> setSuccess(final S result) {
139         LOG.debug("Promise {} completed", this);
140         this.strategy.reconnectSuccessful();
141         return super.setSuccess(result);
142     }
143 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.