2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import com.google.common.util.concurrent.ListenableFuture;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.time.Instant;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Date;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.md.sal.dom.api.DOMEvent;
18 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
19 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
20 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
21 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
22 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener;
23 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
24 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
27 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
30 * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
31 * routing of notifications from publishers to subscribers.
34 * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications
35 * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration
36 * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we
37 * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize
38 * on this instance, notifications do not take any locks here.
41 * The fully-blocking {@link #offerNotification(DOMNotification)}
42 * is realized using the Disruptor's native operations. The bounded-blocking
43 * {@link #offerNotification(DOMNotification, long, TimeUnit)}
44 * is realized by arming a background wakeup interrupt.
46 @SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION", justification = "Void is the only allowed value")
47 public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
48 DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
50 private final org.opendaylight.mdsal.dom.api.DOMNotificationService delegateNotificationService;
51 private final org.opendaylight.mdsal.dom.api.DOMNotificationPublishService delegateNotificationPublishService;
52 private final org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry delegateListenerRegistry;
54 private DOMNotificationRouter(
55 org.opendaylight.mdsal.dom.api.DOMNotificationService delegateNotificationService,
56 org.opendaylight.mdsal.dom.api.DOMNotificationPublishService delegateNotificationPublishService,
57 org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry delegateListenerRegistry) {
58 this.delegateNotificationService = delegateNotificationService;
59 this.delegateNotificationPublishService = delegateNotificationPublishService;
60 this.delegateListenerRegistry = delegateListenerRegistry;
63 public static DOMNotificationRouter create(final int queueDepth) {
64 final org.opendaylight.mdsal.dom.broker.DOMNotificationRouter delegate =
65 org.opendaylight.mdsal.dom.broker.DOMNotificationRouter.create(queueDepth);
66 return create(delegate, delegate, delegate);
69 public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime,
70 final TimeUnit unit) {
71 final org.opendaylight.mdsal.dom.broker.DOMNotificationRouter delegate =
72 org.opendaylight.mdsal.dom.broker.DOMNotificationRouter.create(queueDepth, spinTime, parkTime, unit);
73 return create(delegate, delegate, delegate);
76 public static DOMNotificationRouter create(
77 final org.opendaylight.mdsal.dom.api.DOMNotificationService delegateNotificationService,
78 final org.opendaylight.mdsal.dom.api.DOMNotificationPublishService delegateNotificationPublishService,
79 final org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry delegateListenerRegistry) {
80 return new DOMNotificationRouter(delegateNotificationService, delegateNotificationPublishService,
81 delegateListenerRegistry);
85 public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
86 final T listener, final Collection<SchemaPath> types) {
87 org.opendaylight.mdsal.dom.api.DOMNotificationListener delegateListener = notification -> {
88 if (notification instanceof DOMNotification) {
89 listener.onNotification((DOMNotification)notification);
93 if (notification instanceof org.opendaylight.mdsal.dom.api.DOMEvent) {
94 listener.onNotification(new DefaultDOMEvent(notification,
95 (org.opendaylight.mdsal.dom.api.DOMEvent)notification));
99 listener.onNotification(new DefaultDOMNotification(notification));
102 final ListenerRegistration<org.opendaylight.mdsal.dom.api.DOMNotificationListener> reg =
103 delegateNotificationService.registerNotificationListener(delegateListener, types);
105 return new AbstractListenerRegistration<T>(listener) {
107 protected void removeRegistration() {
114 public <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener,
115 final SchemaPath... types) {
116 return registerNotificationListener(listener, Arrays.asList(types));
120 public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
122 return delegateListenerRegistry.registerSubscriptionListener(listener);
126 public ListenableFuture<?> putNotification(final DOMNotification notification) throws InterruptedException {
127 return delegateNotificationPublishService.putNotification(notification);
131 public ListenableFuture<?> offerNotification(final DOMNotification notification) {
132 return delegateNotificationPublishService.offerNotification(notification);
136 public ListenableFuture<?> offerNotification(final DOMNotification notification, final long timeout,
137 final TimeUnit unit) throws InterruptedException {
138 return delegateNotificationPublishService.offerNotification(notification, timeout, unit);
142 public void close() {
145 private static class DefaultDOMNotification implements DOMNotification {
146 private final SchemaPath schemaPath;
147 private final ContainerNode body;
149 DefaultDOMNotification(org.opendaylight.mdsal.dom.api.DOMNotification from) {
150 this.schemaPath = from.getType();
151 this.body = from.getBody();
155 public SchemaPath getType() {
160 public ContainerNode getBody() {
165 private static class DefaultDOMEvent extends DefaultDOMNotification implements DOMEvent {
166 private final Date eventTime;
168 DefaultDOMEvent(org.opendaylight.mdsal.dom.api.DOMNotification fromNotification,
169 org.opendaylight.mdsal.dom.api.DOMEvent fromEvent) {
170 super(fromNotification);
171 final Instant eventInstant = fromEvent.getEventInstant();
172 this.eventTime = eventInstant != null ? Date.from(eventInstant) : null;
176 public Date getEventTime() {