2 * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableSet;
13 import java.util.concurrent.atomic.AtomicReference;
14 import org.eclipse.jdt.annotation.NonNull;
15 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
16 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
17 import org.opendaylight.mdsal.dom.api.DOMNotification;
18 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
19 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
20 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
21 import org.opendaylight.yangtools.concepts.Registration;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
24 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
31 public final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener {
// Class-wide SLF4J logger.
32 private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class);
// Currently-armed cleanup routine. start() stores a Runnable here and later swaps it via compareAndSet(), while
// cleanup() claims it with getAndSet(null); the reference starts out holding null.
34 private final AtomicReference<Runnable> onRemoved = new AtomicReference<>();
// Used by start() to look up the mount point and to register this object as a provision listener.
35 private final DOMMountPointService mountPointService;
// Path of the mounted device: resolved in start() and compared against the removed path in onMountPointRemoved().
36 private final YangInstanceIdentifier devicePath;
/**
 * Creates a source bound to a single device mount point.
 *
 * @param mountPointService service later used to resolve {@code devicePath} and to register a provision listener
 * @param devicePath path handed to {@code mountPointService} to locate the device's mount point
 * @throws NullPointerException if either argument is {@code null}
 */
38 DeviceNotificationSource(final DOMMountPointService mountPointService, final YangInstanceIdentifier devicePath) {
// Both arguments are rejected eagerly when null, so the fields are never null afterwards.
39 this.mountPointService = requireNonNull(mountPointService);
40 this.devicePath = requireNonNull(devicePath);
// Mount point creation callback. Only the signature is visible in this view; the body lies on lines not shown.
// NOTE(review): presumably the DOMMountPointListener callback and a no-op for this class - confirm against the
// full file.
44 public void onMountPointCreated(final YangInstanceIdentifier path) {
// Mount point removal callback. It reacts only when the removed path equals this source's devicePath.
// The statement performing the cleanup lies on a line not shown in this view.
49 public void onMountPointRemoved(final YangInstanceIdentifier path) {
// devicePath is non-null (checked in the constructor), so calling equals() on it is safe for any path value.
50 if (devicePath.equals(path)) {
51 // The mount point went away, run cleanup
/**
 * Starts delivering the device's YANG notifications to {@code sink}.
 *
 * <p>The method terminates early through {@link #endOfStream(Sink)} when the mount point is absent, when it lacks
 * a {@link DOMSchemaService} or a {@link DOMNotificationService}, or when no notification statement is found
 * directly under the modules of its model context. Otherwise it registers a notification listener for all the
 * notifications found and then registers this object as a mount point provision listener.
 *
 * @param sink sink handed to the notification listener and to {@link #endOfStream(Sink)}
 * @return a registration; when the compare-and-set on {@code onRemoved} succeeds this is {@code this::cleanup}
 */
57 protected Registration start(final Sink<DOMNotification> sink) {
// Step 1: the mount point itself must exist.
58 final var optMount = mountPointService.getMountPoint(devicePath);
59 if (optMount.isEmpty()) {
60 LOG.info("Mount point {} not present, terminating", devicePath);
61 return endOfStream(sink);
// Step 2: the mount point must expose a schema service, which supplies the model context used below.
64 final var mount = optMount.orElseThrow();
65 final var optSchema = mount.getService(DOMSchemaService.class);
66 if (optSchema.isEmpty()) {
67 LOG.info("Mount point {} does not have a DOMSchemaService, terminating", devicePath);
68 return endOfStream(sink);
// Step 3: the mount point must expose a notification service to register the listener with.
71 final var optNotification = mount.getService(DOMNotificationService.class);
72 if (optNotification.isEmpty()) {
73 LOG.info("Mount point {} does not have a DOMNotificationService, terminating", devicePath);
74 return endOfStream(sink);
77 // Find all notifications
// Only notification statements that are direct effective substatements of a module are collected; each is turned
// into an absolute schema node identifier and gathered into an immutable set.
78 final var modelContext = optSchema.orElseThrow().getGlobalContext();
79 final var paths = modelContext.getModuleStatements().values().stream()
80 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
81 .map(notification -> Absolute.of(notification.argument()))
82 .collect(ImmutableSet.toImmutableSet());
83 if (paths.isEmpty()) {
84 LOG.info("Mount point {} does not advertize any YANG notifications, terminating", devicePath);
85 return endOfStream(sink);
// The listener receives a supplier that always yields the model context captured above.
88 final var notifReg = optNotification.orElseThrow().registerNotificationListener(
89 new Listener(sink, () -> modelContext), paths);
91 // Notifications are running now.
92 // If the mount point gets removed we need to close them. Since we are running lockless, we publish the
93 // notification cleanup first and only then set up the mount point listener, which will own its cleanup.
// The body of closeNotif lies on lines not shown in this view.
94 final Runnable closeNotif = () -> {
98 onRemoved.set(closeNotif);
100 // onMountPointRemoved() may be invoked asynchronously before this method returns.
101 // Therefore we perform a CAS replacement routine of the close routine:
102 // - if it succeeds onRemoved's Runnable covers all required cleanup
103 // - if it does not, it means state has already been cleaned up by onMountPointRemoved()
104 final var mountReg = mountPointService.registerProvisionListener(this);
// The body of closeMount lies on lines not shown in this view.
105 final Runnable closeMount = () -> {
// The swap succeeds only if onRemoved still holds closeNotif, i.e. nothing has replaced or cleared it since set().
110 if (onRemoved.compareAndSet(closeNotif, closeMount)) {
111 // All set, cleanup() will handle the rest
112 return this::cleanup;
115 // Already removed, bail out, but do not signal endOfStream()
/**
 * Shared early-exit helper: every failure path in {@code start()} returns through it.
 * The statements implementing it lie on lines not shown in this view.
 *
 * @param sink sink of the stream being terminated
 * @return a registration, declared {@code @NonNull}
 */
122 private static @NonNull Registration endOfStream(final Sink<DOMNotification> sink) {
123 // Something went wrong: signal end of stream and return a no-op registration
130 private void cleanup() {
131 final var runnable = onRemoved.getAndSet(null);
132 if (runnable != null) {