Netty Replicator - improve the reconnection and keepalive mechanisms
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SourceSingletonService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import io.netty.handler.timeout.IdleStateHandler;
22 import java.time.Duration;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.checkerframework.checker.lock.qual.GuardedBy;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
30 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
38  * for a particular port.
39  */
40 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
41     private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
42     private static final ServiceGroupIdentifier SGID =
43             ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
44
45     private final BootstrapSupport bootstrapSupport;
46     private final DOMDataTreeChangeService dtcs;
47     private final int listenPort;
48
49     @GuardedBy("this")
50     private final Collection<SocketChannel> children = new HashSet<>();
51     private final Duration keepaliveInterval;
52     private final int maxMissedKeepalives;
53     @GuardedBy("this")
54     private Channel serverChannel;
55
56     SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
57             final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
58         this.bootstrapSupport = requireNonNull(bootstrapSupport);
59         this.dtcs = requireNonNull(dtcs);
60         this.listenPort = listenPort;
61         this.keepaliveInterval = requireNonNull(keepaliveInterval);
62         this.maxMissedKeepalives = maxMissedKeepalives;
63         LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
64     }
65
66     @Override
67     public ServiceGroupIdentifier getIdentifier() {
68         return SGID;
69     }
70
71     @Override
72     public synchronized void instantiateServiceInstance() {
73         final ChannelFuture future = bootstrapSupport.newServerBootstrap()
74                 .option(ChannelOption.SO_BACKLOG, 3)
75                 .childOption(ChannelOption.SO_KEEPALIVE, true)
76                 .childHandler(this)
77                 .bind(listenPort);
78
79         try {
80             future.sync();
81         } catch (InterruptedException e) {
82             throw new IllegalStateException("Failed to bind port " + listenPort, e);
83         }
84
85         serverChannel = future.channel();
86         LOG.info("Replication source started on port {}", listenPort);
87     }
88
89     @Override
90     public synchronized ListenableFuture<?> closeServiceInstance() {
91         LOG.info("Replication source on port {} shutting down", listenPort);
92
93         final List<ListenableFuture<Void>> futures = new ArrayList<>();
94
95         // Close server channel
96         futures.add(closeChannel(serverChannel));
97         serverChannel = null;
98
99         // Close all child channels
100         for (SocketChannel channel : children) {
101             futures.add(closeChannel(channel));
102         }
103         children.clear();
104
105         final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
106         ret.addListener(() -> {
107             LOG.info("Replication source on port {} shut down", listenPort);
108         }, MoreExecutors.directExecutor());
109         return ret;
110     }
111
112     @Override
113     public synchronized void initChannel(final SocketChannel ch) {
114         if (serverChannel == null) {
115             LOG.debug("Channel {} established while shutting down, closing it", ch);
116             ch.close();
117             return;
118         }
119
120         ch.pipeline()
121             .addLast("frameDecoder", new MessageFrameDecoder())
122             .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
123             .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
124             .addLast("requestHandler", new SourceRequestHandler(dtcs))
125             // Output, in reverse order
126             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
127             .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
128         children.add(ch);
129
130         LOG.info("Channel {} established", ch);
131     }
132
133     private static ListenableFuture<Void> closeChannel(final Channel ch) {
134         final SettableFuture<Void> ret = SettableFuture.create();
135         ch.closeFuture().addListener(chf -> {
136             final Throwable cause = chf.cause();
137             if (cause != null) {
138                 ret.setException(cause);
139             } else {
140                 ret.set(null);
141             }
142         });
143
144         ch.close();
145         return ret;
146     }
147 }