--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+
+import com.google.common.base.Preconditions;
+
+class GeneratedListenerRegistration extends AbstractObjectRegistration<NotificationListener> implements ListenerRegistration<NotificationListener> {
+ private NotificationBrokerImpl notificationBroker;
+ private final NotificationInvoker invoker;
+
+ public GeneratedListenerRegistration(final NotificationListener instance, final NotificationInvoker invoker, final NotificationBrokerImpl broker) {
+ super(instance);
+ this.invoker = Preconditions.checkNotNull(invoker);
+ this.notificationBroker = Preconditions.checkNotNull(broker);
+ }
+
+ public NotificationInvoker getInvoker() {
+ // There is a race with NotificationBrokerImpl:
+ // the invoker can be closed here
+ return invoker;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ notificationBroker.unregisterListener(this);
+ notificationBroker = null;
+ invoker.close();
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
+ private final Class<T> type;
+ private NotificationBrokerImpl notificationBroker;
+
+ public GenericNotificationRegistration(final Class<T> type, final NotificationListener<T> instance, final NotificationBrokerImpl broker) {
+ super(instance);
+ this.type = Preconditions.checkNotNull(type);
+ this.notificationBroker = Preconditions.checkNotNull(broker);
+ }
+
+ public Class<T> getType() {
+ return type;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ notificationBroker.unregisterListener(this);
+ notificationBroker = null;
+ }
+}
import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProviderInstance;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderInstance;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ConcurrentMap<InstanceIdentifier<?>, BindingMountPointImpl> mountPoints;
private final ListenerRegistry<MountProvisionListener> listeners = ListenerRegistry.create();
-
+
private ListeningExecutorService notificationExecutor;
private ListeningExecutorService dataCommitExecutor;
return notificationExecutor;
}
- public void setNotificationExecutor(ListeningExecutorService notificationExecutor) {
+ public void setNotificationExecutor(final ListeningExecutorService notificationExecutor) {
this.notificationExecutor = notificationExecutor;
}
return dataCommitExecutor;
}
- public void setDataCommitExecutor(ListeningExecutorService dataCommitExecutor) {
+ public void setDataCommitExecutor(final ListeningExecutorService dataCommitExecutor) {
this.dataCommitExecutor = dataCommitExecutor;
}
@Override
- public synchronized BindingMountPointImpl createMountPoint(InstanceIdentifier<?> path) {
+ public synchronized BindingMountPointImpl createMountPoint(final InstanceIdentifier<?> path) {
BindingMountPointImpl potential = mountPoints.get(path);
if (potential != null) {
throw new IllegalStateException("Mount point already exists.");
}
@Override
- public BindingMountPointImpl createOrGetMountPoint(InstanceIdentifier<?> path) {
+ public BindingMountPointImpl createOrGetMountPoint(final InstanceIdentifier<?> path) {
BindingMountPointImpl potential = getMountPoint(path);
if (potential != null) {
return potential;
}
@Override
- public BindingMountPointImpl getMountPoint(InstanceIdentifier<?> path) {
+ public BindingMountPointImpl getMountPoint(final InstanceIdentifier<?> path) {
return mountPoints.get(path);
}
- private synchronized BindingMountPointImpl createOrGetMountPointImpl(InstanceIdentifier<?> path) {
+ private synchronized BindingMountPointImpl createOrGetMountPointImpl(final InstanceIdentifier<?> path) {
BindingMountPointImpl potential = getMountPoint(path);
if (potential != null) {
return potential;
}
RpcProviderRegistryImpl rpcRegistry = new RpcProviderRegistryImpl("mount");
- NotificationBrokerImpl notificationBroker = new NotificationBrokerImpl();
- notificationBroker.setExecutor(getNotificationExecutor());
+ NotificationBrokerImpl notificationBroker = new NotificationBrokerImpl(getNotificationExecutor());
DataBrokerImpl dataBroker = new DataBrokerImpl();
dataBroker.setExecutor(getDataCommitExecutor());
BindingMountPointImpl mountInstance = new BindingMountPointImpl(path, rpcRegistry, notificationBroker,
return mountInstance;
}
- private void notifyMountPointCreated(InstanceIdentifier<?> path) {
+ private void notifyMountPointCreated(final InstanceIdentifier<?> path) {
for (ListenerRegistration<MountProvisionListener> listener : listeners) {
try {
listener.getInstance().onMountPointCreated(path);
}
@Override
- public ListenerRegistration<MountProvisionListener> registerProvisionListener(MountProvisionListener listener) {
+ public ListenerRegistration<MountProvisionListener> registerProvisionListener(final MountProvisionListener listener) {
return listeners.register(listener);
}
public class BindingMountPointImpl extends
- AbstractBindingSalProviderInstance<DataBrokerImpl, NotificationBrokerImpl, RpcProviderRegistryImpl>
+ AbstractBindingSalProviderInstance<DataBrokerImpl, NotificationBrokerImpl, RpcProviderRegistryImpl>
implements MountProviderInstance {
- private InstanceIdentifier<?> identifier;
+ private final InstanceIdentifier<?> identifier;
- public BindingMountPointImpl(org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
- RpcProviderRegistryImpl rpcRegistry, NotificationBrokerImpl notificationBroker,
- DataBrokerImpl dataBroker) {
+ public BindingMountPointImpl(final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
+ final RpcProviderRegistryImpl rpcRegistry, final NotificationBrokerImpl notificationBroker,
+ final DataBrokerImpl dataBroker) {
super(rpcRegistry, notificationBroker, dataBroker);
this.identifier = identifier;
}
// Needed only for BI Connector
public DataBrokerImpl getDataBrokerImpl() {
- return (DataBrokerImpl) getDataBroker();
+ return getDataBroker();
}
-
+
@Override
public InstanceIdentifier<?> getIdentifier() {
return this.identifier;
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
+
+ private final ListenerRegistry<NotificationInterestListener> interestListeners =
+ ListenerRegistry.create();
+
+ private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
+ Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
+ private ExecutorService executor;
+
+ @Deprecated
+ public NotificationBrokerImpl(final ExecutorService executor) {
+ this.setExecutor(executor);
+ }
+
+ public void setExecutor(final ExecutorService executor) {
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+ public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
+ final Class<?>[] ifaces = notification.getClass().getInterfaces();
+ return Iterables.filter(Arrays.asList(ifaces), new Predicate<Class<?>>() {
+ @Override
+ public boolean apply(final Class<?> input) {
+ if (Notification.class.equals(input)) {
+ return false;
+ }
+ return Notification.class.isAssignableFrom(input);
+ }
+ });
+ }
+
+ @Override
+ public void publish(final Notification notification) {
+ this.publish(notification, executor);
+ }
+
+ @Override
+ public void publish(final Notification notification, final ExecutorService service) {
+ Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
+ for (final Class<?> type : getNotificationTypes(notification)) {
+ listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
+ }
+
+ final Set<NotifyTask> tasks = new HashSet<>();
+ for (NotificationListener<?> l : listenerToNotify) {
+ tasks.add(new NotifyTask(l, notification));
+ }
+
+ for (final NotifyTask task : tasks) {
+ service.submit(task);
+ }
+ }
+
+ @Override
+ public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
+ final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
+ this.listeners.put(notificationType, listener);
+ this.announceNotificationSubscription(notificationType);
+ return reg;
+ }
+
+ private void announceNotificationSubscription(final Class<? extends Notification> notification) {
+ for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
+ try {
+ listener.getInstance().onNotificationSubscribtion(notification);
+ } catch (Exception e) {
+ LOG.warn("Listener {} reported unexpected error on notification {}",
+ listener.getInstance(), notification, e);
+ }
+ }
+ }
+
+ @Override
+ public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+ final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
+ for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
+ listeners.put(notifyType, invoker.getInvocationProxy());
+ announceNotificationSubscription(notifyType);
+ }
+
+ return new GeneratedListenerRegistration(listener, invoker, this);
+ }
+
+ protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
+ return listeners.remove(reg.getType(), reg.getInstance());
+ }
+
+ protected void unregisterListener(final GeneratedListenerRegistration reg) {
+ final NotificationInvoker invoker = reg.getInvoker();
+ for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
+ this.listeners.remove(notifyType, invoker.getInvocationProxy());
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
+ final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
+ for (final Class<? extends Notification> notification : listeners.keySet()) {
+ interestListener.onNotificationSubscribtion(notification);
+ }
+ return registration;
+ }
+}
+++ /dev/null
-/*\r
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-package org.opendaylight.controller.sal.binding.impl\r
-\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableSet\r
-import com.google.common.collect.Multimap\r
-import com.google.common.collect.Multimaps\r
-import java.util.Collections\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.Set\r
-import org.opendaylight.controller.sal.binding.api.NotificationListener\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\r
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Registration\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.binding.Notification\r
-import org.slf4j.LoggerFactory\r
-\r
-class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
- \r
- val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;\r
- \r
- val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
-\r
- @Property\r
- var ExecutorService executor;\r
- \r
- val logger = LoggerFactory.getLogger(NotificationBrokerImpl)\r
-\r
- new() {\r
- listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
- }\r
-\r
- @Deprecated\r
- new(ExecutorService executor) {\r
- listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
- this.executor = executor;\r
- }\r
-\r
- def getNotificationTypes(Notification notification) {\r
- notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]\r
- }\r
-\r
- override publish(Notification notification) {\r
- publish(notification, executor)\r
- }\r
-\r
- override publish(Notification notification, ExecutorService service) {\r
- val allTypes = notification.notificationTypes\r
-\r
- var Iterable<NotificationListener<? extends Object>> listenerToNotify = Collections.emptySet();\r
- for (type : allTypes) {\r
- listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
- }\r
- val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
- submitAll(executor,tasks);\r
- }\r
- \r
- def submitAll(ExecutorService service, Set<NotifyTask> tasks) {\r
- val ret = ImmutableSet.<Future<Object>>builder();\r
- for(task : tasks) {\r
- ret.add(service.submit(task));\r
- }\r
- return ret.build();\r
- }\r
- \r
- override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
- NotificationListener<T> listener) {\r
- val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);\r
- listeners.put(notificationType, listener);\r
- announceNotificationSubscription(notificationType);\r
- return reg;\r
- }\r
- \r
- def announceNotificationSubscription(Class<? extends Notification> notification) {\r
- for (listener : interestListeners) {\r
- try {\r
- listener.instance.onNotificationSubscribtion(notification);\r
- } catch (Exception e) {\r
- logger.error("", e.message)\r
- }\r
- }\r
- }\r
-\r
- override registerNotificationListener(\r
- org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
- val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);\r
- for (notifyType : invoker.supportedNotifications) {\r
- listeners.put(notifyType, invoker.invocationProxy)\r
- announceNotificationSubscription(notifyType)\r
- }\r
- val registration = new GeneratedListenerRegistration(listener, invoker,this);\r
- return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;\r
- }\r
-\r
- protected def unregisterListener(GenericNotificationRegistration<?> reg) {\r
- listeners.remove(reg.type, reg.instance);\r
- }\r
-\r
- protected def unregisterListener(GeneratedListenerRegistration reg) {\r
- for (notifyType : reg.invoker.supportedNotifications) {\r
- listeners.remove(notifyType, reg.invoker.invocationProxy)\r
- }\r
- }\r
- \r
- override close() {\r
- //FIXME: implement properly.\r
- }\r
- \r
- override registerInterestListener(NotificationInterestListener interestListener) {\r
- val registration = interestListeners.register(interestListener);\r
- \r
- for(notification : listeners.keySet) {\r
- interestListener.onNotificationSubscribtion(notification);\r
- }\r
- return registration\r
- }\r
-}\r
-\r
-class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {\r
-\r
- @Property\r
- val Class<T> type;\r
-\r
- var NotificationBrokerImpl notificationBroker;\r
-\r
- public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {\r
- super(instance);\r
- _type = type;\r
- notificationBroker = broker;\r
- }\r
-\r
- override protected removeRegistration() {\r
- notificationBroker.unregisterListener(this);\r
- notificationBroker = null;\r
- }\r
-}\r
-\r
-class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {\r
-\r
- @Property\r
- val NotificationInvoker invoker;\r
- \r
- var NotificationBrokerImpl notificationBroker;\r
- \r
-\r
- new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {\r
- super(instance);\r
- _invoker = invoker;\r
- notificationBroker = broker;\r
- }\r
-\r
- override protected removeRegistration() {\r
- notificationBroker.unregisterListener(this);\r
- notificationBroker = null;\r
- invoker.close();\r
- }\r
-}\r
-\r
-@Data\r
-class NotifyTask implements Callable<Object> {\r
-\r
- private static val log = LoggerFactory.getLogger(NotifyTask);\r
-\r
- @SuppressWarnings("rawtypes")\r
- val NotificationListener listener;\r
- val Notification notification;\r
-\r
- override call() {\r
- //Only logging the complete notification in debug mode\r
- try {\r
- if(log.isDebugEnabled){\r
- log.debug("Delivering notification {} to {}",notification,listener);\r
- } else {\r
- log.trace("Delivering notification {} to {}",notification.class.name,listener);\r
- }\r
- listener.onNotification(notification);\r
- if(log.isDebugEnabled){\r
- log.debug("Notification delivered {} to {}",notification,listener);\r
- } else {\r
- log.trace("Notification delivered {} to {}",notification.class.name,listener);\r
- }\r
- } catch (Exception e) {\r
- log.error("Unhandled exception thrown by listener: {}", listener, e);\r
- }\r
- return null;\r
- }\r
-\r
-}\r
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+class NotifyTask implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(NotifyTask.class);
+
+ private final NotificationListener<?> listener;
+ private final Notification notification;
+
+ public NotifyTask(final NotificationListener<?> listener, final Notification notification) {
+ this.listener = Preconditions.checkNotNull(listener);
+ this.notification = Preconditions.checkNotNull(notification);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Notification> NotificationListener<T> getListener() {
+ return (NotificationListener<T>)listener;
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Delivering notification {} to {}", notification, listener);
+ } else {
+ LOG.trace("Delivering notification {} to {}", notification.getClass().getName(), listener);
+ }
+
+ try {
+ getListener().onNotification(notification);
+ } catch (final Exception e) {
+ LOG.error("Unhandled exception thrown by listener: {}", listener, e);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification delivered {} to {}", notification, listener);
+ } else {
+ LOG.trace("Notification delivered {} to {}", notification.getClass().getName(), listener);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((listener== null) ? 0 : listener.hashCode());
+ result = prime * result + ((notification== null) ? 0 : notification.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ NotifyTask other = (NotifyTask) obj;
+ if (listener == null) {
+ if (other.listener != null)
+ return false;
+ } else if (!listener.equals(other.listener))
+ return false;
+ if (notification == null) {
+ if (other.notification != null)
+ return false;
+ } else if (!notification.equals(other.notification))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("listener", listener)
+ .add("notification", notification.getClass())
+ .toString();
+ }
+}