import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ScheduledThreadPoolExecutor observer;
private final ExecutorService executor;
- private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners =
+ private volatile Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
ImmutableMultimap.of();
@VisibleForTesting
@Override
public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
- final T listener, final Collection<SchemaPath> types) {
- final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+ final T listener, final Collection<Absolute> types) {
+ final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
@Override
protected void removeRegistration() {
synchronized (DOMNotificationRouter.this) {
};
if (!types.isEmpty()) {
- final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b =
+ final Builder<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
ImmutableMultimap.builder();
b.putAll(listeners);
- for (final SchemaPath t : types) {
+ for (final Absolute t : types) {
b.put(t, reg);
}
return reg;
}
- @Override
- public <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
- final T listener, final SchemaPath... types) {
- return registerNotificationListener(listener, Arrays.asList(types));
- }
-
/**
* Swaps registered listeners and triggers notification update.
*
* @param newListeners is used to notify listenerTypes changed
*/
private void replaceListeners(
- final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
+ final Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
listeners = newListeners;
notifyListenerTypesChanged(newListeners.keySet());
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
+ private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
- subscriptionListeners.getRegistrations().stream().map(ListenerRegistration::getInstance)
- .collect(ImmutableList.toImmutableList());
+ subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
executor.execute(() -> {
for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
try {
@Override
public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
final L listener) {
- final Set<SchemaPath> initialTypes = listeners.keySet();
+ final Set<Absolute> initialTypes = listeners.keySet();
executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
return subscriptionListeners.register(listener);
}
private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final DOMNotificationRouterEvent event = disruptor.get(seq);
final ListenableFuture<Void> future = event.initialize(notification, subscribers);
disruptor.getRingBuffer().publish(seq);
@Override
public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
throws InterruptedException {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
@SuppressWarnings("checkstyle:IllegalCatch")
@VisibleForTesting
ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final long seq;
try {
seq = disruptor.getRingBuffer().tryNext();
@Override
public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
@Override
public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
final TimeUnit unit) throws InterruptedException {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
@VisibleForTesting
- Multimap<SchemaPath, ?> listeners() {
+ Multimap<Absolute, ?> listeners() {
return listeners;
}