Merge "BUG-832 Refactor netconf connector"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 2 Jun 2014 08:50:39 +0000 (08:50 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 2 Jun 2014 08:50:39 +0000 (08:50 +0000)
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/MountPointManagerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java
new file mode 100644 (file)
index 0000000..5325ed3
--- /dev/null
@@ -0,0 +1,39 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+
+import com.google.common.base.Preconditions;
+
+class GeneratedListenerRegistration extends AbstractObjectRegistration<NotificationListener> implements ListenerRegistration<NotificationListener> {
+    private NotificationBrokerImpl notificationBroker;
+    private final NotificationInvoker invoker;
+
+    public GeneratedListenerRegistration(final NotificationListener instance, final NotificationInvoker invoker, final NotificationBrokerImpl broker) {
+        super(instance);
+        this.invoker = Preconditions.checkNotNull(invoker);
+        this.notificationBroker = Preconditions.checkNotNull(broker);
+    }
+
+    public NotificationInvoker getInvoker() {
+        // There is a race with NotificationBrokerImpl:
+        // the invoker can be closed here
+        return invoker;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        notificationBroker.unregisterListener(this);
+        notificationBroker = null;
+        invoker.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java
new file mode 100644 (file)
index 0000000..448adfa
--- /dev/null
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
+    private final Class<T> type;
+    private NotificationBrokerImpl notificationBroker;
+
+    public GenericNotificationRegistration(final Class<T> type, final NotificationListener<T> instance, final NotificationBrokerImpl broker) {
+        super(instance);
+        this.type = Preconditions.checkNotNull(type);
+        this.notificationBroker = Preconditions.checkNotNull(broker);
+    }
+
+    public Class<T> getType() {
+        return type;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        notificationBroker.unregisterListener(this);
+        notificationBroker = null;
+    }
+}
index b999a6f01cef0e4094288108734ea96f1f1c050a..df09f78620a11bcce20f04034e6a7b7cf144780f 100644 (file)
@@ -11,10 +11,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.opendaylight.controller.md.sal.binding.util.AbstractBindingSalProviderInstance;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderInstance;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +27,7 @@ public class MountPointManagerImpl implements MountProviderService {
 
     private final ConcurrentMap<InstanceIdentifier<?>, BindingMountPointImpl> mountPoints;
     private final ListenerRegistry<MountProvisionListener> listeners = ListenerRegistry.create();
-    
+
     private ListeningExecutorService notificationExecutor;
     private ListeningExecutorService dataCommitExecutor;
 
@@ -39,7 +39,7 @@ public class MountPointManagerImpl implements MountProviderService {
         return notificationExecutor;
     }
 
-    public void setNotificationExecutor(ListeningExecutorService notificationExecutor) {
+    public void setNotificationExecutor(final ListeningExecutorService notificationExecutor) {
         this.notificationExecutor = notificationExecutor;
     }
 
@@ -47,12 +47,12 @@ public class MountPointManagerImpl implements MountProviderService {
         return dataCommitExecutor;
     }
 
-    public void setDataCommitExecutor(ListeningExecutorService dataCommitExecutor) {
+    public void setDataCommitExecutor(final ListeningExecutorService dataCommitExecutor) {
         this.dataCommitExecutor = dataCommitExecutor;
     }
 
     @Override
-    public synchronized BindingMountPointImpl createMountPoint(InstanceIdentifier<?> path) {
+    public synchronized BindingMountPointImpl createMountPoint(final InstanceIdentifier<?> path) {
         BindingMountPointImpl potential = mountPoints.get(path);
         if (potential != null) {
             throw new IllegalStateException("Mount point already exists.");
@@ -61,7 +61,7 @@ public class MountPointManagerImpl implements MountProviderService {
     }
 
     @Override
-    public BindingMountPointImpl createOrGetMountPoint(InstanceIdentifier<?> path) {
+    public BindingMountPointImpl createOrGetMountPoint(final InstanceIdentifier<?> path) {
         BindingMountPointImpl potential = getMountPoint(path);
         if (potential != null) {
             return potential;
@@ -70,18 +70,17 @@ public class MountPointManagerImpl implements MountProviderService {
     }
 
     @Override
-    public BindingMountPointImpl getMountPoint(InstanceIdentifier<?> path) {
+    public BindingMountPointImpl getMountPoint(final InstanceIdentifier<?> path) {
         return mountPoints.get(path);
     }
 
-    private synchronized BindingMountPointImpl createOrGetMountPointImpl(InstanceIdentifier<?> path) {
+    private synchronized BindingMountPointImpl createOrGetMountPointImpl(final InstanceIdentifier<?> path) {
         BindingMountPointImpl potential = getMountPoint(path);
         if (potential != null) {
             return potential;
         }
         RpcProviderRegistryImpl rpcRegistry = new RpcProviderRegistryImpl("mount");
-        NotificationBrokerImpl notificationBroker = new NotificationBrokerImpl();
-        notificationBroker.setExecutor(getNotificationExecutor());
+        NotificationBrokerImpl notificationBroker = new NotificationBrokerImpl(getNotificationExecutor());
         DataBrokerImpl dataBroker = new DataBrokerImpl();
         dataBroker.setExecutor(getDataCommitExecutor());
         BindingMountPointImpl mountInstance = new BindingMountPointImpl(path, rpcRegistry, notificationBroker,
@@ -91,7 +90,7 @@ public class MountPointManagerImpl implements MountProviderService {
         return mountInstance;
     }
 
-    private void notifyMountPointCreated(InstanceIdentifier<?> path) {
+    private void notifyMountPointCreated(final InstanceIdentifier<?> path) {
         for (ListenerRegistration<MountProvisionListener> listener : listeners) {
             try {
                 listener.getInstance().onMountPointCreated(path);
@@ -102,28 +101,28 @@ public class MountPointManagerImpl implements MountProviderService {
     }
 
     @Override
-    public ListenerRegistration<MountProvisionListener> registerProvisionListener(MountProvisionListener listener) {
+    public ListenerRegistration<MountProvisionListener> registerProvisionListener(final MountProvisionListener listener) {
         return listeners.register(listener);
     }
 
     public class BindingMountPointImpl extends
-            AbstractBindingSalProviderInstance<DataBrokerImpl, NotificationBrokerImpl, RpcProviderRegistryImpl>
+    AbstractBindingSalProviderInstance<DataBrokerImpl, NotificationBrokerImpl, RpcProviderRegistryImpl>
     implements MountProviderInstance {
 
-        private InstanceIdentifier<?> identifier;
+        private final InstanceIdentifier<?> identifier;
 
-        public BindingMountPointImpl(org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
-                RpcProviderRegistryImpl rpcRegistry, NotificationBrokerImpl notificationBroker,
-                DataBrokerImpl dataBroker) {
+        public BindingMountPointImpl(final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> identifier,
+                final RpcProviderRegistryImpl rpcRegistry, final NotificationBrokerImpl notificationBroker,
+                final DataBrokerImpl dataBroker) {
             super(rpcRegistry, notificationBroker, dataBroker);
             this.identifier = identifier;
         }
 
         // Needed only for BI Connector
         public DataBrokerImpl getDataBrokerImpl() {
-            return (DataBrokerImpl) getDataBroker();
+            return getDataBroker();
         }
-        
+
         @Override
         public InstanceIdentifier<?> getIdentifier() {
             return this.identifier;
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java
new file mode 100644 (file)
index 0000000..d3b6800
--- /dev/null
@@ -0,0 +1,141 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
+
+    private final ListenerRegistry<NotificationInterestListener> interestListeners =
+            ListenerRegistry.create();
+
+    private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
+            Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
+    private ExecutorService executor;
+
+    @Deprecated
+    public NotificationBrokerImpl(final ExecutorService executor) {
+        this.setExecutor(executor);
+    }
+
+    public void setExecutor(final ExecutorService executor) {
+        this.executor = Preconditions.checkNotNull(executor);
+    }
+
+    public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
+        final Class<?>[] ifaces = notification.getClass().getInterfaces();
+        return Iterables.filter(Arrays.asList(ifaces), new Predicate<Class<?>>() {
+            @Override
+            public boolean apply(final Class<?> input) {
+                if (Notification.class.equals(input)) {
+                    return false;
+                }
+                return Notification.class.isAssignableFrom(input);
+            }
+        });
+    }
+
+    @Override
+    public void publish(final Notification notification) {
+        this.publish(notification, executor);
+    }
+
+    @Override
+    public void publish(final Notification notification, final ExecutorService service) {
+        Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
+        for (final Class<?> type : getNotificationTypes(notification)) {
+            listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
+        }
+
+        final Set<NotifyTask> tasks = new HashSet<>();
+        for (NotificationListener<?> l : listenerToNotify) {
+            tasks.add(new NotifyTask(l, notification));
+        }
+
+        for (final NotifyTask task : tasks) {
+            service.submit(task);
+        }
+    }
+
+    @Override
+    public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
+        final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
+        this.listeners.put(notificationType, listener);
+        this.announceNotificationSubscription(notificationType);
+        return reg;
+    }
+
+    private void announceNotificationSubscription(final Class<? extends Notification> notification) {
+        for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
+            try {
+                listener.getInstance().onNotificationSubscribtion(notification);
+            } catch (Exception e) {
+                LOG.warn("Listener {} reported unexpected error on notification {}",
+                        listener.getInstance(), notification, e);
+            }
+        }
+    }
+
+    @Override
+    public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+        final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
+        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
+            listeners.put(notifyType, invoker.getInvocationProxy());
+            announceNotificationSubscription(notifyType);
+        }
+
+        return new GeneratedListenerRegistration(listener, invoker, this);
+    }
+
+    protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
+        return listeners.remove(reg.getType(), reg.getInstance());
+    }
+
+    protected void unregisterListener(final GeneratedListenerRegistration reg) {
+        final NotificationInvoker invoker = reg.getInvoker();
+        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
+            this.listeners.remove(notifyType, invoker.getInvocationProxy());
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
+        final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
+        for (final Class<? extends Notification> notification : listeners.keySet()) {
+            interestListener.onNotificationSubscribtion(notification);
+        }
+        return registration;
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend
deleted file mode 100644 (file)
index 6d675b4..0000000
+++ /dev/null
@@ -1,201 +0,0 @@
-/*\r
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-package org.opendaylight.controller.sal.binding.impl\r
-\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableSet\r
-import com.google.common.collect.Multimap\r
-import com.google.common.collect.Multimaps\r
-import java.util.Collections\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.Set\r
-import org.opendaylight.controller.sal.binding.api.NotificationListener\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\r
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Registration\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.binding.Notification\r
-import org.slf4j.LoggerFactory\r
-\r
-class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
-    \r
-    val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;\r
-    \r
-    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
-\r
-    @Property\r
-    var ExecutorService executor;\r
-    \r
-    val logger = LoggerFactory.getLogger(NotificationBrokerImpl)\r
-\r
-    new() {\r
-        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
-    }\r
-\r
-    @Deprecated\r
-    new(ExecutorService executor) {\r
-        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
-        this.executor = executor;\r
-    }\r
-\r
-    def getNotificationTypes(Notification notification) {\r
-        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]\r
-    }\r
-\r
-    override publish(Notification notification) {\r
-        publish(notification, executor)\r
-    }\r
-\r
-    override publish(Notification notification, ExecutorService service) {\r
-        val allTypes = notification.notificationTypes\r
-\r
-        var Iterable<NotificationListener<? extends Object>> listenerToNotify = Collections.emptySet();\r
-        for (type : allTypes) {\r
-            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
-        }\r
-        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
-        submitAll(executor,tasks);\r
-    }\r
-    \r
-    def submitAll(ExecutorService service, Set<NotifyTask> tasks) {\r
-        val ret = ImmutableSet.<Future<Object>>builder();\r
-        for(task : tasks) {\r
-            ret.add(service.submit(task));\r
-        }\r
-        return ret.build();\r
-    }\r
-    \r
-    override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
-        NotificationListener<T> listener) {\r
-        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);\r
-        listeners.put(notificationType, listener);\r
-        announceNotificationSubscription(notificationType);\r
-        return reg;\r
-    }\r
-    \r
-    def announceNotificationSubscription(Class<? extends Notification> notification) {\r
-        for (listener : interestListeners) {\r
-            try {\r
-                listener.instance.onNotificationSubscribtion(notification);\r
-            } catch (Exception e) {\r
-                logger.error("", e.message)\r
-            }\r
-        }\r
-    }\r
-\r
-    override registerNotificationListener(\r
-        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
-        val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);\r
-        for (notifyType : invoker.supportedNotifications) {\r
-            listeners.put(notifyType, invoker.invocationProxy)\r
-            announceNotificationSubscription(notifyType)\r
-        }\r
-        val registration = new GeneratedListenerRegistration(listener, invoker,this);\r
-        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;\r
-    }\r
-\r
-    protected def unregisterListener(GenericNotificationRegistration<?> reg) {\r
-        listeners.remove(reg.type, reg.instance);\r
-    }\r
-\r
-    protected def unregisterListener(GeneratedListenerRegistration reg) {\r
-        for (notifyType : reg.invoker.supportedNotifications) {\r
-            listeners.remove(notifyType, reg.invoker.invocationProxy)\r
-        }\r
-    }\r
-    \r
-    override close()  {\r
-        //FIXME: implement properly.\r
-    }\r
-    \r
-    override registerInterestListener(NotificationInterestListener interestListener) {\r
-        val registration = interestListeners.register(interestListener);\r
-        \r
-        for(notification : listeners.keySet) {\r
-            interestListener.onNotificationSubscribtion(notification);\r
-        }\r
-        return registration\r
-    }\r
-}\r
-\r
-class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {\r
-\r
-    @Property\r
-    val Class<T> type;\r
-\r
-    var NotificationBrokerImpl notificationBroker;\r
-\r
-    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {\r
-        super(instance);\r
-        _type = type;\r
-        notificationBroker = broker;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        notificationBroker.unregisterListener(this);\r
-        notificationBroker = null;\r
-    }\r
-}\r
-\r
-class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {\r
-\r
-    @Property\r
-    val NotificationInvoker invoker;\r
-    \r
-    var NotificationBrokerImpl notificationBroker;\r
-    \r
-\r
-    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {\r
-        super(instance);\r
-        _invoker = invoker;\r
-        notificationBroker = broker;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        notificationBroker.unregisterListener(this);\r
-        notificationBroker = null;\r
-        invoker.close();\r
-    }\r
-}\r
-\r
-@Data\r
-class NotifyTask implements Callable<Object> {\r
-\r
-    private static val log = LoggerFactory.getLogger(NotifyTask);\r
-\r
-    @SuppressWarnings("rawtypes")\r
-    val NotificationListener listener;\r
-    val Notification notification;\r
-\r
-    override call() {\r
-        //Only logging the complete notification in debug mode\r
-        try {\r
-            if(log.isDebugEnabled){\r
-                log.debug("Delivering notification {} to {}",notification,listener);\r
-            } else {\r
-                log.trace("Delivering notification {} to {}",notification.class.name,listener);\r
-            }\r
-            listener.onNotification(notification);\r
-            if(log.isDebugEnabled){\r
-                log.debug("Notification delivered {} to {}",notification,listener);\r
-            } else {\r
-                log.trace("Notification delivered {} to {}",notification.class.name,listener);\r
-            }\r
-        } catch (Exception e) {\r
-            log.error("Unhandled exception thrown by listener: {}", listener, e);\r
-        }\r
-        return null;\r
-    }\r
-\r
-}\r
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java
new file mode 100644 (file)
index 0000000..5f0de6b
--- /dev/null
@@ -0,0 +1,93 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+class NotifyTask implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(NotifyTask.class);
+
+    private final NotificationListener<?> listener;
+    private final Notification notification;
+
+    public NotifyTask(final NotificationListener<?> listener, final Notification notification) {
+        this.listener = Preconditions.checkNotNull(listener);
+        this.notification = Preconditions.checkNotNull(notification);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends Notification> NotificationListener<T> getListener() {
+        return (NotificationListener<T>)listener;
+    }
+
+    @Override
+    public void run() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Delivering notification {} to {}", notification, listener);
+        } else {
+            LOG.trace("Delivering notification {} to {}", notification.getClass().getName(), listener);
+        }
+
+        try {
+            getListener().onNotification(notification);
+        } catch (final Exception e) {
+            LOG.error("Unhandled exception thrown by listener: {}", listener, e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Notification delivered {} to {}", notification, listener);
+        } else {
+            LOG.trace("Notification delivered {} to {}", notification.getClass().getName(), listener);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((listener== null) ? 0 : listener.hashCode());
+        result = prime * result + ((notification== null) ? 0 : notification.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        NotifyTask other = (NotifyTask) obj;
+        if (listener == null) {
+            if (other.listener != null)
+                return false;
+        } else if (!listener.equals(other.listener))
+            return false;
+        if (notification == null) {
+            if (other.notification != null)
+                return false;
+        } else if (!notification.equals(other.notification))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("listener", listener)
+                .add("notification", notification.getClass())
+                .toString();
+    }
+}