1e9d1efdd556b2e21b164b2ac556e7b3f1d20aea
[netconf.git] / netconf / netconf-client / src / main / java / org / opendaylight / netconf / client / TcpClientChannelInitializer.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.client;
9
10 import io.netty.channel.Channel;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelOutboundHandlerAdapter;
13 import io.netty.channel.ChannelPromise;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.FutureListener;
16 import io.netty.util.concurrent.Promise;
17 import java.net.SocketAddress;
18
19 final class TcpClientChannelInitializer extends AbstractClientChannelInitializer {
20     TcpClientChannelInitializer(final NetconfClientSessionNegotiatorFactory negotiatorFactory,
21             final NetconfClientSessionListener sessionListener) {
22         super(negotiatorFactory, sessionListener);
23     }
24
25     @Override
26     public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
27         final Future<NetconfClientSession> negotiationFuture = promise;
28
29         //We have to add this channel outbound handler to channel pipeline, in order
30         //to get notifications from netconf negotiatior. Set connection promise to
31         //success only after successful negotiation.
32         ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
33             ChannelPromise connectPromise;
34             FutureListener<NetconfClientSession> negotiationFutureListener;
35
36             @Override
37             public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
38                                 final SocketAddress localAddress,
39                                 final ChannelPromise channelPromise) {
40                 connectPromise = channelPromise;
41                 ChannelPromise tcpConnectFuture = ch.newPromise();
42
43                 negotiationFutureListener = future -> {
44                     if (future.isSuccess()) {
45                         channelPromise.setSuccess();
46                     }
47                 };
48
49                 tcpConnectFuture.addListener(future -> {
50                     if (future.isSuccess()) {
51                         //complete connection promise with netconf negotiation future
52                         negotiationFuture.addListener(negotiationFutureListener);
53                     } else {
54                         channelPromise.setFailure(future.cause());
55                     }
56                 });
57                 ctx.connect(remoteAddress, localAddress, tcpConnectFuture);
58             }
59
60             @Override
61             public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
62                 if (connectPromise == null) {
63                     return;
64                 }
65
66                 // If we have already succeeded and the session was dropped after, we need to fire inactive to notify
67                 // reconnect logic
68                 if (connectPromise.isSuccess()) {
69                     ctx.fireChannelInactive();
70                 }
71
72                 //If connection promise is not already set, it means negotiation failed
73                 //we must set connection promise to failure
74                 if (!connectPromise.isDone()) {
75                     connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
76                 }
77
78                 //Remove listener from negotiation future, we don't want notifications
79                 //from negotiation anymore
80                 negotiationFuture.removeListener(negotiationFutureListener);
81
82                 super.disconnect(ctx, promise);
83                 promise.setSuccess();
84             }
85         });
86
87         super.initialize(ch, promise);
88     }
89 }