import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.osgi.framework.BundleContext;
import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
public class BrokerImpl implements Broker {
private static val log = LoggerFactory.getLogger(BrokerImpl);
private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
+
+ private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
// RPC Context
private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
new HashMap<QName, RpcImplementation>());
// Implementation specific
@Property
- private var ExecutorService executor;
+ private var ExecutorService executor = Executors.newFixedThreadPool(5);
@Property
private var BundleContext bundleContext;
throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
}
- //TODO: Add notification for availability of Rpc Implementation
+
rpcImpls.put(rpcType, implementation);
+
+
+ for(listener : rpcRegistrationListeners.listeners) {
+ try {
+ listener.instance.onRpcImplementationAdded(rpcType);
+ } catch (Exception e){
+ log.error("Unhandled exception during invoking listener",e);
+ }
+ }
}
protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
if(implToRemove == rpcImpls.get(rpcType)) {
rpcImpls.remove(rpcType);
}
+
+ for(listener : rpcRegistrationListeners.listeners) {
+ try {
+ listener.instance.onRpcImplementationRemoved(rpcType);
+ } catch (Exception e){
+ log.error("Unhandled exception during invoking listener",e);
+ }
+ }
}
protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
sessions.remove(consumerContextImpl);
providerSessions.remove(consumerContextImpl);
}
+
+ protected def getSupportedRpcs() {
+ rpcImpls.keySet;
+ }
+
+ def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
+ rpcRegistrationListeners.register(listener);
+ }
}