import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.sal.core.api.Broker;
public class BindingNotificationPublishAdapterModule extends AbstractBindingNotificationPublishAdapterModule {
- public BindingNotificationPublishAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+ public BindingNotificationPublishAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public BindingNotificationPublishAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, BindingNotificationPublishAdapterModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public BindingNotificationPublishAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final BindingNotificationPublishAdapterModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
final BindingToNormalizedNodeCodec codec = getBindingMappingServiceDependency();
final Broker.ProviderSession session = getDomAsyncBrokerDependency().registerProvider(new DummyDOMProvider());
final DOMNotificationPublishService publishService = session.getService(DOMNotificationPublishService.class);
- return new BindingDOMNotificationPublishServiceAdapter(codec.getCodecRegistry(), publishService);
+ return new BindingDOMNotificationPublishServiceAdapter(codec, publishService);
}
}
*/
package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl;
-
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceWithInterestListeners;
+import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
/**
final NotificationService notificationService = getNotificationAdapterDependency();
if(notificationPublishService != null & notificationService != null) {
- return new HeliumNotificationProviderServiceAdapter(notificationPublishService, notificationService);
+ return createHeliumAdapter(notificationPublishService,notificationService);
}
/*
* we will have adapter implementation which will honor Helium
* threading model for notifications.
*/
-
return new HydrogenNotificationBrokerImpl(SingletonHolder.getDefaultNotificationExecutor());
}
+
+ private static AutoCloseable createHeliumAdapter(final NotificationPublishService publishService,
+ final NotificationService listenService) {
+ if(publishService instanceof BindingDOMNotificationPublishServiceAdapter) {
+ final BindingDOMNotificationPublishServiceAdapter casted = (BindingDOMNotificationPublishServiceAdapter) publishService;
+ final DOMNotificationPublishService domService = casted.getDomPublishService();
+ if(domService instanceof DOMNotificationSubscriptionListenerRegistry) {
+ final DOMNotificationSubscriptionListenerRegistry subsRegistry = (DOMNotificationSubscriptionListenerRegistry) domService;
+ return new HeliumNotificationProviderServiceWithInterestListeners(publishService, listenService, casted.getCodecRegistry(), subsRegistry);
+ }
+ }
+ return new HeliumNotificationProviderServiceAdapter(publishService, listenService);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.compat;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener;
+import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HeliumNotificationProviderServiceWithInterestListeners extends HeliumNotificationProviderServiceAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HeliumNotificationProviderServiceWithInterestListeners.class);
+
+ private final ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create();
+ private final ListenerRegistration<Listener> domListener;
+ private final BindingToNormalizedNodeCodec codec;
+
+ public HeliumNotificationProviderServiceWithInterestListeners(
+ final NotificationPublishService publishService, final NotificationService listenService, final BindingToNormalizedNodeCodec codec, final DOMNotificationSubscriptionListenerRegistry registry) {
+ super(publishService, listenService);
+ this.codec = codec;
+ this.domListener = registry.registerSubscriptionListener(new Listener());
+ }
+
+ @Override
+ public ListenerRegistration<NotificationInterestListener> registerInterestListener(
+ final NotificationInterestListener listener) {
+ notifyListener(listener, translate(domListener.getInstance().getAllObserved()));
+ return interestListeners.register(listener);
+ }
+
+ private Set<Class<? extends Notification>> translate(final Set<SchemaPath> added) {
+ return codec.getNotificationClasses(added);
+ }
+
+ private void notifyAllListeners(final Set<SchemaPath> added) {
+ final Iterator<ListenerRegistration<NotificationInterestListener>> listeners = interestListeners.iterator();
+ if(listeners.hasNext()) {
+ final Set<Class<? extends Notification>> baEvent = translate(added);
+ while(listeners.hasNext()) {
+ final NotificationInterestListener listenerRef = listeners.next().getInstance();
+ try {
+ notifyListener(listenerRef,baEvent);
+ } catch (final Exception e) {
+ LOG.warn("Unhandled exception during invoking listener {}",e, listenerRef);
+ }
+ }
+ }
+ }
+
+ private void notifyListener(final NotificationInterestListener listener, final Set<Class<? extends Notification>> baEvent) {
+ for(final Class<? extends Notification> event: baEvent) {
+ listener.onNotificationSubscribtion(event);
+ }
+ }
+
+ private final class Listener implements DOMNotificationSubscriptionListener {
+
+ private volatile Set<SchemaPath> allObserved = Collections.emptySet();
+
+ @Override
+ public void onSubscriptionChanged(final Set<SchemaPath> currentTypes) {
+ final Set<SchemaPath> added = Sets.difference(currentTypes, allObserved).immutableCopy();
+ notifyAllListeners(added);
+ allObserved = Sets.union(allObserved, added).immutableCopy();
+ }
+
+ Set<SchemaPath> getAllObserved() {
+ return allObserved;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ domListener.close();
+ }
+}
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.yang.binding.Notification;
public class BindingDOMNotificationPublishServiceAdapter implements NotificationPublishService, AutoCloseable {
};
- private final BindingNormalizedNodeSerializer codecRegistry;
+ private final BindingToNormalizedNodeCodec codecRegistry;
private final DOMNotificationPublishService domPublishService;
- public BindingDOMNotificationPublishServiceAdapter(final BindingNormalizedNodeSerializer codecRegistry, final DOMNotificationPublishService domPublishService) {
- this.codecRegistry = codecRegistry;
+ public BindingDOMNotificationPublishServiceAdapter(final BindingToNormalizedNodeCodec codec, final DOMNotificationPublishService domPublishService) {
+ this.codecRegistry = codec;
this.domPublishService = domPublishService;
}
+ public BindingToNormalizedNodeCodec getCodecRegistry() {
+ return codecRegistry;
+ }
+
+ public DOMNotificationPublishService getDomPublishService() {
+ return domPublishService;
+ }
+
@Override
public void putNotification(final Notification notification) throws InterruptedException {
domPublishService.putNotification(toDomNotification(notification));
@Override
protected NotificationPublishService createInstance(final BindingToNormalizedNodeCodec codec,
final ClassToInstanceMap<DOMService> delegates) {
- final BindingNormalizedNodeSerializer codecReg = codec.getCodecRegistry();
final DOMNotificationPublishService domPublish = delegates.getInstance(DOMNotificationPublishService.class);
- return new BindingDOMNotificationPublishServiceAdapter(codecReg, domPublish);
+ return new BindingDOMNotificationPublishServiceAdapter(codec, domPublish);
}
}
import com.google.common.collect.ImmutableBiMap;
import java.lang.reflect.Method;
import java.util.AbstractMap.SimpleEntry;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
return new SimpleEntry<InstanceIdentifier<?>, BindingCodecTreeNode<?>>(bindingPath, codecContext);
}
+ public Set<Class<? extends Notification>> getNotificationClasses(final Set<SchemaPath> interested) {
+ final Set<Class<? extends Notification>> result = new HashSet<>();
+ final Set<NotificationDefinition> knownNotifications = runtimeContext.getSchemaContext().getNotifications();
+ for (final NotificationDefinition notification : knownNotifications) {
+ if (interested.contains(notification.getPath())) {
+ try {
+ result.add((Class<? extends Notification>) runtimeContext.getClassForSchema(notification));
+ } catch (final IllegalStateException e) {
+ // Ignore
+ }
+ }
+ }
+ return result;
+ }
+
}
}
public NotificationPublishService createNotificationPublishService() {
- return new BindingDOMNotificationPublishServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter);
+ return new BindingDOMNotificationPublishServiceAdapter(bindingToNormalized, domNotificationRouter);
}
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableMultimap.Builder;
import com.google.common.collect.Multimap;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener;
+import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
* are realized using the Disruptor's native operations. The bounded-blocking {@link #offerNotification(DOMNotification, long, TimeUnit)}
* is realized by arming a background wakeup interrupt.
*/
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService, DOMNotificationService {
+public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+ DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(1L, 30L, TimeUnit.MILLISECONDS);
private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS = new EventHandler<DOMNotificationRouterEvent>() {
private final Disruptor<DOMNotificationRouterEvent> disruptor;
private final ExecutorService executor;
private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners = ImmutableMultimap.of();
+ private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners = ListenerRegistry.create();
private DOMNotificationRouter(final ExecutorService executor, final Disruptor<DOMNotificationRouterEvent> disruptor) {
this.executor = Preconditions.checkNotNull(executor);
final ListenerRegistration<T> me = this;
synchronized (DOMNotificationRouter.this) {
- listeners = ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate<ListenerRegistration<? extends DOMNotificationListener>>() {
+ replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, new Predicate<ListenerRegistration<? extends DOMNotificationListener>>() {
@Override
public boolean apply(final ListenerRegistration<? extends DOMNotificationListener> input) {
return input != me;
}
- }));
+ })));
}
}
};
final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b = ImmutableMultimap.builder();
b.putAll(listeners);
- for (SchemaPath t : types) {
+ for (final SchemaPath t : types) {
b.put(t, reg);
}
- listeners = b.build();
+ replaceListeners(b.build());
}
return reg;
return registerNotificationListener(listener, Arrays.asList(types));
}
+ /**
+ * Swaps registered listeners and triggers notification update
+ *
+ * @param newListeners
+ */
+ private void replaceListeners(
+ final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
+ listeners = newListeners;
+ notifyListenerTypesChanged(newListeners.keySet());
+ }
+
+ private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
+ final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =ImmutableList.copyOf(subscriptionListeners.getListeners());
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
+ try {
+ subListener.getInstance().onSubscriptionChanged(typesAfter);
+ } catch (final Exception e) {
+ LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e);
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
+ final L listener) {
+ final Set<SchemaPath> initialTypes = listeners.keySet();
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ listener.onSubscriptionChanged(initialTypes);
+ }
+ });
+ return subscriptionListeners.registerWithType(listener);
+ }
+
private ListenableFuture<Void> publish(final long seq, final DOMNotification notification, final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final DOMNotificationRouterEvent event = disruptor.get(seq);
final ListenableFuture<Void> future = event.initialize(notification, subscribers);
final long seq;
try {
seq = disruptor.getRingBuffer().tryNext();
- } catch (InsufficientCapacityException e) {
+ } catch (final InsufficientCapacityException e) {
return DOMNotificationPublishService.REJECTED;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.annotations.Beta;
+import java.util.EventListener;
+import java.util.Set;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Listener which is notified when subscriptions changes and
+ * provides set of notification types for which currently
+ * subscriptions are in place.
+ *
+ */
+@Beta
+public interface DOMNotificationSubscriptionListener extends EventListener {
+
+ /**
+ * Invoked when notification subscription changed
+ *
+ * @param currentTypes Set of notification types
+ * for which listeners are registered.
+ */
+ void onSubscriptionChanged(Set<SchemaPath> currentTypes);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Registry of {@link DOMNotificationSubscriptionListener}
+ * which listens for changes in notification types.
+ *
+ */
+@Beta
+public interface DOMNotificationSubscriptionListenerRegistry {
+
+ <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(L listener);
+
+}