Refactored implementation of Notification Invoker 29/2029/3
authorTony Tkacik <ttkacik@cisco.com>
Sun, 20 Oct 2013 16:57:33 +0000 (18:57 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Sun, 20 Oct 2013 20:59:37 +0000 (20:59 +0000)
sal-binding-broker:
  Refactored implementation of listener invoker to be avoid reflection
  when invoking notification callbacks to avoid reflection.

sal-binding-it:
  Added integration test which tests basic notification scenarios
  such as publishing and receiving notifications using generated
  interfaces.

Change-Id: I7afcf25e6f29608d76143058f2eb6aec12a7ccd2
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
15 files changed:
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/RuntimeCodeGenerator.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/RuntimeCodeHelper.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/RuntimeCodeSpecification.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RuntimeCodeGenerator.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/NotificationInvokerFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/RuntimeCodeGeneratorTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/BarListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/BarUpdate.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/CompositeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/FlowDelete.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/FooListener.java
opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NoficationTest.java [new file with mode: 0644]

index c8d6bcf3b1b06d48d5b9bfddb68fb5b0ac267131..f94be9c6a62998a08430c0d79b775cb7d7b7a14f 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.sal.binding.codegen;
 
 import org.opendaylight.controller.sal.binding.spi.DelegateProxy;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
 import org.opendaylight.controller.sal.binding.spi.RpcRouter;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
@@ -55,28 +56,30 @@ public interface RuntimeCodeGenerator {
      * 
      * <p>
      * Returned instance:
-     * <ul><li>Implements:
-     *     <ul><li>{@link DelegateProxy}
-     *         <li>{@link RpcRouter}
-     *     </ul>
-     *     <li>
-     *     routes all invocations of methods, which are defined in RpcService
-     *     subtype based on method arguments and routing information defined in the
-     *     RpcRoutingTables for this instance
-     *     {@link RpcRouter#getRoutingTable(Class)}.
-     *     <ul>
-     *     <li>
-     *     Implementation uses
-     *     {@link RpcRouter#getService(Class, InstanceIdentifier)} method to
-     *     retrieve particular instance to which call will be routed.
-     *    <li>
-     *    Instance of {@link InstanceIdentifier} is determined by first argument of
-     *    method and is retrieved via method which is annotated with
-     *    {@link RoutingContext}. Class representing Routing Context Identifier is
-     *    retrieved by {@link RoutingContext}.
-     *    <li>If first argument is not defined / {@link RoutingContext} annotation
-     *    is not present on any field invocation will be delegated to default
-     *    service {@link RpcRouter#getDefaultService()}.
+     * <ul>
+     * <li>Implements:
+     * <ul>
+     * <li>{@link DelegateProxy}
+     * <li>{@link RpcRouter}
+     * </ul>
+     * <li>
+     * routes all invocations of methods, which are defined in RpcService
+     * subtype based on method arguments and routing information defined in the
+     * RpcRoutingTables for this instance
+     * {@link RpcRouter#getRoutingTable(Class)}.
+     * <ul>
+     * <li>
+     * Implementation uses
+     * {@link RpcRouter#getService(Class, InstanceIdentifier)} method to
+     * retrieve particular instance to which call will be routed.
+     * <li>
+     * Instance of {@link InstanceIdentifier} is determined by first argument of
+     * method and is retrieved via method which is annotated with
+     * {@link RoutingContext}. Class representing Routing Context Identifier is
+     * retrieved by {@link RoutingContext}.
+     * <li>If first argument is not defined / {@link RoutingContext} annotation
+     * is not present on any field invocation will be delegated to default
+     * service {@link RpcRouter#getDefaultService()}.
      * </ul>
      * 
      * @param serviceType
@@ -85,4 +88,6 @@ public interface RuntimeCodeGenerator {
      *         also {@link RpcRouter}<T> and {@link DelegateProxy}
      */
     <T extends RpcService> RpcRouter<T> getRouterFor(Class<T> serviceType) throws IllegalArgumentException;
+
+    NotificationInvokerFactory getInvokerFactory();
 }
index 2e73977804c79f649d502535cc9decae1b30fbd3..fbd87d17beede977e4efa2b987f077352522481c 100644 (file)
@@ -44,6 +44,23 @@ class RuntimeCodeHelper {
         } else
             throw new IllegalArgumentException("delegate class is not assignable to proxy");
     }
+    
+        /**
+     * Helper method to set delegate to ManagedDirectedProxy with use of reflection.
+     * 
+     * Note: This method uses reflection, but setting delegate field should not occur too much
+     * to introduce any significant performance hits.
+     * 
+     */
+    public static def void setDelegate(Object proxy, Object delegate) {
+        val field = proxy.class.getField(DELEGATE_FIELD)
+        if (field == null) throw new UnsupportedOperationException("Unable to set delegate to proxy");
+        if (field.type.isAssignableFrom(delegate.class)) {
+            field.set(proxy, delegate)
+        } else
+            throw new IllegalArgumentException("delegate class is not assignable to proxy");
+    }
+    
 
     public static def Map<InstanceIdentifier<?>, ? extends RpcService> getRoutingTable(RpcService target,
         Class<? extends BaseIdentity> tableClass) {
index c6e76c2907730df0a26d369089fa96f345b7cb31..c5648adaa53af7cb6b25e4584eb49b5861aab126 100644 (file)
@@ -17,11 +17,11 @@ import org.opendaylight.yangtools.yang.binding.NotificationListener
  */
 class RuntimeCodeSpecification {
 
-    public static val PACKAGE_PREFIX = "_gen.";
+    //public static val PACKAGE_PREFIX = "_gen.";
 
     public static val DIRECT_PROXY_SUFFIX = "DirectProxy";
     public static val ROUTER_SUFFIX = "Router";
-    public static val INVOKER_SUFFIX = "Invoker";
+    public static val INVOKER_SUFFIX = "ListenerInvoker";
 
     public static val DELEGATE_FIELD = "_delegate"
     public static val ROUTING_TABLE_FIELD_PREFIX = "_routes_"
@@ -52,7 +52,7 @@ class RuntimeCodeSpecification {
      * 
      */
     public static def getGeneratedName(Class<?> cls, String suffix) {
-        '''«PACKAGE_PREFIX»«cls.package.name».«cls.simpleName»$«suffix»'''.toString()
+        '''«cls.name»$$Broker$«suffix»'''.toString()
     }
 
     /**
index b255504a00e46e57eda13ae18d7aa403b2f5da47..f63f2a313ef58375e95d165f4b5823b504cda3d4 100644 (file)
@@ -12,41 +12,39 @@ import org.opendaylight.yangtools.yang.binding.DataObject
 import static org.opendaylight.controller.sal.binding.codegen.impl.XtendHelper.*
 
 class RpcRouterCodegenInstance<T extends RpcService> implements RpcRouter<T> {
-    
+
     @Property
     val T invocationProxy
-    
+
     @Property
     val Class<T> rpcServiceType
-    
+
     @Property
     val Set<Class<? extends BaseIdentity>> contexts
-    
-    val routingTables = new HashMap<Class<? extends BaseIdentity>,RpcRoutingTableImpl<? extends BaseIdentity,?>>;
-    
-    
-    
+
+    val routingTables = new HashMap<Class<? extends BaseIdentity>, RpcRoutingTableImpl<? extends BaseIdentity, ?>>;
+
     @Property
     var T defaultService
-    
-    new(Class<T> type,T routerImpl,Set<Class<? extends BaseIdentity>> contexts) {
+
+    new(Class<T> type, T routerImpl, Set<Class<? extends BaseIdentity>> contexts) {
         _rpcServiceType = type
         _invocationProxy = routerImpl
         _contexts = contexts
-        
-        for(ctx : contexts) {
+
+        for (ctx : contexts) {
             val table = XtendHelper.createRoutingTable(ctx)
-            invocationProxy.setRoutingTable(ctx,table.routes);
-            routingTables.put(ctx,table);
+            invocationProxy.setRoutingTable(ctx, table.routes);
+            routingTables.put(ctx, table);
         }
     }
-    
+
     override <C extends BaseIdentity> getRoutingTable(Class<C> table) {
         routingTables.get(table) as RpcRoutingTable<C,T>
     }
-    
+
     override getService(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
         val table = getRoutingTable(context);
         return table.getRoute(path);
     }
-}
\ No newline at end of file
+}
index 90be6f3476841bf2f20e53f9cc66438148722d60..0aee95ea4175a72c9ebbd4b14827b827547ae335 100644 (file)
@@ -35,13 +35,25 @@ import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeC
 import java.util.HashSet
 import java.io.ObjectOutputStream.PutField
 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
+import javax.xml.ws.spi.Invoker
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
+import java.util.Set
+import java.util.Collections
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper
+import java.util.WeakHashMap
+import javassist.ClassClassPath
 
-class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator {
+class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator, NotificationInvokerFactory {
 
+    val CtClass BROKER_NOTIFICATION_LISTENER;
     val ClassPool classPool;
+    val Map<Class<? extends NotificationListener>, RuntimeGeneratedInvokerPrototype> invokerClasses;
 
     public new(ClassPool pool) {
         classPool = pool;
+        invokerClasses = new WeakHashMap();
+        BROKER_NOTIFICATION_LISTENER = org.opendaylight.controller.sal.binding.api.NotificationListener.asCtClass;
     }
 
     override <T extends RpcService> getDirectProxyFor(Class<T> iface) {
@@ -80,7 +92,8 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
                         val routingPair = routingContextInput;
                         val bodyTmp = '''
                         {
-                            final «InstanceIdentifier.name» identifier = $1.«routingPair.getter.name»()«IF routingPair.encapsulated».getValue()«ENDIF»;
+                            final «InstanceIdentifier.name» identifier = $1.«routingPair.getter.name»()«IF routingPair.
+                            encapsulated».getValue()«ENDIF»;
                             «supertype.name» instance = («supertype.name») «routingPair.context.routingTableField».get(identifier);
                             if(instance == null) {
                                instance = «DELEGATE_FIELD»;
@@ -101,24 +114,31 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
         return new RpcRouterCodegenInstance(iface, instance, contexts);
     }
 
-    def Class<?> generateListenerInvoker(Class<? extends NotificationListener> iface) {
-        val targetCls = createClass(iface.invokerName) [
+    protected def generateListenerInvoker(Class<? extends NotificationListener> iface) {
+        val callbacks = iface.methods.filter[notificationCallback]
+
+        val supportedNotification = callbacks.map[parameterTypes.get(0) as Class<? extends Notification>].toSet;
+
+        val targetCls = createClass(iface.invokerName,BROKER_NOTIFICATION_LISTENER ) [
             field(DELEGATE_FIELD, iface)
-            it.method(Void, "invoke", Notification) [
-                val callbacks = iface.methods.filter[notificationCallback]
+            implementMethodsFrom(BROKER_NOTIFICATION_LISTENER) [
                 body = '''
                     {
                         «FOR callback : callbacks SEPARATOR " else "»
-                            if($1 instanceof «val cls = callback.parameterTypes.get(0).name») {
+                        «val cls = callback.parameterTypes.get(0).name»
+                            if($1 instanceof «cls») {
                                 «DELEGATE_FIELD».«callback.name»((«cls») $1);
-                                return;
+                                return null;
                             }
                         «ENDFOR»
+                        return null;
                     }
                 '''
             ]
         ]
-        return targetCls.toClass(iface.classLoader);
+        val finalClass = targetCls.toClass(iface.classLoader,iface.protectionDomain)
+        return new RuntimeGeneratedInvokerPrototype(supportedNotification,
+            finalClass as Class<? extends org.opendaylight.controller.sal.binding.api.NotificationListener>);
     }
 
     def void method(CtClass it, Class<?> returnType, String name, Class<?> parameter, MethodGenerator function1) {
@@ -138,8 +158,8 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
                 for (annotation : method.availableAnnotations) {
                     if (annotation instanceof RoutingContext) {
                         val encapsulated = !method.returnType.equals(InstanceIdentifier.asCtClass);
-                        
-                        return new RoutingPair((annotation as RoutingContext).value, method,encapsulated);
+
+                        return new RoutingPair((annotation as RoutingContext).value, method, encapsulated);
                     }
                 }
             }
@@ -195,7 +215,73 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
             return pool.get(cls.name)
         } catch (NotFoundException e) {
             pool.appendClassPath(new LoaderClassPath(cls.classLoader));
-            return pool.get(cls.name)
+            try {
+                return pool.get(cls.name)
+
+            } catch (NotFoundException ef) {
+                pool.appendClassPath(new ClassClassPath(cls));
+                return pool.get(cls.name)
+            }
+        }
+    }
+
+    override getInvokerFactory() {
+        return this;
+    }
+
+    override invokerFor(NotificationListener instance) {
+        val cls = instance.class
+        val prototype = resolveInvokerClass(cls);
+        
+        return new RuntimeGeneratedInvoker(instance,prototype)
+    }
+
+    def resolveInvokerClass(Class<? extends NotificationListener> class1) {
+        val invoker = invokerClasses.get(class1);
+        if (invoker !== null) {
+            return invoker;
         }
+        val newInvoker = generateListenerInvoker(class1);
+        invokerClasses.put(class1, newInvoker);
+        return newInvoker
     }
 }
+
+@Data
+class RuntimeGeneratedInvoker implements NotificationInvoker {
+    
+    @Property
+    val NotificationListener delegate;
+
+    
+    @Property
+    var org.opendaylight.controller.sal.binding.api.NotificationListener invocationProxy;
+
+    @Property
+    var RuntimeGeneratedInvokerPrototype prototype;
+
+    new(NotificationListener delegate,RuntimeGeneratedInvokerPrototype prototype) {
+        _delegate = delegate;
+        _prototype = prototype;
+        _invocationProxy = prototype.protoClass.newInstance;
+        RuntimeCodeHelper.setDelegate(_invocationProxy, delegate);
+    }
+
+    override getSupportedNotifications() {
+        prototype.supportedNotifications;
+    }
+
+    override close() {
+        
+    }
+}
+
+@Data
+class RuntimeGeneratedInvokerPrototype {
+
+    @Property
+    val Set<Class<? extends Notification>> supportedNotifications;
+
+    @Property
+    val Class<? extends org.opendaylight.controller.sal.binding.api.NotificationListener> protoClass;
+}
index 88e3e62f392163e192fab5e21faf74f611a0480e..740ae887b0588bcc7648aaba49bb7f63eef5b53a 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.yangtools.yang.binding.BaseIdentity
 import com.google.common.collect.Multimap
 import com.google.common.collect.HashMultimap
 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
+import java.util.concurrent.Executors
 
 class BindingAwareBrokerImpl implements BindingAwareBroker {
     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
@@ -72,8 +73,10 @@ class BindingAwareBrokerImpl implements BindingAwareBroker {
     def start() {
         initGenerator();
 
+        val executor = Executors.newCachedThreadPool;
         // Initialization of notificationBroker
-        notifyBroker = new NotificationBrokerImpl(null);
+        notifyBroker = new NotificationBrokerImpl(executor);
+        notifyBroker.invokerFactory = generator.invokerFactory;
         dataBroker = new DataBrokerImpl();
         val brokerProperties = newProperties();
         notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
index 377d12ecac10340a2e5f64dc4773ae3ed0205e97..6e493057b240e2f7bcce4c2ebad267afbf9fb4c6 100644 (file)
@@ -15,22 +15,40 @@ import com.google.common.collect.HashMultimap
 import java.util.concurrent.ExecutorService
 import java.util.Collection
 import org.opendaylight.yangtools.concepts.Registration
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import java.util.Collections
+import org.slf4j.LoggerFactory
+import java.util.concurrent.Callable
 
 class NotificationBrokerImpl implements NotificationProviderService {
 
     val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
-    val ExecutorService executor;
+
+    @Property
+    var ExecutorService executor;
+
+    @Property
+    var RuntimeCodeGenerator generator;
+
+    @Property
+    var NotificationInvokerFactory invokerFactory;
 
     new(ExecutorService executor) {
         listeners = HashMultimap.create()
         this.executor = executor;
     }
 
+    @Deprecated
     override <T extends Notification> addNotificationListener(Class<T> notificationType,
         NotificationListener<T> listener) {
         listeners.put(notificationType, listener)
     }
 
+    @Deprecated
     override <T extends Notification> removeNotificationListener(Class<T> notificationType,
         NotificationListener<T> listener) {
         listeners.remove(notificationType, listener)
@@ -45,68 +63,125 @@ class NotificationBrokerImpl implements NotificationProviderService {
     }
 
     @SuppressWarnings("unchecked")
-    def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
+    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
         listeners.forEach[(it as NotificationListener).onNotification(notification)]
     }
 
+    @Deprecated
     override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        throw new UnsupportedOperationException("TODO: auto-generated method stub")
+        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
 
     }
 
+    @Deprecated
     override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        throw new UnsupportedOperationException("TODO: auto-generated method stub")
+        throw new UnsupportedOperationException(
+            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
     }
 
+    @Deprecated
     override notify(Notification notification, ExecutorService service) {
-        publish(notification)
+        publish(notification, service)
     }
 
     override publish(Notification notification) {
-        notification.notificationTypes.forEach [
-            listeners.get(it as Class<? extends Notification>)?.notifyAll(notification)
-        ]
+        publish(notification, executor)
     }
 
     override publish(Notification notification, ExecutorService service) {
-        publish(notification)
+        val allTypes = notification.notificationTypes
+
+        var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
+        for (type : allTypes) {
+            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
+        }
+        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
+        executor.invokeAll(tasks);
     }
 
     override <T extends Notification> registerNotificationListener(Class<T> notificationType,
         NotificationListener<T> listener) {
-        val reg = new GenericNotificationRegistration<T>(notificationType,listener,this);
-        listeners.put(notificationType,listener);
+        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
+        listeners.put(notificationType, listener);
         return reg;
     }
 
     override registerNotificationListener(
         org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-            
+        val invoker = invokerFactory.invokerFor(listener);
+        for (notifyType : invoker.supportedNotifications) {
+            listeners.put(notifyType, invoker.invocationProxy)
+        }
+        val registration = new GeneratedListenerRegistration(listener, invoker,this);
+        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
     }
-    
-    
+
     protected def unregisterListener(GenericNotificationRegistration<?> reg) {
-        listeners.remove(reg.type,reg.instance);
+        listeners.remove(reg.type, reg.instance);
+    }
+
+    protected def unregisterListener(GeneratedListenerRegistration reg) {
+        for (notifyType : reg.invoker.supportedNotifications) {
+            listeners.remove(notifyType, reg.invoker.invocationProxy)
+        }
     }
 }
-class GenericNotificationRegistration<T extends Notification> implements Registration<NotificationListener<T>> {
-    
-    @Property
-    var NotificationListener<T> instance;
-    
+
+class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
+
     @Property
     val Class<T> type;
-    
-    
-    val NotificationBrokerImpl notificationBroker;
-    
-    public new(Class<T> type, NotificationListener<T> instance,NotificationBrokerImpl broker) {
-        _instance = instance;
+
+    var NotificationBrokerImpl notificationBroker;
+
+    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
+        super(instance);
         _type = type;
         notificationBroker = broker;
     }
+
+    override protected removeRegistration() {
+        notificationBroker.unregisterListener(this);
+        notificationBroker = null;
+    }
+}
+
+class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
+
+    @Property
+    val NotificationInvoker invoker;
     
-    override close() {
+    var NotificationBrokerImpl notificationBroker;
+    
+
+    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
+        super(instance);
+        _invoker = invoker;
+        notificationBroker = broker;
+    }
+
+    override protected removeRegistration() {
         notificationBroker.unregisterListener(this);
+        notificationBroker = null;
+        invoker.close();
+    }
+}
+
+@Data
+class NotifyTask implements Callable<Object> {
+
+    private static val log = LoggerFactory.getLogger(NotifyTask);
+
+    val NotificationListener listener;
+    val Notification notification;
+
+    override call() {
+        try {
+            listener.onNotification(notification);
+        } catch (Exception e) {
+            log.error("Unhandled exception {} thrown by listener: {} Notification: {}", e, listener, notification);
+        }
+        return null;
     }
+
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/NotificationInvokerFactory.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/NotificationInvokerFactory.java
new file mode 100644 (file)
index 0000000..a68d1e1
--- /dev/null
@@ -0,0 +1,23 @@
+package org.opendaylight.controller.sal.binding.spi;
+
+import java.util.Set;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+public interface NotificationInvokerFactory {
+
+    NotificationInvoker invokerFor(org.opendaylight.yangtools.yang.binding.NotificationListener instance);
+
+    public interface NotificationInvoker {
+
+        Set<Class<? extends Notification>> getSupportedNotifications();
+
+        NotificationListener<Notification> getInvocationProxy();
+
+        public abstract void close();
+
+        org.opendaylight.yangtools.yang.binding.NotificationListener getDelegate();
+
+    }
+}
index 95ac22c6b37c31dbd7bbb65fafd35f5e78e3b5d6..65bc58314f66c6c77b1fdaea902a8c6543ec0588 100644 (file)
@@ -1,8 +1,11 @@
 package org.opendaylight.controller.sal.binding.test;
+
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javassist.ClassPool;
@@ -12,15 +15,25 @@ import org.junit.Test;
 
 import static org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*;
 
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
 import org.opendaylight.controller.sal.binding.spi.RpcRouter;
 import org.opendaylight.controller.sal.binding.spi.RpcRoutingTable;
+import org.opendaylight.controller.sal.binding.test.mock.BarListener;
+import org.opendaylight.controller.sal.binding.test.mock.BarUpdate;
+import org.opendaylight.controller.sal.binding.test.mock.CompositeListener;
+import org.opendaylight.controller.sal.binding.test.mock.FlowDelete;
+import org.opendaylight.controller.sal.binding.test.mock.FooListener;
 import org.opendaylight.controller.sal.binding.test.mock.FooService;
+import org.opendaylight.controller.sal.binding.test.mock.FooUpdate;
 import org.opendaylight.controller.sal.binding.test.mock.ReferencableObject;
 import org.opendaylight.controller.sal.binding.test.mock.ReferencableObjectKey;
 import org.opendaylight.controller.sal.binding.test.mock.SimpleInput;
 import org.opendaylight.yangtools.yang.binding.Augmentation;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.IdentifiableItem;
@@ -28,17 +41,17 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
 
 import static org.mockito.Mockito.*;
 
-
 public class RuntimeCodeGeneratorTest {
 
     private RuntimeCodeGenerator codeGenerator;
+    private NotificationInvokerFactory invokerFactory;
 
-    
     @Before
     public void initialize() {
         this.codeGenerator = new RuntimeCodeGenerator(ClassPool.getDefault());
+        this.invokerFactory = codeGenerator.getInvokerFactory();
     }
-    
+
     @Test
     public void testGenerateDirectProxy() {
         FooService product = codeGenerator.getDirectProxyFor(FooService.class);
@@ -50,60 +63,96 @@ public class RuntimeCodeGeneratorTest {
         RpcRouter<FooService> product = codeGenerator.getRouterFor(FooService.class);
         assertNotNull(product);
         assertNotNull(product.getInvocationProxy());
-        
-        assertEquals("2 fields should be generated.",2,product.getInvocationProxy().getClass().getFields().length);
-        
+
+        assertEquals("2 fields should be generated.", 2, product.getInvocationProxy().getClass().getFields().length);
+
         verifyRouting(product);
     }
 
-    private void verifyRouting(RpcRouter<FooService> product) {
-        assertNotNull("Routing table should be initialized",product.getRoutingTable(BaseIdentity.class));
+    @Test
+    public void testInvoker() throws Exception {
+
+        FooListenerImpl fooListener = new FooListenerImpl();
+
+        NotificationInvoker invokerFoo = invokerFactory.invokerFor(fooListener);
+
         
+        assertSame(fooListener,invokerFoo.getDelegate());
+        assertNotNull(invokerFoo.getSupportedNotifications());
+        assertEquals(1, invokerFoo.getSupportedNotifications().size());
+        assertNotNull(invokerFoo.getInvocationProxy());
+
+        FooUpdateImpl fooOne = new FooUpdateImpl();
+        invokerFoo.getInvocationProxy().onNotification(fooOne);
+
+        assertEquals(1, fooListener.receivedFoos.size());
+        assertSame(fooOne, fooListener.receivedFoos.get(0));
+
+        CompositeListenerImpl composite = new CompositeListenerImpl();
+
+        NotificationInvoker invokerComposite = invokerFactory.invokerFor(composite);
+
+        assertNotNull(invokerComposite.getSupportedNotifications());
+        assertEquals(3, invokerComposite.getSupportedNotifications().size());
+        assertNotNull(invokerComposite.getInvocationProxy());
+
+        invokerComposite.getInvocationProxy().onNotification(fooOne);
+
+        assertEquals(1, composite.receivedFoos.size());
+        assertSame(fooOne, composite.receivedFoos.get(0));
+
+        assertEquals(0, composite.receivedBars.size());
+
+        BarUpdateImpl barOne = new BarUpdateImpl();
+
+        invokerComposite.getInvocationProxy().onNotification(barOne);
+
+        assertEquals(1, composite.receivedFoos.size());
+        assertEquals(1, composite.receivedBars.size());
+        assertSame(barOne, composite.receivedBars.get(0));
+
+    }
+
+    private void verifyRouting(RpcRouter<FooService> product) {
+        assertNotNull("Routing table should be initialized", product.getRoutingTable(BaseIdentity.class));
+
         RpcRoutingTable<BaseIdentity, FooService> routingTable = product.getRoutingTable(BaseIdentity.class);
-        
+
         int servicesCount = 2;
         int instancesPerService = 3;
-        
-        InstanceIdentifier<?>[][] identifiers = identifiers(servicesCount,instancesPerService);
-        FooService service[] = new FooService[] {
-                mock(FooService.class, "Instance 0"),
-                mock(FooService.class,"Instance 1")
-        };
-        
-        for(int i = 0;i<service.length;i++) {
+
+        InstanceIdentifier<?>[][] identifiers = identifiers(servicesCount, instancesPerService);
+        FooService service[] = new FooService[] { mock(FooService.class, "Instance 0"),
+                mock(FooService.class, "Instance 1") };
+
+        for (int i = 0; i < service.length; i++) {
             for (InstanceIdentifier<?> instance : identifiers[i]) {
                 routingTable.updateRoute(instance, service[i]);
             }
         }
-        
-        assertEquals("All instances should be registered.", servicesCount*instancesPerService, routingTable.getRoutes().size());
-        
-        SimpleInput[] instance_0_input = new SimpleInputImpl[] {
-            new SimpleInputImpl(identifiers[0][0]),
-            new SimpleInputImpl(identifiers[0][1]),
-            new SimpleInputImpl(identifiers[0][2])
-        };
-        
-        SimpleInput[] instance_1_input = new SimpleInputImpl[] {
-                new SimpleInputImpl(identifiers[1][0]),
-                new SimpleInputImpl(identifiers[1][1]),
-                new SimpleInputImpl(identifiers[1][2])
-        };
-        
+
+        assertEquals("All instances should be registered.", servicesCount * instancesPerService, routingTable
+                .getRoutes().size());
+
+        SimpleInput[] instance_0_input = new SimpleInputImpl[] { new SimpleInputImpl(identifiers[0][0]),
+                new SimpleInputImpl(identifiers[0][1]), new SimpleInputImpl(identifiers[0][2]) };
+
+        SimpleInput[] instance_1_input = new SimpleInputImpl[] { new SimpleInputImpl(identifiers[1][0]),
+                new SimpleInputImpl(identifiers[1][1]), new SimpleInputImpl(identifiers[1][2]) };
+
         // We test sending mock messages
-        
+
         product.getInvocationProxy().simple(instance_0_input[0]);
         verify(service[0]).simple(instance_0_input[0]);
-        
+
         product.getInvocationProxy().simple(instance_0_input[1]);
         product.getInvocationProxy().simple(instance_0_input[2]);
-        
+
         verify(service[0]).simple(instance_0_input[1]);
         verify(service[0]).simple(instance_0_input[2]);
-        
-        
+
         product.getInvocationProxy().simple(instance_1_input[0]);
-        
+
         // We should have call to instance 1
         verify(service[1]).simple(instance_1_input[0]);
     }
@@ -111,23 +160,25 @@ public class RuntimeCodeGeneratorTest {
     private InstanceIdentifier<?>[][] identifiers(int serviceSize, int instancesPerService) {
         InstanceIdentifier<?>[][] ret = new InstanceIdentifier[serviceSize][];
         int service = 0;
-        for (int i = 0;i<serviceSize;i++) {
-            
+        for (int i = 0; i < serviceSize; i++) {
+
             InstanceIdentifier<?>[] instanceIdentifiers = new InstanceIdentifier[instancesPerService];
             ret[i] = instanceIdentifiers;
-            for(int id = 0;id<instancesPerService;id++) {
-                instanceIdentifiers[id] = referencableIdentifier(service*instancesPerService+id);
+            for (int id = 0; id < instancesPerService; id++) {
+                instanceIdentifiers[id] = referencableIdentifier(service * instancesPerService + id);
             }
             service++;
         }
-        
+
         return ret;
     }
 
     private InstanceIdentifier<?> referencableIdentifier(int i) {
         ReferencableObjectKey key = new ReferencableObjectKey(i);
-        IdentifiableItem<ReferencableObject,ReferencableObjectKey> pathArg = new IdentifiableItem<>(ReferencableObject.class,key);
-        return new InstanceIdentifier<ReferencableObject>(Arrays.<PathArgument>asList(pathArg), ReferencableObject.class);
+        IdentifiableItem<ReferencableObject, ReferencableObjectKey> pathArg = new IdentifiableItem<>(
+                ReferencableObject.class, key);
+        return new InstanceIdentifier<ReferencableObject>(Arrays.<PathArgument> asList(pathArg),
+                ReferencableObject.class);
     }
 
     private static class SimpleInputImpl implements SimpleInput {
@@ -152,4 +203,51 @@ public class RuntimeCodeGeneratorTest {
             return SimpleInput.class;
         }
     }
+
+    private static class FooUpdateImpl implements FooUpdate {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return FooUpdate.class;
+        }
+    }
+
+    private static class BarUpdateImpl implements BarUpdate {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return BarUpdate.class;
+        }
+
+        @Override
+        public InstanceIdentifier<?> getInheritedIdentifier() {
+            return null;
+        }
+    }
+
+    private static class FooListenerImpl implements FooListener {
+
+        List<FooUpdate> receivedFoos = new ArrayList<>();
+
+        @Override
+        public void onFooUpdate(FooUpdate notification) {
+            receivedFoos.add(notification);
+        }
+
+    }
+
+    private static class CompositeListenerImpl extends FooListenerImpl implements BarListener {
+
+        List<BarUpdate> receivedBars = new ArrayList<>();
+        List<FlowDelete> receivedDeletes = new ArrayList<>();
+
+        @Override
+        public void onBarUpdate(BarUpdate notification) {
+            receivedBars.add(notification);
+        }
+
+        @Override
+        public void onFlowDelete(FlowDelete notification) {
+            receivedDeletes.add(notification);
+        }
+
+    }
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/BarListener.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/BarListener.java
new file mode 100644 (file)
index 0000000..bc251b1
--- /dev/null
@@ -0,0 +1,11 @@
+package org.opendaylight.controller.sal.binding.test.mock;
+
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+
+public interface BarListener extends NotificationListener {
+
+    void onBarUpdate(BarUpdate notification);
+    
+    void onFlowDelete(FlowDelete notification);
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/CompositeListener.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/CompositeListener.java
new file mode 100644 (file)
index 0000000..a1c2655
--- /dev/null
@@ -0,0 +1,5 @@
+package org.opendaylight.controller.sal.binding.test.mock;
+
+public interface CompositeListener extends FooListener,BarListener {
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/FlowDelete.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/mock/FlowDelete.java
new file mode 100644 (file)
index 0000000..3d170ba
--- /dev/null
@@ -0,0 +1,7 @@
+package org.opendaylight.controller.sal.binding.test.mock;
+
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+public interface FlowDelete extends Notification{
+
+}
index 3629689cdf33cc83af1deccb36e15341b3c5cc4a..4835537c14404c2fea1c84a85bc18b0013bf6a9a 100644 (file)
@@ -5,6 +5,5 @@ import org.opendaylight.yangtools.yang.binding.NotificationListener;
 public interface FooListener extends NotificationListener {
 
     void onFooUpdate(FooUpdate notification);
-    void onBarUpdate(BarUpdate notification);
     
 }
diff --git a/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NoficationTest.java b/opendaylight/md-sal/sal-binding-it/src/test/java/org/opendaylight/controller/test/sal/binding/it/NoficationTest.java
new file mode 100644 (file)
index 0000000..037055c
--- /dev/null
@@ -0,0 +1,176 @@
+package org.opendaylight.controller.test.sal.binding.it;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ConsumerContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.NotificationService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.NotificationListener;
+
+public class NoficationTest extends AbstractTest {
+
+    private FlowListener listener1 = new FlowListener();
+    private FlowListener listener2 = new FlowListener();
+
+    private Registration<NotificationListener> listener1Reg;
+    private Registration<NotificationListener> listener2Reg;
+
+    private NotificationProviderService notifyService;
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @Test
+    public void notificationTest() throws Exception {
+        /**
+         * 
+         * We register Provider 1 which retrieves Notification Service from MD-SAL
+         * 
+         */
+        AbstractTestProvider provider = new AbstractTestProvider() {
+            @Override
+            public void onSessionInitiated(ProviderContext session) {
+                notifyService = session.getSALService(NotificationProviderService.class);
+            }
+        };
+        broker.registerProvider(provider, getBundleContext());
+
+        /**
+         * 
+         * We register Consumer 1 which retrieves Notification Service from MD-SAL
+         * and registers SalFlowListener as notification listener
+         * 
+         */
+        BindingAwareConsumer consumer1 = new BindingAwareConsumer() {
+            @Override
+            public void onSessionInitialized(ConsumerContext session) {
+                NotificationService notificationService = session.getSALService(NotificationService.class);
+                assertNotNull(notificationService);
+                listener1Reg = notificationService.registerNotificationListener(listener1);
+            }
+        };
+
+        broker.registerConsumer(consumer1, getBundleContext());
+
+        assertNotNull(listener1Reg);
+
+        /**
+         * We wait 100ms for to make sure broker threads delivered notifications
+         */
+        notifyService.publish(flowAdded(0));
+        Thread.sleep(100);
+        
+        /** 
+         * We verify one notification was delivered
+         * 
+         */
+        assertEquals(1, listener1.addedFlows.size());
+        assertEquals(0, listener1.addedFlows.get(0).getCookie().intValue());
+
+        
+        /**
+         * We also register second consumerm and it's SalFlowListener
+         */
+        BindingAwareConsumer consumer2 = new BindingAwareConsumer() {
+            @Override
+            public void onSessionInitialized(ConsumerContext session) {
+                listener2Reg = session.getSALService(NotificationProviderService.class).registerNotificationListener(
+                        listener2);
+            }
+        };
+
+        broker.registerConsumer(consumer2, getBundleContext());
+
+        /**
+         * We publish 3 notifications
+         */
+        notifyService.publish(flowAdded(5));
+        notifyService.publish(flowAdded(10));
+        notifyService.publish(flowAdded(2));
+
+        /**
+         * We wait 100ms for to make sure broker threads delivered notifications
+         */
+        Thread.sleep(100);
+        
+        /** 
+         * We verify 3 notification was delivered to both listeners
+         * (first one received 4 total, second 3 in total).
+         * 
+         */
+
+        assertEquals(4, listener1.addedFlows.size());
+        assertEquals(3, listener2.addedFlows.size());
+
+        /**
+         * We close / unregister second listener
+         * 
+         */
+        listener2Reg.close();
+  
+        /**
+         * 
+         * We punblish 5th notification
+         */
+        notifyService.publish(flowAdded(10));
+        
+        /**
+         * We wait 100ms for to make sure broker threads delivered notifications
+         */
+        Thread.sleep(100);
+        
+        /**
+         * We verify that first consumer received 5 notifications in total,
+         * second consumer only three. Last notification was never received,
+         * because it already unregistered listener.
+         * 
+         */
+        assertEquals(5, listener1.addedFlows.size());
+        assertEquals(3, listener2.addedFlows.size());
+
+    }
+
+    public static FlowAdded flowAdded(int i) {
+        FlowAddedBuilder ret = new FlowAddedBuilder();
+        ret.setCookie(BigInteger.valueOf(i));
+        return ret.build();
+    }
+
+    private static class FlowListener implements SalFlowListener {
+
+        List<FlowAdded> addedFlows = new ArrayList<>();
+        List<FlowRemoved> removedFlows = new ArrayList<>();
+        List<FlowUpdated> updatedFlows = new ArrayList<>();
+
+        @Override
+        public void onFlowAdded(FlowAdded notification) {
+            addedFlows.add(notification);
+        }
+
+        @Override
+        public void onFlowRemoved(FlowRemoved notification) {
+            removedFlows.add(notification);
+        };
+
+        @Override
+        public void onFlowUpdated(FlowUpdated notification) {
+            updatedFlows.add(notification);
+        }
+
+    }
+}