Expose streams with all supported encodings
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / DeviceNotificationSource.java
1 /*
2  * Copyright (c) 2022 Opendaylight, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableSet;
13 import java.util.concurrent.atomic.AtomicReference;
14 import org.eclipse.jdt.annotation.NonNull;
15 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
16 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
17 import org.opendaylight.mdsal.dom.api.DOMNotification;
18 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
19 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
20 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
21 import org.opendaylight.yangtools.concepts.Registration;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
24 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
30  */
31 public final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener {
32     private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class);
33
34     private final AtomicReference<Runnable> onRemoved = new AtomicReference<>();
35     private final DOMMountPointService mountPointService;
36     private final YangInstanceIdentifier devicePath;
37
38     DeviceNotificationSource(final DOMMountPointService mountPointService, final YangInstanceIdentifier devicePath) {
39         this.mountPointService = requireNonNull(mountPointService);
40         this.devicePath = requireNonNull(devicePath);
41     }
42
43     @Override
44     public void onMountPointCreated(final YangInstanceIdentifier path) {
45         // No-op
46     }
47
48     @Override
49     public void onMountPointRemoved(final YangInstanceIdentifier path) {
50         if (devicePath.equals(path)) {
51             // The mount point went away, run cleanup
52             cleanup();
53         }
54     }
55
56     @Override
57     protected Registration start(final Sink<DOMNotification> sink) {
58         final var optMount = mountPointService.getMountPoint(devicePath);
59         if (optMount.isEmpty()) {
60             LOG.info("Mount point {} not present, terminating", devicePath);
61             return endOfStream(sink);
62         }
63
64         final var mount = optMount.orElseThrow();
65         final var optSchema = mount.getService(DOMSchemaService.class);
66         if (optSchema.isEmpty()) {
67             LOG.info("Mount point {} does not have a DOMSchemaService, terminating", devicePath);
68             return endOfStream(sink);
69         }
70
71         final var optNotification = mount.getService(DOMNotificationService.class);
72         if (optNotification.isEmpty()) {
73             LOG.info("Mount point {} does not have a DOMNotificationService, terminating", devicePath);
74             return endOfStream(sink);
75         }
76
77         // Find all notifications
78         final var modelContext = optSchema.orElseThrow().getGlobalContext();
79         final var paths = modelContext.getModuleStatements().values().stream()
80             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
81             .map(notification -> Absolute.of(notification.argument()))
82             .collect(ImmutableSet.toImmutableSet());
83         if (paths.isEmpty()) {
84             LOG.info("Mount point {} does not advertize any YANG notifications, terminating", devicePath);
85             return endOfStream(sink);
86         }
87
88         final var notifReg = optNotification.orElseThrow().registerNotificationListener(
89             new Listener(sink, () -> modelContext), paths);
90
91         // Notifications are running now.
92         // If we get removed we need to close those. But since we are running lockless and we need to set up
93         // the listener, which will own its cleanup.
94         final Runnable closeNotif = () -> {
95             notifReg.close();
96             sink.endOfStream();
97         };
98         onRemoved.set(closeNotif);
99
100         // onMountPointRemoved() may be invoked asynchronously before this method returns.
101         // Therefore we perform a CAS replacement routine of the close routine:
102         // - if it succeeds onRemoved's Runnable covers all required cleanup
103         // - if it does not, it means state has already been cleaned up by onMountPointRemoved()
104         final var mountReg = mountPointService.registerProvisionListener(this);
105         final Runnable closeMount = () -> {
106             notifReg.close();
107             sink.endOfStream();
108             mountReg.close();
109         };
110         if (onRemoved.compareAndSet(closeNotif, closeMount)) {
111             // All set, cleanup() will handle the rest
112             return this::cleanup;
113         }
114
115         // Already removed, bail out, but do not signal endOfStream()
116         mountReg.close();
117         return () -> {
118             // No-op
119         };
120     }
121
122     private static @NonNull Registration endOfStream(final Sink<DOMNotification> sink) {
123         // Something went wrong: signal end of stream and return a no-op registration
124         sink.endOfStream();
125         return () -> {
126             // No-op
127         };
128     }
129
130     private void cleanup() {
131         final var runnable = onRemoved.getAndSet(null);
132         if (runnable != null) {
133             runnable.run();
134         }
135     }
136 }