415637269b0277954f59210893b3ba7665b366c2
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SourceSingletonService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.List;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
35  * for a particular port.
36  */
37 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
38     private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
39     private static final ServiceGroupIdentifier SGID =
40             ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
41
42     private final BootstrapSupport bootstrapSupport;
43     private final DOMDataTreeChangeService dtcs;
44     private final int listenPort;
45
46     @GuardedBy("this")
47     private final Collection<SocketChannel> children = new HashSet<>();
48     @GuardedBy("this")
49     private Channel serverChannel;
50
51     SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
52             final int listenPort) {
53         this.bootstrapSupport = requireNonNull(bootstrapSupport);
54         this.dtcs = requireNonNull(dtcs);
55         this.listenPort = listenPort;
56     }
57
58     @Override
59     public ServiceGroupIdentifier getIdentifier() {
60         return SGID;
61     }
62
63     @Override
64     public synchronized void instantiateServiceInstance() {
65         final ChannelFuture future = bootstrapSupport.newServerBootstrap()
66                 .option(ChannelOption.SO_BACKLOG, 3)
67                 .childOption(ChannelOption.SO_KEEPALIVE, true)
68                 .childHandler(this)
69                 .bind(listenPort);
70
71         try {
72             future.sync();
73         } catch (InterruptedException e) {
74             throw new IllegalStateException("Failed to bind port " + listenPort, e);
75         }
76
77         serverChannel = future.channel();
78         LOG.info("Replication source started on port {}", listenPort);
79     }
80
81     @Override
82     public synchronized ListenableFuture<?> closeServiceInstance() {
83         LOG.info("Replication source on port {} shutting down", listenPort);
84
85         final List<ListenableFuture<Void>> futures = new ArrayList<>();
86
87         // Close server channel
88         futures.add(closeChannel(serverChannel));
89         serverChannel = null;
90
91         // Close all child channels
92         for (SocketChannel channel : children) {
93             futures.add(closeChannel(channel));
94         }
95         children.clear();
96
97         final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
98         ret.addListener(() -> {
99             LOG.info("Replication source on port {} shut down", listenPort);
100         }, MoreExecutors.directExecutor());
101         return ret;
102     }
103
104     @Override
105     public synchronized void initChannel(final SocketChannel ch) {
106         if (serverChannel == null) {
107             LOG.debug("Channel {} established while shutting down, closing it", ch);
108             ch.close();
109             return;
110         }
111
112         ch.pipeline()
113             .addLast("frameDecoder", new MessageFrameDecoder())
114             .addLast("requestHandler", new SourceRequestHandler(dtcs))
115             .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
116             .addLast("frameEncoder", MessageFrameEncoder.instance());
117         children.add(ch);
118
119         LOG.debug("Channel {} established", ch);
120     }
121
122     private static ListenableFuture<Void> closeChannel(final Channel ch) {
123         final SettableFuture<Void> ret = SettableFuture.create();
124         ch.closeFuture().addListener(chf -> {
125             final Throwable cause = chf.cause();
126             if (cause != null) {
127                 ret.setException(cause);
128             } else {
129                 ret.set(null);
130             }
131         });
132
133         ch.close();
134         return ret;
135     }
136 }