Add transport-{api,tcp}
[netconf.git] / transport / transport-api / src / main / java / org / opendaylight / netconf / transport / api / AbstractTransportStack.java
1 /*
2  * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.transport.api;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import io.netty.util.concurrent.Future;
19 import java.util.ArrayList;
20 import java.util.HashSet;
21 import java.util.Set;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.yangtools.yang.common.Empty;
25
26 /**
27  * Convenience base class for {@link TransportStack} implementations. It mediates atomic idempotence of
28  * {@link #shutdown()}.
29  *
30  * @param <C> associated {@link TransportChannel} type
31  */
32 public abstract class AbstractTransportStack<C extends TransportChannel> implements TransportStack {
33     private final @NonNull TransportChannelListener listener;
34
35     /**
36      * Polymorphic state. It can be in one of four states:
37      * <ol>
38      *   <li>{@code null} when there is no attached transport channel,</li>
39      *   <li>a {@link TransportChannel} instance when there is exactly one attached transport channel,</li>
40      *   <li>a {@link Set} holding multiple attached transport channels,</li>
41      *   <li>a {@link ListenableFuture} completing when shutdown is complete
42      * </ol>
43      */
44     @GuardedBy("this")
45     private Object state;
46
47     protected AbstractTransportStack(final TransportChannelListener listener) {
48         this.listener = requireNonNull(listener);
49     }
50
51     @Override
52     @SuppressWarnings("unchecked")
53     public final ListenableFuture<Empty> shutdown() {
54         final SettableFuture<Empty> future;
55         final Set<TransportChannel> channels;
56
57         synchronized (this) {
58             var local = state;
59             if (local instanceof ListenableFuture) {
60                 // Already shutting down, no-op
61                 return (ListenableFuture<Empty>) local;
62             }
63
64             if (local == null) {
65                 channels = Set.of();
66             } else if (local instanceof Set) {
67                 channels = (Set<TransportChannel>) local;
68             } else if (local instanceof TransportChannel tc) {
69                 channels = Set.of(tc);
70             } else {
71                 throw new IllegalStateException("Unexpected state " + local);
72             }
73             state = future = SettableFuture.create();
74         }
75
76         final var futures = new ArrayList<ListenableFuture<?>>(channels.size() + 1);
77         futures.add(startShutdown());
78         for (var channel : channels) {
79             futures.add(toListenableFuture(channel.channel().close()));
80         }
81         Futures.whenAllComplete(futures).run(() -> future.set(Empty.value()), MoreExecutors.directExecutor());
82         return future;
83     }
84
85     protected abstract @NonNull ListenableFuture<Empty> startShutdown();
86
87     protected final void addTransportChannel(final @NonNull C channel) {
88         // Careful stepping here so we do not invoke callbacks while holding a lock...
89         final var ch = channel.channel();
90         if (add(channel)) {
91             // The channel is tracked in state, make sure we remove it when it goes away. Invoke the user listener only
92             // after that, so our listener fires first (and the user does not have a chance to close() before that).
93             ch.closeFuture().addListener(ignored -> remove(channel));
94             listener.onTransportChannelEstablished(channel);
95         } else {
96             // We are already shutting down, just close the channel
97             ch.close();
98         }
99     }
100
101     protected final void notifyTransportChannelFailed(final @NonNull Throwable cause) {
102         listener.onTransportChannelFailed(cause);
103     }
104
105     @Override
106     public final String toString() {
107         return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
108     }
109
110     protected synchronized ToStringHelper addToStringAttributes(final ToStringHelper helper) {
111         return helper.add("listener", listener).add("state", state);
112     }
113
114     protected static final @NonNull ListenableFuture<Empty> toListenableFuture(final Future<?> nettyFuture) {
115         final var ret = SettableFuture.<Empty>create();
116         nettyFuture.addListener(future -> {
117             final var cause = future.cause();
118             if (cause != null) {
119                 ret.setException(cause);
120             } else {
121                 ret.set(Empty.value());
122             }
123         });
124         return ret;
125     }
126
127     @SuppressWarnings("unchecked")
128     private synchronized boolean add(final @NonNull TransportChannel channel) {
129         final var local = state;
130         if (local instanceof ListenableFuture) {
131             // Already shutting down
132             return false;
133         }
134         if (local == null) {
135             // First session, simple
136             state = channel;
137         } else if (local instanceof Set) {
138             ((Set<TransportChannel>) local).add(channel);
139         } else if (local instanceof TransportChannel tc) {
140             final var set = new HashSet<TransportChannel>(4);
141             set.add(tc);
142             set.add(channel);
143             state = set;
144         } else {
145             throw new IllegalStateException("Unhandled state " + local);
146         }
147         return true;
148     }
149
150     private synchronized void remove(final @NonNull TransportChannel channel) {
151         final var local = state;
152         if (local == null || local instanceof ListenableFuture) {
153             // No recorded channel or we are already shutting down
154             return;
155         }
156
157         if (local.equals(channel)) {
158             // Single channel, go back to null
159             state = null;
160         } else if (local instanceof Set<?> set) {
161             // Multiple channels, let's just remove it. Note this does not collapse the set if there's just one
162             // remaining session -- the chances are we will go back to more than one soon
163             set.remove(channel);
164         } else {
165             throw new IllegalStateException("Unhandled state " + local);
166         }
167     }
168 }