2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.replicate.netty;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import io.netty.handler.timeout.IdleStateHandler;
22 import java.time.Duration;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.checkerframework.checker.lock.qual.GuardedBy;
29 import org.opendaylight.mdsal.dom.api.DOMDataBroker.DataTreeChangeExtension;
30 import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
31 import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
32 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
38 * for a particular port.
40 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
// Class-wide SLF4J logger.
41 private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
// Service group identifier built from this class's fully-qualified name.
42 private static final ServiceGroupIdentifier SGID =
43 new ServiceGroupIdentifier(SourceSingletonService.class.getName());
// Collaborators and the listen port; each is assigned exactly once, in the constructor.
45 private final BootstrapSupport bootstrapSupport;
46 private final DataTreeChangeExtension dtcs;
47 private final int listenPort;
// Child channels; closeServiceInstance() iterates this collection and closes every entry.
// NOTE(review): where entries are added is not visible in this view — presumably initChannel(); confirm.
50 private final Collection<SocketChannel> children = new HashSet<>();
// Keepalive settings handed to the IdleStateHandler / SourceKeepaliveHandler that initChannel() installs.
51 private final Duration keepaliveInterval;
52 private final int maxMissedKeepalives;
// Assigned from the bind future in instantiateServiceInstance(); initChannel() treats null as "shutting down".
54 private Channel serverChannel;
// Package-private constructor: stores the five arguments and logs that the source is waiting for mastership.
// The three object arguments (bootstrapSupport, dtcs, keepaliveInterval) go through requireNonNull, so a null
// value for any of them raises NullPointerException here rather than later.
56 SourceSingletonService(final BootstrapSupport bootstrapSupport, final DataTreeChangeExtension dtcs,
57 final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
58 this.bootstrapSupport = requireNonNull(bootstrapSupport);
59 this.dtcs = requireNonNull(dtcs);
// The int arguments are stored as given; no range validation happens in this constructor.
60 this.listenPort = listenPort;
61 this.keepaliveInterval = requireNonNull(keepaliveInterval);
62 this.maxMissedKeepalives = maxMissedKeepalives;
// Nothing is bound at construction time; binding happens in instantiateServiceInstance().
63 LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
// Returns the ServiceGroupIdentifier for this singleton service.
// NOTE(review): the method body is not visible in this view — presumably it returns the SGID constant; confirm.
67 public ServiceGroupIdentifier getIdentifier() {
// Starts the replication source. The visible code obtains a ChannelFuture from a server bootstrap supplied by
// bootstrapSupport, converts an InterruptedException into IllegalStateException, then records the future's
// channel in serverChannel and logs the start. The method is synchronized on this instance.
72 public synchronized void instantiateServiceInstance() {
73 final ChannelFuture future = bootstrapSupport.newServerBootstrap()
// Server-socket option: accept backlog of 3.
74 .option(ChannelOption.SO_BACKLOG, 3)
// Per-connection option: enable TCP keepalive on accepted child channels.
75 .childOption(ChannelOption.SO_KEEPALIVE, true)
// NOTE(review): the rest of the bootstrap chain and the opening of the try block are not visible in this view;
// presumably the chain binds listenPort and the try waits on the future — confirm against the full file.
// NOTE(review): the handler below wraps the interrupt without visibly restoring the thread's interrupt flag;
// consider Thread.currentThread().interrupt() before throwing.
81 } catch (InterruptedException e) {
82 throw new IllegalStateException("Failed to bind port " + listenPort, e);
// A non-null serverChannel is what initChannel() uses to tell "running" from "shutting down".
85 serverChannel = future.channel();
86 LOG.info("Replication source started on port {}", listenPort);
// Shuts the replication source down. Collects one close future for the server channel and one per child channel,
// combines them into a single future and attaches a listener that logs once the combined future completes.
// The method is synchronized on this instance.
// NOTE(review): the return statement is not visible in this view — presumably the combined future `ret`; confirm.
90 public synchronized ListenableFuture<?> closeServiceInstance() {
91 LOG.info("Replication source on port {} shutting down", listenPort);
93 final List<ListenableFuture<Void>> futures = new ArrayList<>();
95 // Close server channel
// NOTE(review): no null check on serverChannel is visible here, and closeChannel() dereferences its argument
// (ch.closeFuture()), so a still-null serverChannel would throw NullPointerException — confirm the lifecycle
// guarantees that instantiateServiceInstance() succeeded first.
96 futures.add(closeChannel(serverChannel));
99 // Close all child channels
100 for (SocketChannel channel : children) {
101 futures.add(closeChannel(channel));
// successfulAsList: the combined future does not fail when an individual close fails.
// nonCancellationPropagating: cancelling the returned future does not cancel the underlying close futures.
105 final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
// directExecutor: the log statement runs on whichever thread completes the combined future.
106 ret.addListener(() -> {
107 LOG.info("Replication source on port {} shut down", listenPort);
108 }, MoreExecutors.directExecutor());
// ChannelInitializer callback for a newly established SocketChannel. When serverChannel is null the channel is
// treated as having arrived during shutdown; otherwise the replication handlers are installed and the
// establishment is logged. The method is synchronized on this instance.
113 public synchronized void initChannel(final SocketChannel ch) {
114 if (serverChannel == null) {
115 LOG.debug("Channel {} established while shutting down, closing it", ch);
// NOTE(review): the rest of this branch and the receiver of the addLast chain are not visible in this view;
// presumably the branch closes `ch` and returns, and the chain starts from ch.pipeline() — confirm.
121 .addLast("frameDecoder", new MessageFrameDecoder())
// IdleStateHandler arguments are (readerIdle, writerIdle, allIdle, unit): only the writer-idle timer is armed,
// using keepaliveInterval; the 0 values disable the reader-idle and all-idle timers.
122 .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
// Configured with the maxMissedKeepalives value given to the constructor.
123 .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
// Receives the DataTreeChangeExtension given to the constructor.
124 .addLast("requestHandler", new SourceRequestHandler(dtcs))
125 // Output, in reverse order
126 .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
// Encoder is created with the current NormalizedNodeStreamVersion.
127 .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
130 LOG.info("Channel {} established", ch);
133 private static ListenableFuture<Void> closeChannel(final Channel ch) {
134 final SettableFuture<Void> ret = SettableFuture.create();
135 ch.closeFuture().addListener(chf -> {
136 final Throwable cause = chf.cause();
138 ret.setException(cause);