2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.replicate.netty;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.List;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
35 * for a particular port.
37 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
38 private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
39 private static final ServiceGroupIdentifier SGID =
40 ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
42 private final BootstrapSupport bootstrapSupport;
43 private final DOMDataTreeChangeService dtcs;
44 private final int listenPort;
47 private final Collection<SocketChannel> children = new HashSet<>();
49 private Channel serverChannel;
51 SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
52 final int listenPort) {
53 this.bootstrapSupport = requireNonNull(bootstrapSupport);
54 this.dtcs = requireNonNull(dtcs);
55 this.listenPort = listenPort;
56 LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
60 public ServiceGroupIdentifier getIdentifier() {
65 public synchronized void instantiateServiceInstance() {
66 final ChannelFuture future = bootstrapSupport.newServerBootstrap()
67 .option(ChannelOption.SO_BACKLOG, 3)
68 .childOption(ChannelOption.SO_KEEPALIVE, true)
74 } catch (InterruptedException e) {
75 throw new IllegalStateException("Failed to bind port " + listenPort, e);
78 serverChannel = future.channel();
79 LOG.info("Replication source started on port {}", listenPort);
83 public synchronized ListenableFuture<?> closeServiceInstance() {
84 LOG.info("Replication source on port {} shutting down", listenPort);
86 final List<ListenableFuture<Void>> futures = new ArrayList<>();
88 // Close server channel
89 futures.add(closeChannel(serverChannel));
92 // Close all child channels
93 for (SocketChannel channel : children) {
94 futures.add(closeChannel(channel));
98 final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
99 ret.addListener(() -> {
100 LOG.info("Replication source on port {} shut down", listenPort);
101 }, MoreExecutors.directExecutor());
106 public synchronized void initChannel(final SocketChannel ch) {
107 if (serverChannel == null) {
108 LOG.debug("Channel {} established while shutting down, closing it", ch);
114 .addLast("frameDecoder", new MessageFrameDecoder())
115 .addLast("requestHandler", new SourceRequestHandler(dtcs))
116 // Output, in reverse order
117 .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
118 .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
121 LOG.info("Channel {} established", ch);
124 private static ListenableFuture<Void> closeChannel(final Channel ch) {
125 final SettableFuture<Void> ret = SettableFuture.create();
126 ch.closeFuture().addListener(chf -> {
127 final Throwable cause = chf.cause();
129 ret.setException(cause);