Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / mdsal / streams / devnotif / DeviceNotificationSource.java
1 /*
2  * Copyright (c) 2022 Opendaylight, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.mdsal.streams.devnotif;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableSet;
13 import java.util.concurrent.atomic.AtomicReference;
14 import org.eclipse.jdt.annotation.NonNull;
15 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
16 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
17 import org.opendaylight.mdsal.dom.api.DOMNotification;
18 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
19 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
20 import org.opendaylight.restconf.server.mdsal.streams.notif.AbstractNotificationSource;
21 import org.opendaylight.restconf.server.spi.RestconfStream;
22 import org.opendaylight.restconf.server.spi.RestconfStream.Sink;
23 import org.opendaylight.yangtools.concepts.Registration;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
26 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
32  */
33 final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener {
34     private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class);
35
36     private final AtomicReference<Runnable> onRemoved = new AtomicReference<>();
37     private final DOMMountPointService mountPointService;
38     private final YangInstanceIdentifier devicePath;
39
40     DeviceNotificationSource(final DOMMountPointService mountPointService, final YangInstanceIdentifier devicePath) {
41         this.mountPointService = requireNonNull(mountPointService);
42         this.devicePath = requireNonNull(devicePath);
43     }
44
45     @Override
46     public void onMountPointCreated(final YangInstanceIdentifier path) {
47         // No-op
48     }
49
50     @Override
51     public void onMountPointRemoved(final YangInstanceIdentifier path) {
52         if (devicePath.equals(path)) {
53             // The mount point went away, run cleanup
54             cleanup();
55         }
56     }
57
58     @Override
59     protected Registration start(final Sink<DOMNotification> sink) {
60         final var optMount = mountPointService.getMountPoint(devicePath);
61         if (optMount.isEmpty()) {
62             LOG.info("Mount point {} not present, terminating", devicePath);
63             return endOfStream(sink);
64         }
65
66         final var mount = optMount.orElseThrow();
67         final var optSchema = mount.getService(DOMSchemaService.class);
68         if (optSchema.isEmpty()) {
69             LOG.info("Mount point {} does not have a DOMSchemaService, terminating", devicePath);
70             return endOfStream(sink);
71         }
72
73         final var optNotification = mount.getService(DOMNotificationService.class);
74         if (optNotification.isEmpty()) {
75             LOG.info("Mount point {} does not have a DOMNotificationService, terminating", devicePath);
76             return endOfStream(sink);
77         }
78
79         // Find all notifications
80         final var modelContext = optSchema.orElseThrow().getGlobalContext();
81         final var paths = modelContext.getModuleStatements().values().stream()
82             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
83             .map(notification -> Absolute.of(notification.argument()))
84             .collect(ImmutableSet.toImmutableSet());
85         if (paths.isEmpty()) {
86             LOG.info("Mount point {} does not advertize any YANG notifications, terminating", devicePath);
87             return endOfStream(sink);
88         }
89
90         final var notifReg = optNotification.orElseThrow()
91             .registerNotificationListener(new Listener(sink, () -> modelContext), paths);
92
93         // Notifications are running now.
94         // If we get removed we need to close those. But since we are running lockless and we need to set up
95         // the listener, which will own its cleanup.
96         final Runnable closeNotif = () -> {
97             notifReg.close();
98             sink.endOfStream();
99         };
100         onRemoved.set(closeNotif);
101
102         // onMountPointRemoved() may be invoked asynchronously before this method returns.
103         // Therefore we perform a CAS replacement routine of the close routine:
104         // - if it succeeds onRemoved's Runnable covers all required cleanup
105         // - if it does not, it means state has already been cleaned up by onMountPointRemoved()
106         final var mountReg = mountPointService.registerProvisionListener(this);
107         final Runnable closeMount = () -> {
108             notifReg.close();
109             sink.endOfStream();
110             mountReg.close();
111         };
112         if (onRemoved.compareAndSet(closeNotif, closeMount)) {
113             // All set, cleanup() will handle the rest
114             return this::cleanup;
115         }
116
117         // Already removed, bail out, but do not signal endOfStream()
118         mountReg.close();
119         return () -> {
120             // No-op
121         };
122     }
123
124     private static @NonNull Registration endOfStream(final Sink<DOMNotification> sink) {
125         // Something went wrong: signal end of stream and return a no-op registration
126         sink.endOfStream();
127         return () -> {
128             // No-op
129         };
130     }
131
132     private void cleanup() {
133         final var runnable = onRemoved.getAndSet(null);
134         if (runnable != null) {
135             runnable.run();
136         }
137     }
138 }