2 * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.transport.api;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import io.netty.util.concurrent.Future;
19 import java.util.ArrayList;
20 import java.util.HashSet;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.yangtools.yang.common.Empty;
27 * Convenience base class for {@link TransportStack} implementations. It mediates atomic idempotence of
28 * {@link #shutdown()}.
30 * @param <C> associated {@link TransportChannel} type
32 public abstract class AbstractTransportStack<C extends TransportChannel> implements TransportStack {
33 private final @NonNull TransportChannelListener listener;
36 * Polymorphic state. It can be in one of four states:
38 * <li>{@code null} when there is no attached transport channel,</li>
39 * <li>a {@link TransportChannel} instance when there is exactly one attached transport channel,</li>
40 * <li>a {@link Set} holding multiple attached transport channels,</li>
41 * <li>a {@link ListenableFuture} completing when shutdown is complete
/**
 * Creates a transport stack which reports to the supplied listener.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips number 49, so the constructor's
 * closing brace is presumably on a line not shown here -- confirm against the original file.
 *
 * @param listener the {@link TransportChannelListener} to notify; stored after a {@code requireNonNull()} check
 * @throws NullPointerException if {@code listener} is {@code null}
 */
47 protected AbstractTransportStack(final TransportChannelListener listener) {
48 this.listener = requireNonNull(listener);
/**
 * Starts shutting this stack down, or hands back the future of a shutdown that is already in progress.
 *
 * <p>When the observed state is already a {@link ListenableFuture}, that future is returned as-is. Otherwise the
 * tracked channels are captured, {@code state} is replaced with a fresh {@link SettableFuture}, and that future is
 * set to {@code Empty.value()} once the future from {@link #startShutdown()} and the close future of every
 * captured channel have completed. In the lines shown the result is only ever completed with
 * {@code Empty.value()}; no failure is propagated into it.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and several are skipped inside this method
 * (56-58, 62-65, 70, 72, 74-75, 80, 82-84). The read of {@code state} into {@code local}, any locking around it,
 * the handling of a {@code null} state and the final {@code return} are therefore not visible here -- confirm
 * against the original file.
 *
 * @return a future tracking shutdown: the pre-existing one when already shutting down, otherwise presumably the
 *         {@link SettableFuture} created here -- TODO confirm, the return statement is not shown
 * @throws IllegalStateException when the state matches none of the handled types (the guarding branch is on a
 *         line not shown)
 */
52 @SuppressWarnings("unchecked")
53 public final ListenableFuture<Empty> shutdown() {
54 final SettableFuture<Empty> future;
55 final Set<TransportChannel> channels;
// NOTE(review): numbers 56-58 are skipped here; 'local' is presumably assigned from 'state' on one of them.
59 if (local instanceof ListenableFuture) {
60 // Already shutting down, no-op
61 return (ListenableFuture<Empty>) local;
// NOTE(review): numbers 62-65 are skipped here; the first arm of the chain below is not visible.
66 } else if (local instanceof Set) {
67 channels = (Set<TransportChannel>) local;
68 } else if (local instanceof TransportChannel tc) {
// Exactly one tracked channel: wrap it so the code below can treat both cases uniformly.
69 channels = Set.of(tc);
71 throw new IllegalStateException("Unexpected state " + local);
// Publish the shutdown future as the new state, so a later call takes the ListenableFuture branch above.
73 state = future = SettableFuture.create();
// One slot per channel close future, plus one for the future returned by startShutdown().
76 final var futures = new ArrayList<ListenableFuture<?>>(channels.size() + 1);
77 futures.add(startShutdown());
78 for (var channel : channels) {
// close() yields a Netty future; adapt it so it can be combined with the Guava futures.
79 futures.add(toListenableFuture(channel.channel().close()));
// Complete our own future once every collected future is done; the callback is run via directExecutor().
81 Futures.whenAllComplete(futures).run(() -> future.set(Empty.value()), MoreExecutors.directExecutor());
/**
 * Subclass hook invoked by {@link #shutdown()} when shutdown is first initiated. The returned future is one of the
 * futures {@link #shutdown()} waits for before completing its own result.
 *
 * @return a future tracking the subclass-specific part of shutdown; declared {@code @NonNull}
 */
85 protected abstract @NonNull ListenableFuture<Empty> startShutdown();
/**
 * Hands a newly-available transport channel to this stack.
 *
 * <p>In the lines shown, a listener invoking {@code remove(channel)} is attached to the underlying channel's close
 * future, and only afterwards is the user listener told through {@code onTransportChannelEstablished(channel)}.
 * A second path, described by the existing comment as closing the channel because shutdown is in progress,
 * follows.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 90, 95 and 97-100. The condition that
 * selects between the two paths (presumably the result of {@code add(channel)}) and the statements of the
 * shutdown path are not visible here -- confirm against the original file.
 *
 * @param channel the transport channel to track; declared {@code @NonNull}
 */
87 protected final void addTransportChannel(final @NonNull C channel) {
88 // Careful stepping here so we do not invoke callbacks while holding a lock...
89 final var ch = channel.channel();
91 // The channel is tracked in state, make sure we remove it when it goes away. Invoke the user listener only
92 // after that, so our listener fires first (and the user does not have a chance to close() before that).
93 ch.closeFuture().addListener(ignored -> remove(channel));
94 listener.onTransportChannelEstablished(channel);
96 // We are already shutting down, just close the channel
/**
 * Reports a failure to the user listener by forwarding {@code cause} to {@code onTransportChannelFailed()}.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 103-105, so the closing brace is
 * presumably on a line not shown here -- confirm against the original file.
 *
 * @param cause the failure to report; declared {@code @NonNull}
 */
101 protected final void notifyTransportChannelFailed(final @NonNull Throwable cause) {
102 listener.onTransportChannelFailed(cause);
/**
 * Renders this object through a Guava {@link ToStringHelper} configured with {@code omitNullValues()}. The
 * attributes themselves are contributed by {@link #addToStringAttributes(ToStringHelper)}.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 103-105 and 108-109, so any annotation
 * preceding this method and its closing brace are not visible here -- confirm against the original file.
 *
 * @return the string produced by the populated helper
 */
106 public final String toString() {
107 return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
/**
 * Adds the {@code listener} and {@code state} attributes to the supplied helper. The method is
 * {@code synchronized}, so {@code state} is read while holding this object's monitor. It is {@code protected} and
 * not {@code final}.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 112-113, so the closing brace is
 * presumably on a line not shown here -- confirm against the original file.
 *
 * @param helper the helper to populate
 * @return the result of the chained {@code add()} calls on {@code helper}
 */
110 protected synchronized ToStringHelper addToStringAttributes(final ToStringHelper helper) {
111 return helper.add("listener", listener).add("state", state);
/**
 * Adapts a Netty {@link Future} into a Guava {@link ListenableFuture} of {@link Empty}.
 *
 * <p>A listener is registered on {@code nettyFuture}; when it fires it reads {@code future.cause()} and completes
 * the adapter either exceptionally with that cause or with {@code Empty.value()}. Whatever value the Netty future
 * carries is not used by the lines shown.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 118, 120 and 122-126. The condition
 * choosing between {@code setException()} and {@code set()} (presumably a {@code cause != null} check) and the
 * {@code return} of the adapter are not visible here -- confirm against the original file.
 *
 * @param nettyFuture the Netty future to observe
 * @return presumably the {@link SettableFuture} created here -- TODO confirm, the return statement is not shown
 */
114 protected static final @NonNull ListenableFuture<Empty> toListenableFuture(final Future<?> nettyFuture) {
115 final var ret = SettableFuture.<Empty>create();
116 nettyFuture.addListener(future -> {
117 final var cause = future.cause();
119 ret.setException(cause);
121 ret.set(Empty.value());
/**
 * Records {@code channel} in the polymorphic {@code state} while holding this object's monitor.
 *
 * <p>The visible branches dispatch on the current state: a {@link ListenableFuture} means shutdown is already
 * under way; a {@link Set} gets the channel added to it; a single {@link TransportChannel} leads to a new
 * {@link HashSet} being allocated. Any other state raises {@link IllegalStateException}.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 132-134, 136, 141-144 and 146-149.
 * The return statements, the branch handling the first channel, and the population and publication of the new
 * set are not visible here -- confirm against the original file.
 *
 * @param channel the channel to record; declared {@code @NonNull}
 * @return presumably {@code false} when already shutting down and {@code true} otherwise -- TODO confirm, no
 *         return statement is shown
 * @throws IllegalStateException when the state matches none of the handled types (the guarding branch is on a
 *         line not shown)
 */
127 @SuppressWarnings("unchecked")
128 private synchronized boolean add(final @NonNull TransportChannel channel) {
129 final var local = state;
130 if (local instanceof ListenableFuture) {
131 // Already shutting down
135 // First session, simple
137 } else if (local instanceof Set) {
138 ((Set<TransportChannel>) local).add(channel);
139 } else if (local instanceof TransportChannel tc) {
// A second channel is arriving while one is tracked directly; a set is allocated here.
// NOTE(review): numbers 141-144 are skipped, so what is put into the set is not visible.
140 final var set = new HashSet<TransportChannel>(4);
145 throw new IllegalStateException("Unhandled state " + local);
/**
 * Stops tracking {@code channel} in the polymorphic {@code state} while holding this object's monitor. It is
 * wired up as the close-future listener in {@code addTransportChannel()}.
 *
 * <p>The visible branches dispatch on the current state: {@code null} or a {@link ListenableFuture} is the
 * "nothing recorded / already shutting down" case; a state equal to {@code channel} is the single-channel case;
 * a {@link Set} is the multi-channel case, which per the existing comment is not collapsed when it shrinks. Any
 * other state raises {@link IllegalStateException}.
 *
 * <p>NOTE(review): this listing embeds the original line numbers and skips 154-156, 159, 163-164 and everything
 * after 165. The early return, the reset of {@code state}, the removal from the set and the closing braces are
 * not visible here -- confirm against the original file.
 *
 * @param channel the channel to stop tracking; declared {@code @NonNull}
 * @throws IllegalStateException when the state matches none of the handled types (the guarding branch is on a
 *         line not shown)
 */
150 private synchronized void remove(final @NonNull TransportChannel channel) {
151 final var local = state;
152 if (local == null || local instanceof ListenableFuture) {
153 // No recorded channel or we are already shutting down
157 if (local.equals(channel)) {
158 // Single channel, go back to null
160 } else if (local instanceof Set<?> set) {
161 // Multiple channels, let's just remove it. Note this does not collapse the set if there's just one
162 // remaining session -- the chances are we will go back to more than one soon
165 throw new IllegalStateException("Unhandled state " + local);