Added listener for rpc registrations.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.xtend
index 6b57cdc4db64c72990359c8a9dc9191f967debdd..83dda5902dee4e6fd819debabcaecd12ca0413e8 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.BrokerService;
@@ -26,6 +27,9 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.osgi.framework.BundleContext;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
 
 public class BrokerImpl implements Broker {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
@@ -38,13 +42,15 @@ public class BrokerImpl implements Broker {
     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
 
+
+    private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
     // RPC Context
     private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
         new HashMap<QName, RpcImplementation>());
 
     // Implementation specific
     @Property
-    private var ExecutorService executor;
+    private var ExecutorService executor = Executors.newFixedThreadPool(5);
     @Property
     private var BundleContext bundleContext;
 
@@ -95,14 +101,31 @@ public class BrokerImpl implements Broker {
             throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
         }
 
-        //TODO: Add notification for availability of Rpc Implementation
+        
         rpcImpls.put(rpcType, implementation);
+
+        
+        for(listener : rpcRegistrationListeners.listeners)  {
+            try {
+                listener.instance.onRpcImplementationAdded(rpcType);
+            } catch (Exception e){
+                log.error("Unhandled exception during invoking listener",e);
+            }
+        }
     }
 
     protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
         if(implToRemove == rpcImpls.get(rpcType)) {
             rpcImpls.remove(rpcType);
         }
+        
+        for(listener : rpcRegistrationListeners.listeners)  {
+            try {
+                listener.instance.onRpcImplementationRemoved(rpcType);
+            } catch (Exception e){
+                log.error("Unhandled exception during invoking listener",e);
+            }
+        }
     }
 
     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
@@ -148,4 +171,12 @@ public class BrokerImpl implements Broker {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
+    
+    protected def getSupportedRpcs() {
+        rpcImpls.keySet;
+    }
+    
+    def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
+        rpcRegistrationListeners.register(listener);
+    }
 }