2d2b35ecfb851f5c08fb06511e9292063459fc52
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SourceSingletonService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.replicate.netty;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelInitializer;
19 import io.netty.channel.ChannelOption;
20 import io.netty.channel.socket.SocketChannel;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.List;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Cluster Singleton Service handler for delta stream source. Responsible for starting/stopping the delta stream source
35  * for a particular port.
36  */
37 final class SourceSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
38     private static final Logger LOG = LoggerFactory.getLogger(SourceSingletonService.class);
39     private static final ServiceGroupIdentifier SGID =
40             ServiceGroupIdentifier.create(SourceSingletonService.class.getName());
41
42     private final BootstrapSupport bootstrapSupport;
43     private final DOMDataTreeChangeService dtcs;
44     private final int listenPort;
45
46     @GuardedBy("this")
47     private final Collection<SocketChannel> children = new HashSet<>();
48     @GuardedBy("this")
49     private Channel serverChannel;
50
51     SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
52             final int listenPort) {
53         this.bootstrapSupport = requireNonNull(bootstrapSupport);
54         this.dtcs = requireNonNull(dtcs);
55         this.listenPort = listenPort;
56         LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
57     }
58
59     @Override
60     public ServiceGroupIdentifier getIdentifier() {
61         return SGID;
62     }
63
64     @Override
65     public synchronized void instantiateServiceInstance() {
66         final ChannelFuture future = bootstrapSupport.newServerBootstrap()
67                 .option(ChannelOption.SO_BACKLOG, 3)
68                 .childOption(ChannelOption.SO_KEEPALIVE, true)
69                 .childHandler(this)
70                 .bind(listenPort);
71
72         try {
73             future.sync();
74         } catch (InterruptedException e) {
75             throw new IllegalStateException("Failed to bind port " + listenPort, e);
76         }
77
78         serverChannel = future.channel();
79         LOG.info("Replication source started on port {}", listenPort);
80     }
81
82     @Override
83     public synchronized ListenableFuture<?> closeServiceInstance() {
84         LOG.info("Replication source on port {} shutting down", listenPort);
85
86         final List<ListenableFuture<Void>> futures = new ArrayList<>();
87
88         // Close server channel
89         futures.add(closeChannel(serverChannel));
90         serverChannel = null;
91
92         // Close all child channels
93         for (SocketChannel channel : children) {
94             futures.add(closeChannel(channel));
95         }
96         children.clear();
97
98         final ListenableFuture<?> ret = Futures.nonCancellationPropagating(Futures.successfulAsList(futures));
99         ret.addListener(() -> {
100             LOG.info("Replication source on port {} shut down", listenPort);
101         }, MoreExecutors.directExecutor());
102         return ret;
103     }
104
105     @Override
106     public synchronized void initChannel(final SocketChannel ch) {
107         if (serverChannel == null) {
108             LOG.debug("Channel {} established while shutting down, closing it", ch);
109             ch.close();
110             return;
111         }
112
113         ch.pipeline()
114             .addLast("frameDecoder", new MessageFrameDecoder())
115             .addLast("requestHandler", new SourceRequestHandler(dtcs))
116             // Output, in reverse order
117             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
118             .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
119         children.add(ch);
120
121         LOG.info("Channel {} established", ch);
122     }
123
124     private static ListenableFuture<Void> closeChannel(final Channel ch) {
125         final SettableFuture<Void> ret = SettableFuture.create();
126         ch.closeFuture().addListener(chf -> {
127             final Throwable cause = chf.cause();
128             if (cause != null) {
129                 ret.setException(cause);
130             } else {
131                 ret.set(null);
132             }
133         });
134
135         ch.close();
136         return ret;
137     }
138 }