Merge "Added test for MouontPoints and URI"
[controller.git] / opendaylight / commons / protocol-framework / src / main / java / org / opendaylight / protocol / framework / ProtocolSessionPromise.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.framework;
9
10 import io.netty.bootstrap.Bootstrap;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelFutureListener;
13 import io.netty.channel.ChannelOption;
14 import io.netty.util.concurrent.DefaultPromise;
15 import io.netty.util.concurrent.EventExecutor;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.FutureListener;
18 import io.netty.util.concurrent.Promise;
19
20 import java.net.InetSocketAddress;
21
22 import javax.annotation.concurrent.GuardedBy;
23 import javax.annotation.concurrent.ThreadSafe;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.google.common.base.Preconditions;
29
30 @ThreadSafe
31 final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
32     private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
33     private final ReconnectStrategy strategy;
34     private final InetSocketAddress address;
35     private final Bootstrap b;
36
37     @GuardedBy("this")
38     private Future<?> pending;
39
40     ProtocolSessionPromise(final EventExecutor executor, final InetSocketAddress address, final ReconnectStrategy strategy,
41             final Bootstrap b) {
42         super(executor);
43         this.strategy = Preconditions.checkNotNull(strategy);
44         this.address = Preconditions.checkNotNull(address);
45         this.b = Preconditions.checkNotNull(b);
46     }
47
48     synchronized void connect() {
49         final Object lock = this;
50
51         try {
52             final int timeout = this.strategy.getConnectTimeout();
53
54             LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
55
56             this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
57             this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
58                 @Override
59                 public void operationComplete(final ChannelFuture cf) throws Exception {
60                     synchronized (lock) {
61
62                         LOG.debug("Promise {} connection resolved", lock);
63
64                         // Triggered when a connection attempt is resolved.
65                         Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
66
67                         /*
68                          * The promise we gave out could have been cancelled,
69                          * which cascades to the connect getting cancelled,
70                          * but there is a slight race window, where the connect
71                          * is already resolved, but the listener has not yet
72                          * been notified -- cancellation at that point won't
73                          * stop the notification arriving, so we have to close
74                          * the race here.
75                          */
76                         if (isCancelled()) {
77                             if (cf.isSuccess()) {
78                                 LOG.debug("Closing channel for cancelled promise {}", lock);
79                                 cf.channel().close();
80                             }
81                             return;
82                         }
83
84                         if (!cf.isSuccess()) {
85                             LOG.warn("Attempt to connect to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
86                             final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
87                             rf.addListener(new FutureListener<Void>() {
88                                 @Override
89                                 public void operationComplete(final Future<Void> sf) {
90                                     synchronized (lock) {
91                                         // Triggered when a connection attempt is to be made.
92                                         Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
93
94                                         /*
95                                          * The promise we gave out could have been cancelled,
96                                          * which cascades to the reconnect attempt getting
97                                          * cancelled, but there is a slight race window, where
98                                          * the reconnect attempt is already enqueued, but the
99                                          * listener has not yet been notified -- if cancellation
100                                          * happens at that point, we need to catch it here.
101                                          */
102                                         if (!isCancelled()) {
103                                             if (sf.isSuccess()) {
104                                                 connect();
105                                             } else {
106                                                 setFailure(sf.cause());
107                                             }
108                                         }
109                                     }
110                                 }
111                             });
112
113                             ProtocolSessionPromise.this.pending = rf;
114                         } else {
115                             LOG.debug("Promise {} connection successful", lock);
116                         }
117                     }
118                 }
119             });
120         } catch (final Exception e) {
121             LOG.info("Failed to connect to {}", e);
122             setFailure(e);
123         }
124     }
125
126     @Override
127     public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
128         if (super.cancel(mayInterruptIfRunning)) {
129             this.pending.cancel(mayInterruptIfRunning);
130             return true;
131         }
132
133         return false;
134     }
135
136     @Override
137     public synchronized Promise<S> setSuccess(final S result) {
138         LOG.debug("Promise {} completed", this);
139         this.strategy.reconnectSuccessful();
140         return super.setSuccess(result);
141     }
142 }