Refactor EventFormatter interface
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / Subscribers.java
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.VerifyException;
13 import com.google.common.collect.ArrayListMultimap;
14 import com.google.common.collect.ImmutableListMultimap;
15 import com.google.common.collect.ListMultimap;
16 import java.time.Instant;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * A set of subscribers attached to an {@link AbstractStream}. This is an immutable structure, which can be updated
25  * through a copy-on-writer process driven by {@link #add(Subscriber)} and {@link #remove(Subscriber)}.
26  *
27  * @param <T> event type
28  */
29 abstract sealed class Subscribers<T> {
30     private static final class Empty<T> extends Subscribers<T> {
31         static final @NonNull Empty<?> INSTANCE = new Empty<>();
32
33         @Override
34         Subscribers<T> add(final Subscriber<T> toAdd) {
35             return new Single<>(toAdd);
36         }
37
38         @Override
39         Subscribers<T> remove(final Subscriber<?> toRemove) {
40             return this;
41         }
42
43         @Override
44         void endOfStream() {
45             // No-op
46         }
47
48         @Override
49         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
50             // No-op
51         }
52     }
53
54     private static final class Single<T> extends Subscribers<T> {
55         private final Subscriber<T> subscriber;
56
57         Single(final Subscriber<T> subscriber) {
58             this.subscriber = requireNonNull(subscriber);
59         }
60
61         @Override
62         Subscribers<T> add(final Subscriber<T> toAdd) {
63             return new Multiple<>(ImmutableListMultimap.of(
64                 subscriber.formatter(), subscriber,
65                 toAdd.formatter(), toAdd));
66         }
67
68         @Override
69         Subscribers<T> remove(final Subscriber<?> toRemove) {
70             return toRemove.equals(subscriber) ? null : this;
71         }
72
73         @Override
74         void endOfStream() {
75             subscriber.handler().endOfStream();
76         }
77
78         @Override
79         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
80             final var formatted = format(subscriber.formatter(), modelContext, input, now);
81             if (formatted != null) {
82                 subscriber.handler().sendDataMessage(formatted);
83             }
84         }
85     }
86
87     private static final class Multiple<T> extends Subscribers<T> {
88         private final ImmutableListMultimap<EventFormatter<T>, Subscriber<T>> subscribers;
89
90         Multiple(final ListMultimap<EventFormatter<T>, Subscriber<T>> subscribers) {
91             this.subscribers = ImmutableListMultimap.copyOf(subscribers);
92         }
93
94         @Override
95         Subscribers<T> add(final Subscriber<T> toAdd) {
96             final var newSubscribers = ArrayListMultimap.create(subscribers);
97             newSubscribers.put(toAdd.formatter(), toAdd);
98             return new Multiple<>(newSubscribers);
99         }
100
101         @Override
102         Subscribers<T> remove(final Subscriber<?> toRemove) {
103             final var newSubscribers = ArrayListMultimap.create(subscribers);
104             return newSubscribers.remove(toRemove.formatter(), toRemove) ? switch (newSubscribers.size()) {
105                 case 0 -> throw new VerifyException("Unexpected empty subscribers");
106                 case 1 -> new Single<>(newSubscribers.values().iterator().next());
107                 default -> new Multiple<>(newSubscribers);
108             } : this;
109         }
110
111         @Override
112         void endOfStream() {
113             subscribers.forEach((formatter, subscriber) -> subscriber.handler().endOfStream());
114         }
115
116         @Override
117         void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
118             for (var entry : subscribers.asMap().entrySet()) {
119                 final var formatted = format(entry.getKey(), modelContext, input, now);
120                 if (formatted != null) {
121                     for (var subscriber : entry.getValue()) {
122                         subscriber.handler().sendDataMessage(formatted);
123                     }
124                 }
125             }
126         }
127     }
128
129     private static final Logger LOG = LoggerFactory.getLogger(Subscribers.class);
130
131     private Subscribers() {
132         // Hidden on purpose
133     }
134
135     /**
136      * Return an An empty set of subscribers. This is the initial state of {@link RestconfStream}, waiting for the first
137      * subscriber to appear.
138      *
139      * @param <T> event type
140      * @return An empty {@link Subscribers} file
141      */
142     @SuppressWarnings("unchecked")
143     static <T> @NonNull Subscribers<T> empty() {
144         return (Subscribers<T>) Empty.INSTANCE;
145     }
146
147     /**
148      * Add a new subscriber to this file.
149      *
150      * @param toAdd subscriber to add
151      * @return A new {@link Subscribers} file
152      * @throws NullPointerException if {@code toAdd} is {@code null}
153      */
154     abstract @NonNull Subscribers<T> add(Subscriber<T> toAdd);
155
156     /**
157      * Remove a subscriber to this file.
158      *
159      * @param toRemove subscriber to add
160      * @return A new {@link Subscribers} file, or {@code null} if this file was not empty and it became empty
161      * @throws NullPointerException if {@code toRemove} is {@code null}
162      */
163     abstract @Nullable Subscribers<T> remove(Subscriber<?> toRemove);
164
165     /**
166      * Signal end-of-stream to all subscribers.
167      */
168     abstract void endOfStream();
169
170     /**
171      * Publish an event to all {@link Subscriber}s in this file.
172      *
173      * @param modelContext An {@link EffectiveModelContext} used to format the input
174      * @param input Input data
175      * @param now Current time
176      * @throws NullPointerException if any argument is {@code null}
177      */
178     abstract void publish(EffectiveModelContext modelContext, T input, Instant now);
179
180     @SuppressWarnings("checkstyle:illegalCatch")
181     private static <T> @Nullable String format(final EventFormatter<T> formatter,
182             final EffectiveModelContext modelContext, final T input, final Instant now) {
183         try {
184             return formatter.eventData(modelContext, input, now);
185         } catch (Exception e) {
186             LOG.warn("Failed to format", e);
187             return null;
188         }
189     }
190 }