2 * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.server.mdsal.streams.devnotif;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableSet;
13 import java.util.concurrent.atomic.AtomicReference;
14 import org.eclipse.jdt.annotation.NonNull;
15 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
16 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
17 import org.opendaylight.mdsal.dom.api.DOMNotification;
18 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
19 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
20 import org.opendaylight.restconf.server.mdsal.streams.notif.AbstractNotificationSource;
21 import org.opendaylight.restconf.server.spi.RestconfStream;
22 import org.opendaylight.restconf.server.spi.RestconfStream.Sink;
23 import org.opendaylight.yangtools.concepts.Registration;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
26 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
33 final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener {
34 private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class);
// Cleanup action currently in effect, or null when none is pending. start() installs it and cleanup() atomically
// swaps it out for null, so any given action is handed to at most one caller.
36 private final AtomicReference<Runnable> onRemoved = new AtomicReference<>();
// Service used to look up the device's mount point and to register this object as a mount point listener.
37 private final DOMMountPointService mountPointService;
// Path of the mount point this source is bound to; mount point removal events are matched against it.
38 private final YangInstanceIdentifier devicePath;
// Creates a source bound to the mount point identified by devicePath.
// Both arguments are mandatory: requireNonNull() rejects a null value with a NullPointerException.
40 DeviceNotificationSource(final DOMMountPointService mountPointService, final YangInstanceIdentifier devicePath) {
41 this.mountPointService = requireNonNull(mountPointService);
42 this.devicePath = requireNonNull(devicePath);
// Mount point listener callback for a mount point appearing at the given path.
// NOTE(review): the body is not visible in this excerpt -- presumably a no-op, since start() looks the mount point
// up directly instead of waiting for it; confirm against the full file.
46 public void onMountPointCreated(final YangInstanceIdentifier path) {
// Mount point listener callback for a mount point disappearing from the given path.
// Only the path this source is bound to (devicePath) triggers the cleanup branch below.
// NOTE(review): the statement performing the cleanup is not visible in this excerpt -- presumably cleanup(); confirm.
51 public void onMountPointRemoved(final YangInstanceIdentifier path) {
52 if (devicePath.equals(path)) {
53 // The mount point went away, run cleanup
// Starts forwarding the device's YANG notifications into the supplied sink.
//
// The stream is terminated through endOfStream(sink) when any prerequisite is missing: the mount point itself,
// its DOMSchemaService, its DOMNotificationService, or any notification definition in the device's model.
// Otherwise a notification listener and a mount point listener are registered, and the returned Registration
// delegates to cleanup().
//
// The hand-over of the cleanup action through onRemoved is lock-free and depends on the exact statement order
// below; do not reorder.
59 protected Registration start(final Sink<DOMNotification> sink) {
// Step 1: the mount point has to exist right now; absence is logged and ends the stream.
60 final var optMount = mountPointService.getMountPoint(devicePath);
61 if (optMount.isEmpty()) {
62 LOG.info("Mount point {} not present, terminating", devicePath);
63 return endOfStream(sink);
// Step 2: the mount point has to expose its schema ...
66 final var mount = optMount.orElseThrow();
67 final var optSchema = mount.getService(DOMSchemaService.class);
68 if (optSchema.isEmpty()) {
69 LOG.info("Mount point {} does not have a DOMSchemaService, terminating", devicePath);
70 return endOfStream(sink);
// ... and a notification service to subscribe to.
73 final var optNotification = mount.getService(DOMNotificationService.class);
74 if (optNotification.isEmpty()) {
75 LOG.info("Mount point {} does not have a DOMNotificationService, terminating", devicePath);
76 return endOfStream(sink);
79 // Find all notifications
// Step 3: collect the notification statements found as effective substatements of every module in the device's
// model context, turning each statement's argument into an absolute schema path.
80 final var modelContext = optSchema.orElseThrow().getGlobalContext();
81 final var paths = modelContext.getModuleStatements().values().stream()
82 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
83 .map(notification -> Absolute.of(notification.argument()))
84 .collect(ImmutableSet.toImmutableSet());
// Nothing to subscribe to: end the stream instead of registering an empty listener.
85 if (paths.isEmpty()) {
86 LOG.info("Mount point {} does not advertize any YANG notifications, terminating", devicePath);
87 return endOfStream(sink);
// Step 4: subscribe to every collected path. The listener receives the sink and a supplier which always yields the
// model context captured above.
90 final var notifReg = optNotification.orElseThrow()
91 .registerNotificationListener(new Listener(sink, () -> modelContext), paths);
93 // Notifications are running now.
94 // If the mount point gets removed, we need to close them. Since we are running lockless, we first publish
95 // a cleanup action covering the notifications and only then set up the listener, which will own its cleanup.
// NOTE(review): the body of closeNotif is not visible in this excerpt.
96 final Runnable closeNotif = () -> {
100 onRemoved.set(closeNotif);
102 // onMountPointRemoved() may be invoked asynchronously before this method returns.
103 // Therefore we perform a CAS replacement of the close routine:
104 // - if it succeeds, onRemoved's Runnable covers all required cleanup
105 // - if it does not, state has already been cleaned up by onMountPointRemoved()
106 final var mountReg = mountPointService.registerProvisionListener(this);
// NOTE(review): the body of closeMount is not visible in this excerpt.
107 final Runnable closeMount = () -> {
// The swap succeeds only if onRemoved still holds closeNotif, i.e. nobody has taken the action in the meantime.
112 if (onRemoved.compareAndSet(closeNotif, closeMount)) {
113 // All set, cleanup() will handle the rest
114 return this::cleanup;
117 // Already removed, bail out, but do not signal endOfStream()
// Shared exit path for every case in which start() cannot set up notification delivery.
// Always returns a non-null Registration, as declared by the @NonNull return type.
// NOTE(review): the statements are not visible in this excerpt; the description below is the original comment.
124 private static @NonNull Registration endOfStream(final Sink<DOMNotification> sink) {
125 // Something went wrong: signal end of stream and return a no-op registration
132 private void cleanup() {
133 final var runnable = onRemoved.getAndSet(null);
134 if (runnable != null) {