2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.replicate.netty;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.List;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
35 * for a particular port.
37 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
// SLF4J logger for this class.
38 private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
// Service group identifier created from this class's name (Class.getName()).
39 private static final ServiceGroupIdentifier SGID =
40 ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
// Collaborators and configuration assigned once in the constructor.
42 private final BootstrapSupport bootstrapSupport;
43 private final DOMDataTreeChangeService dtcs;
44 private final int listenPort;
// Child channels tracked by this service; closeServiceInstance() closes each of them.
47 private final Collection<SocketChannel> children = new HashSet<>();
// Server channel: assigned in instantiateServiceInstance(); initChannel() treats a null value
// as "shutting down".
49 private Channel serverChannel;
// Creates the singleton service for one listen port.
// bootstrapSupport and dtcs are checked with requireNonNull, so passing null for either throws
// NullPointerException. listenPort is stored as given; no range check is performed here.
51 SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
52 final int listenPort) {
53 this.bootstrapSupport = requireNonNull(bootstrapSupport);
54 this.dtcs = requireNonNull(dtcs);
55 this.listenPort = listenPort;
// Returns the ServiceGroupIdentifier of this singleton service.
// NOTE(review): the method body is not visible in this listing; presumably it returns SGID - confirm.
59 public ServiceGroupIdentifier getIdentifier() {
// Starts the replication source: builds a server bootstrap from bootstrapSupport, stores the
// resulting channel in serverChannel and logs the listen port. Runs under this instance's monitor.
// Throws IllegalStateException, wrapping the InterruptedException, if the thread is interrupted.
64 public synchronized void instantiateServiceInstance() {
65 final ChannelFuture future = bootstrapSupport.newServerBootstrap()
// option() configures the listening channel; childOption() configures each accepted channel.
66 .option(ChannelOption.SO_BACKLOG, 3)
67 .childOption(ChannelOption.SO_KEEPALIVE, true)
// NOTE(review): the rest of the builder chain and the guarded statement are not visible in this
// listing; presumably the future is bound to listenPort and waited upon - confirm.
73 } catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored before rethrowing; consider calling
// Thread.currentThread().interrupt() first so callers can still observe the interruption.
74 throw new IllegalStateException("Failed to bind port " + listenPort, e);
// Remember the server channel; initChannel() reads a non-null value as "service is running".
77 serverChannel = future.channel();
78 LOG.info("Replication source started on port {}", listenPort);
// Shuts the replication source down: requests a close of the server channel and of every tracked
// child channel through closeChannel(), combines the per-channel futures into one and logs when
// that combined future completes. Runs under this instance's monitor.
// NOTE(review): the return statement is not visible in this listing; presumably ret is returned.
82 public synchronized ListenableFuture<?> closeServiceInstance() {
83 LOG.info("Replication source on port {} shutting down", listenPort);
85 final List<ListenableFuture<Void>> futures = new ArrayList<>();
// NOTE(review): serverChannel is passed on without a visible null check, and closeChannel()
// dereferences its argument immediately. Confirm this cannot run before a successful
// instantiateServiceInstance().
87 // Close server channel
88 futures.add(closeChannel(serverChannel));
91 // Close all child channels
92 for (SocketChannel channel : children) {
93 futures.add(closeChannel(channel));
// successfulAsList() completes once every input has completed and does not fail when an individual
// close fails. nonCancellationPropagating() keeps a cancel of the returned future from cancelling
// the underlying close futures.
97 final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
// directExecutor(): the log statement runs on whichever thread completes the combined future.
98 ret.addListener(() -> {
99 LOG.info("Replication source on port {} shut down", listenPort);
100 }, MoreExecutors.directExecutor());
// ChannelInitializer callback for a newly established SocketChannel: installs the replication
// handlers on the channel, unless the service is already shutting down. Runs under this
// instance's monitor.
105 public synchronized void initChannel(final SocketChannel ch) {
// A null serverChannel is treated as "shutdown in progress".
106 if (serverChannel == null) {
107 LOG.debug("Channel {} established while shutting down, closing it", ch);
// NOTE(review): the rest of this branch and the receiver of the addLast() chain are not visible in
// this listing; presumably the channel is closed and the method returns, and the handlers go onto
// ch.pipeline() - confirm.
// Handlers are added in this order: frame decoder, request handler (given dtcs), delta encoder
// using the current NormalizedNodeStreamVersion, and the shared frame encoder instance.
113 .addLast("frameDecoder", new MessageFrameDecoder())
114 .addLast("requestHandler", new SourceRequestHandler(dtcs))
115 .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
116 .addLast("frameEncoder", MessageFrameEncoder.instance());
119 LOG.debug("Channel {} established", ch);
122 private static ListenableFuture<Void> closeChannel(final Channel ch) {
123 final SettableFuture<Void> ret = SettableFuture.create();
124 ch.closeFuture().addListener(chf -> {
125 final Throwable cause = chf.cause();
127 ret.setException(cause);