2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil;
10 import static java.util.Objects.requireNonNull;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import io.netty.bootstrap.Bootstrap;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelInboundHandlerAdapter;
16 import io.netty.util.concurrent.DefaultPromise;
17 import io.netty.util.concurrent.EventExecutor;
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import java.net.InetSocketAddress;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.opendaylight.netconf.api.NetconfSession;
24 import org.opendaylight.netconf.api.NetconfSessionListener;
25 import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher.PipelineInitializer;
26 import org.opendaylight.yangtools.yang.common.Empty;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 final class ReconnectPromise<S extends NetconfSession, L extends NetconfSessionListener<? super S>>
32 extends DefaultPromise<Empty> implements ReconnectFuture {
33 private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
35 private final AbstractNetconfDispatcher<S, L> dispatcher;
36 private final InetSocketAddress address;
37 private final ReconnectStrategyFactory strategyFactory;
38 private final Bootstrap bootstrap;
39 private final PipelineInitializer<S> initializer;
40 private final Promise<Empty> firstSessionFuture;
43 private Future<?> pending;
45 ReconnectPromise(final EventExecutor executor, final AbstractNetconfDispatcher<S, L> dispatcher,
46 final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
47 final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
49 this.firstSessionFuture = new DefaultPromise<>(executor);
50 this.bootstrap = requireNonNull(bootstrap);
51 this.initializer = requireNonNull(initializer);
52 this.dispatcher = requireNonNull(dispatcher);
53 this.address = requireNonNull(address);
54 this.strategyFactory = requireNonNull(connectStrategyFactory);
58 public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
59 if (super.cancel(mayInterruptIfRunning)) {
60 firstSessionFuture.cancel(mayInterruptIfRunning);
61 pending.cancel(mayInterruptIfRunning);
68 public Future<?> firstSessionFuture() {
69 return firstSessionFuture;
72 synchronized void connect() {
77 private void lockedConnect() {
78 final ReconnectStrategy cs = strategyFactory.createReconnectStrategy();
80 // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support
82 pending = dispatcher.createClient(address, cs, bootstrap, (channel, promise) -> {
83 initializer.initializeChannel(channel, promise);
84 // add closed channel handler
85 // This handler has to be added as last channel handler and the channel inactive event has to be caught by
87 // Handlers in front of it can react to channelInactive event, but have to forward the event or the
88 // reconnect will not work
89 // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource
90 // cleanup) before a new connection is started
91 channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
93 public void channelInactive(final ChannelHandlerContext ctx) {
99 if (!firstSessionFuture.isDone()) {
100 pending.addListener(future -> {
101 if (!future.isSuccess() && !firstSessionFuture.isDone()) {
102 firstSessionFuture.setFailure(future.cause());
108 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
109 justification = "https://github.com/spotbugs/spotbugs/issues/811")
110 private void onChannelInactive() {
111 // This is the ultimate channel inactive handler, not forwarding
116 synchronized (this) {
117 final Future<?> attempt = pending;
118 if (!attempt.isDone() || !attempt.isSuccess()) {
119 // Connection refused, negotiation failed, or similar
120 LOG.debug("Connection to {} was dropped during negotiation, reattempting", address);
123 LOG.debug("Reconnecting after connection to {} was dropped", address);