Expose streams with all supported encodings
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / spi / Subscribers.java
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.VerifyException;
13 import com.google.common.collect.ArrayListMultimap;
14 import com.google.common.collect.ImmutableListMultimap;
15 import com.google.common.collect.ListMultimap;
16 import java.time.Instant;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * A set of subscribers attached to an {@link AbstractStream}. This is an immutable structure, which can be updated
25  * through a copy-on-writer process driven by {@link #add(Subscriber)} and {@link #remove(Subscriber)}.
26  *
27  * @param <T> event type
28  */
29 abstract sealed class Subscribers<T> {
30     static final class Empty<T> extends Subscribers<T> {
31         private static final @NonNull Empty<?> INSTANCE = new Empty<>();
32
33         private Empty() {
34             // Hidden on purpose
35         }
36
37         @Override
38         Subscribers<T> add(final Subscriber<T> toAdd) {
39             return new Single<>(toAdd);
40         }
41
42         @Override
43         Subscribers<T> remove(final Subscriber<?> toRemove) {
44             return this;
45         }
46
47         @Override
48         void endOfStream() {
49             // No-op
50         }
51
52         @Override
53         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
54             // No-op
55         }
56     }
57
58     private static final class Single<T> extends Subscribers<T> {
59         private final Subscriber<T> subscriber;
60
61         Single(final Subscriber<T> subscriber) {
62             this.subscriber = requireNonNull(subscriber);
63         }
64
65         @Override
66         Subscribers<T> add(final Subscriber<T> toAdd) {
67             return new Multiple<>(ImmutableListMultimap.of(
68                 subscriber.formatter(), subscriber,
69                 toAdd.formatter(), toAdd));
70         }
71
72         @Override
73         Subscribers<T> remove(final Subscriber<?> toRemove) {
74             return toRemove.equals(subscriber) ? null : this;
75         }
76
77         @Override
78         void endOfStream() {
79             subscriber.sender().endOfStream();
80         }
81
82         @Override
83         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
84             final var formatted = format(subscriber.formatter(), modelContext, input, now);
85             if (formatted != null) {
86                 subscriber.sender().sendDataMessage(formatted);
87             }
88         }
89     }
90
91     private static final class Multiple<T> extends Subscribers<T> {
92         private final ImmutableListMultimap<EventFormatter<T>, Subscriber<T>> subscribers;
93
94         Multiple(final ListMultimap<EventFormatter<T>, Subscriber<T>> subscribers) {
95             this.subscribers = ImmutableListMultimap.copyOf(subscribers);
96         }
97
98         @Override
99         Subscribers<T> add(final Subscriber<T> toAdd) {
100             final var newSubscribers = ArrayListMultimap.create(subscribers);
101             newSubscribers.put(toAdd.formatter(), toAdd);
102             return new Multiple<>(newSubscribers);
103         }
104
105         @Override
106         Subscribers<T> remove(final Subscriber<?> toRemove) {
107             final var newSubscribers = ArrayListMultimap.create(subscribers);
108             return newSubscribers.remove(toRemove.formatter(), toRemove) ? switch (newSubscribers.size()) {
109                 case 0 -> throw new VerifyException("Unexpected empty subscribers");
110                 case 1 -> new Single<>(newSubscribers.values().iterator().next());
111                 default -> new Multiple<>(newSubscribers);
112             } : this;
113         }
114
115         @Override
116         void endOfStream() {
117             subscribers.forEach((formatter, subscriber) -> subscriber.sender().endOfStream());
118         }
119
120         @Override
121         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
122             for (var entry : subscribers.asMap().entrySet()) {
123                 final var formatted = format(entry.getKey(), modelContext, input, now);
124                 if (formatted != null) {
125                     for (var subscriber : entry.getValue()) {
126                         subscriber.sender().sendDataMessage(formatted);
127                     }
128                 }
129             }
130         }
131     }
132
133     private static final Logger LOG = LoggerFactory.getLogger(Subscribers.class);
134
135     private Subscribers() {
136         // Hidden on purpose
137     }
138
139     /**
140      * Return an An empty set of subscribers. This is the initial state of {@link RestconfStream}, waiting for the first
141      * subscriber to appear.
142      *
143      * @param <T> event type
144      * @return An empty {@link Subscribers} file
145      */
146     @SuppressWarnings("unchecked")
147     static <T> org.opendaylight.restconf.server.spi.Subscribers<T> empty() {
148         return (Subscribers<T>) Empty.INSTANCE;
149     }
150
151     /**
152      * Add a new subscriber to this file.
153      *
154      * @param toAdd subscriber to add
155      * @return A new {@link Subscribers} file
156      * @throws NullPointerException if {@code toAdd} is {@code null}
157      */
158     abstract @NonNull Subscribers<T> add(Subscriber<T> toAdd);
159
160     /**
161      * Remove a subscriber to this file.
162      *
163      * @param toRemove subscriber to add
164      * @return A new {@link Subscribers} file, or {@code null} if this file was not empty and it became empty
165      * @throws NullPointerException if {@code toRemove} is {@code null}
166      */
167     abstract @Nullable Subscribers<T> remove(Subscriber<?> toRemove);
168
169     /**
170      * Signal end-of-stream to all subscribers.
171      */
172     abstract void endOfStream();
173
174     /**
175      * Publish an event to all {@link Subscriber}s in this file.
176      *
177      * @param modelContext An {@link EffectiveModelContext} used to format the input
178      * @param input Input data
179      * @param now Current time
180      * @throws NullPointerException if any argument is {@code null}
181      */
182     abstract void publish(EffectiveModelContext modelContext, T input, Instant now);
183
184     @SuppressWarnings("checkstyle:illegalCatch")
185     private static <T> @Nullable String format(final EventFormatter<T> formatter,
186             final EffectiveModelContext modelContext, final T input, final Instant now) {
187         try {
188             return formatter.eventData(modelContext, input, now);
189         } catch (Exception e) {
190             LOG.warn("Failed to format", e);
191             return null;
192         }
193     }
194 }