*/
package org.opendaylight.protocol.framework;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+import java.io.Closeable;
import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
private final PipelineInitializer<S> initializer;
private Future<?> pending;
+ private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
+
public ReconnectPromise(final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
final PipelineInitializer<S> initializer) {
this.initializer = Preconditions.checkNotNull(initializer);
}
+ // TODO rafactor
+
synchronized void connect() {
+ negotiationFinished.set(false);
+
final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
final ReconnectStrategy rs = new ReconnectStrategy() {
@Override
}
};
- final Future<S> cf = this.dispatcher.createClient(this.address, rs, this.initializer);
+ final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+ @Override
+ public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+ addChannelClosedListener(channel.closeFuture());
+ initializer.initializeChannel(channel, promise);
+ }
+ });
final Object lock = this;
this.pending = cf;
cf.addListener(new FutureListener<S>() {
+
@Override
public void operationComplete(final Future<S> future) {
synchronized (lock) {
if (!future.isSuccess()) {
final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
+
+ if(rf == null) {
+ // This should reflect: no more reconnecting strategies, enough
+ // Currently all reconnect strategies fail with exception, should return null
+ return;
+ }
+
ReconnectPromise.this.pending = rf;
rf.addListener(new FutureListener<Void>() {
* here. Analyze and define its semantics.
*/
ReconnectPromise.this.strategy.reconnectSuccessful();
- setSuccess(null);
+ negotiationFinished.set(true);
}
}
}
});
}
+ private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
+
+ class ClosedChannelListener implements Closeable, FutureListener<Void> {
+
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
+ @Override
+ public void operationComplete(final Future<Void> future) throws Exception {
+ if (stop.get()) {
+ return;
+ }
+
+ // Start reconnecting crashed session after negotiation was successful
+ if (!negotiationFinished.get()) {
+ return;
+ }
+
+ connect();
+ }
+
+ @Override
+ public void close() {
+ this.stop.set(true);
+ }
+ }
+
+ private void addChannelClosedListener(final ChannelFuture channelFuture) {
+ channelFuture.addListener(closedChannelListener);
+ }
+
@Override
public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ closedChannelListener.close();
+
if (super.cancel(mayInterruptIfRunning)) {
this.pending.cancel(mayInterruptIfRunning);
return true;