2 * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.VerifyException;
13 import com.google.common.collect.ArrayListMultimap;
14 import com.google.common.collect.ImmutableListMultimap;
15 import com.google.common.collect.ListMultimap;
16 import java.time.Instant;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * A set of subscribers attached to an {@link AbstractStream}. This is an immutable structure, which can be updated
25 * through a copy-on-writer process driven by {@link #add(Subscriber)} and {@link #remove(Subscriber)}.
27 * @param <T> event type
29 abstract sealed class Subscribers<T> {
30 private static final class Empty<T> extends Subscribers<T> {
31 static final @NonNull Empty<?> INSTANCE = new Empty<>();
34 Subscribers<T> add(final Subscriber<T> toAdd) {
35 return new Single<>(toAdd);
39 Subscribers<T> remove(final Subscriber<?> toRemove) {
49 void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
54 private static final class Single<T> extends Subscribers<T> {
55 private final Subscriber<T> subscriber;
57 Single(final Subscriber<T> subscriber) {
58 this.subscriber = requireNonNull(subscriber);
62 Subscribers<T> add(final Subscriber<T> toAdd) {
63 return new Multiple<>(ImmutableListMultimap.of(
64 subscriber.formatter(), subscriber,
65 toAdd.formatter(), toAdd));
69 Subscribers<T> remove(final Subscriber<?> toRemove) {
70 return toRemove.equals(subscriber) ? null : this;
75 subscriber.handler().endOfStream();
79 void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
80 final var formatted = format(subscriber.formatter(), modelContext, input, now);
81 if (formatted != null) {
82 subscriber.handler().sendDataMessage(formatted);
87 private static final class Multiple<T> extends Subscribers<T> {
88 private final ImmutableListMultimap<EventFormatter<T>, Subscriber<T>> subscribers;
90 Multiple(final ListMultimap<EventFormatter<T>, Subscriber<T>> subscribers) {
91 this.subscribers = ImmutableListMultimap.copyOf(subscribers);
95 Subscribers<T> add(final Subscriber<T> toAdd) {
96 final var newSubscribers = ArrayListMultimap.create(subscribers);
97 newSubscribers.put(toAdd.formatter(), toAdd);
98 return new Multiple<>(newSubscribers);
102 Subscribers<T> remove(final Subscriber<?> toRemove) {
103 final var newSubscribers = ArrayListMultimap.create(subscribers);
104 return newSubscribers.remove(toRemove.formatter(), toRemove) ? switch (newSubscribers.size()) {
105 case 0 -> throw new VerifyException("Unexpected empty subscribers");
106 case 1 -> new Single<>(newSubscribers.values().iterator().next());
107 default -> new Multiple<>(newSubscribers);
113 subscribers.forEach((formatter, subscriber) -> subscriber.handler().endOfStream());
117 void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
118 for (var entry : subscribers.asMap().entrySet()) {
119 final var formatted = format(entry.getKey(), modelContext, input, now);
120 if (formatted != null) {
121 for (var subscriber : entry.getValue()) {
122 subscriber.handler().sendDataMessage(formatted);
129 private static final Logger LOG = LoggerFactory.getLogger(Subscribers.class);
131 private Subscribers() {
136 * Return an An empty set of subscribers. This is the initial state of {@link RestconfStream}, waiting for the first
137 * subscriber to appear.
139 * @param <T> event type
140 * @return An empty {@link Subscribers} file
142 @SuppressWarnings("unchecked")
143 static <T> @NonNull Subscribers<T> empty() {
144 return (Subscribers<T>) Empty.INSTANCE;
148 * Add a new subscriber to this file.
150 * @param toAdd subscriber to add
151 * @return A new {@link Subscribers} file
152 * @throws NullPointerException if {@code toAdd} is {@code null}
154 abstract @NonNull Subscribers<T> add(Subscriber<T> toAdd);
157 * Remove a subscriber to this file.
159 * @param toRemove subscriber to add
160 * @return A new {@link Subscribers} file, or {@code null} if this file was not empty and it became empty
161 * @throws NullPointerException if {@code toRemove} is {@code null}
163 abstract @Nullable Subscribers<T> remove(Subscriber<?> toRemove);
166 * Signal end-of-stream to all subscribers.
168 abstract void endOfStream();
171 * Publish an event to all {@link Subscriber}s in this file.
173 * @param modelContext An {@link EffectiveModelContext} used to format the input
174 * @param input Input data
175 * @param now Current time
176 * @throws NullPointerException if any argument is {@code null}
178 abstract void publish(EffectiveModelContext modelContext, T input, Instant now);
180 @SuppressWarnings("checkstyle:illegalCatch")
181 private static <T> @Nullable String format(final EventFormatter<T> formatter,
182 final EffectiveModelContext modelContext, final T input, final Instant now) {
184 return formatter.eventData(modelContext, input, now);
185 } catch (Exception e) {
186 LOG.warn("Failed to format", e);