*/
package org.opendaylight.controller.md.sal.binding.compat;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.yang.binding.Notification;
-import com.google.common.base.Preconditions;
-
/**
* An aggregated listener registration. This is a result of registering an invoker which can handle multiple
* interfaces at the same time. In order to support correct delivery, we need to maintain per-type registrations
*/
package org.opendaylight.controller.md.sal.binding.compat;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
-
import javax.annotation.concurrent.GuardedBy;
-
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
+@Deprecated
public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class);
@Override
public void publish(final Notification notification, final ExecutorService service) {
- for (NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
+ for (final NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
service.submit(new NotifyTask(r, notification));
}
}
synchronized (this) {
final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
mutableListeners();
- for (NotificationListenerRegistration<?> reg : registrations) {
+ for (final NotificationListenerRegistration<?> reg : registrations) {
newListeners.put(reg.getType(), reg);
}
}
// Notifications are dispatched out of lock...
- for (NotificationListenerRegistration<?> reg : registrations) {
+ for (final NotificationListenerRegistration<?> reg : registrations) {
announceNotificationSubscription(reg.getType());
}
}
final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
mutableListeners();
- for (NotificationListenerRegistration<?> reg : registrations) {
+ for (final NotificationListenerRegistration<?> reg : registrations) {
newListeners.remove(reg.getType(), reg);
}
for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
try {
listener.getInstance().onNotificationSubscribtion(notification);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("Listener {} reported unexpected error on notification {}",
listener.getInstance(), notification, e);
}
@Override
public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
- final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
+ final NotificationInvoker invoker = NotificationInvoker.invokerFor(listener);
final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
// Populate the registrations...
int i = 0;
- for (Class<? extends Notification> type : types) {
- regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
+ for (final Class<? extends Notification> type : types) {
+ regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker, regs) {
@Override
protected void removeRegistration() {
// Nothing to do, will be cleaned up by parent (below)
@Override
protected void removeRegistration() {
removeRegistrations(regs);
- for (ListenerRegistration<?> reg : regs) {
+ for (final ListenerRegistration<?> reg : regs) {
reg.close();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.compat;
+
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.binding.util.NotificationListenerInvoker;
+import org.opendaylight.yangtools.yang.common.QName;
+
+final class NotificationInvoker implements org.opendaylight.controller.sal.binding.api.NotificationListener<Notification> {
+
+ private final NotificationListener delegate;
+ private final Map<Class<? extends Notification>,InvokerContext> invokers;
+
+
+ private NotificationInvoker(final NotificationListener listener) {
+ delegate = listener;
+ final Map<Class<? extends Notification>, InvokerContext> builder = new HashMap<>();
+ for(final Class<?> iface : listener.getClass().getInterfaces()) {
+ if(NotificationListener.class.isAssignableFrom(iface) && BindingReflections.isBindingClass(iface)) {
+ @SuppressWarnings("unchecked")
+ final Class<? extends NotificationListener> listenerType = (Class<? extends NotificationListener>) iface;
+ final NotificationListenerInvoker invoker = NotificationListenerInvoker.from(listenerType);
+ for(final Class<? extends Notification> type : getNotificationTypes(listenerType)) {
+ builder.put(type, new InvokerContext(BindingReflections.findQName(type) , invoker));
+ }
+ }
+ }
+ invokers = ImmutableMap.copyOf(builder);
+ }
+
+ public static NotificationInvoker invokerFor(final NotificationListener listener) {
+ return new NotificationInvoker(listener);
+ }
+
+ public Set<Class<? extends Notification>> getSupportedNotifications() {
+ return invokers.keySet();
+ }
+
+ @Override
+ public void onNotification(final Notification notification) {
+ getContext(notification.getImplementedInterface()).invoke(notification);
+ };
+
+ private InvokerContext getContext(final Class<?> type) {
+ return invokers.get(type);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Set<Class<? extends Notification>> getNotificationTypes(final Class<? extends org.opendaylight.yangtools.yang.binding.NotificationListener> type) {
+ // TODO: Investigate possibility and performance impact if we cache this or expose
+ // it from NotificationListenerInvoker
+ final Set<Class<? extends Notification>> ret = new HashSet<>();
+ for(final Method method : type.getMethods()) {
+ if(BindingReflections.isNotificationCallback(method)) {
+ final Class<? extends Notification> notification = (Class<? extends Notification>) method.getParameterTypes()[0];
+ ret.add(notification);
+ }
+ }
+ return ret;
+ }
+
+ private class InvokerContext {
+
+ private final QName name;
+ private final NotificationListenerInvoker invoker;
+
+ private InvokerContext(final QName name, final NotificationListenerInvoker invoker) {
+ this.name = name;
+ this.invoker = invoker;
+ }
+
+ public void invoke(final Notification notification) {
+ invoker.invokeNotification(delegate, name, notification);
+ }
+
+ }
+
+}
return invokers.keySet();
}
- private static Map<SchemaPath, NotificationListenerInvoker> createInvokerMapFor(final Class<? extends NotificationListener> implClz) {
+ public static Map<SchemaPath, NotificationListenerInvoker> createInvokerMapFor(final Class<? extends NotificationListener> implClz) {
final Map<SchemaPath, NotificationListenerInvoker> builder = new HashMap<>();
for(final Class<?> iface : implClz.getInterfaces()) {
if(NotificationListener.class.isAssignableFrom(iface) && BindingReflections.isBindingClass(iface)) {
import java.util.concurrent.TimeUnit;
import javassist.ClassPool;
import org.apache.commons.lang3.StringUtils;
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public static final JavassistUtils JAVASSIST = JavassistUtils.forClassPool(CLASS_POOL);
public static final org.opendaylight.controller.sal.binding.codegen.impl.DefaultRuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.DefaultRuntimeCodeGenerator(
CLASS_POOL);
- public static final RuntimeCodeGenerator RPC_GENERATOR = RPC_GENERATOR_IMPL;
- public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
public static final int CORE_NOTIFICATION_THREADS = 4;
public static final int MAX_NOTIFICATION_THREADS = 32;
if (NOTIFICATION_EXECUTOR == null) {
int queueSize = MAX_NOTIFICATION_QUEUE_SIZE;
- String queueValue = System.getProperty(NOTIFICATION_QUEUE_SIZE_PROPERTY);
+ final String queueValue = System.getProperty(NOTIFICATION_QUEUE_SIZE_PROPERTY);
if (StringUtils.isNotBlank(queueValue)) {
try {
queueSize = Integer.parseInt(queueValue);
logger.trace("Queue size was set to {}", queueSize);
- } catch (NumberFormatException e) {
+ } catch (final NumberFormatException e) {
logger.warn("Cannot parse {} as set by {}, using default {}", queueValue,
NOTIFICATION_QUEUE_SIZE_PROPERTY, queueSize);
}
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
}
}
@Deprecated
public static synchronized ListeningExecutorService getDefaultCommitExecutor() {
if (COMMIT_EXECUTOR == null) {
- ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-commit-%d").build();
+ final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-commit-%d").build();
/*
* FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
* ordering guarantees, which means that using a concurrent threadpool results
* in inconsistent data being present. Once proper primitives are introduced,
* concurrency can be reintroduced.
*/
- ExecutorService executor = Executors.newSingleThreadExecutor(factory);
+ final ExecutorService executor = Executors.newSingleThreadExecutor(factory);
COMMIT_EXECUTOR = MoreExecutors.listeningDecorator(executor);
}
public static ExecutorService getDefaultChangeEventExecutor() {
if (CHANGE_EVENT_EXECUTOR == null) {
- ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
+ final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
/*
* FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
* ordering guarantees, which means that using a concurrent threadpool results
* in inconsistent data being present. Once proper primitives are introduced,
* concurrency can be reintroduced.
*/
- ExecutorService executor = Executors.newSingleThreadExecutor(factory);
+ final ExecutorService executor = Executors.newSingleThreadExecutor(factory);
CHANGE_EVENT_EXECUTOR = MoreExecutors.listeningDecorator(executor);
}