import org.opendaylight.yangtools.yang.binding.BaseIdentity
import com.google.common.collect.Multimap
import com.google.common.collect.HashMultimap
-import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
+import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
+import java.util.concurrent.Executors
+import java.util.Collections
+import org.opendaylight.yangtools.yang.binding.DataObject
+import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator
-class BindingAwareBrokerImpl implements BindingAwareBroker {
+class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
+
+ private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
+
private val clsPool = ClassPool.getDefault()
private var RuntimeCodeGenerator generator;
+
/**
* Map of all Managed Direct Proxies
*/
private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
+ @Property
private var NotificationBrokerImpl notifyBroker
+
+ @Property
private var DataBrokerImpl dataBroker
- private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
-
+
@Property
var BundleContext brokerBundleContext
+
+ ServiceRegistration<NotificationProviderService> notifyProviderRegistration
+
+ ServiceRegistration<NotificationService> notifyConsumerRegistration
+
+ ServiceRegistration<DataProviderService> dataProviderRegistration
+
+ ServiceRegistration<DataBrokerService> dataConsumerRegistration
+
+ ConnectorActivator connectorActivator
+
+
+ public new(BundleContext bundleContext) {
+ _brokerBundleContext = bundleContext;
+ }
def start() {
+ log.info("Starting MD-SAL: Binding Aware Broker");
initGenerator();
+ val executor = Executors.newCachedThreadPool;
// Initialization of notificationBroker
- notifyBroker = new NotificationBrokerImpl(null);
+ log.info("Starting MD-SAL: Binding Aware Notification Broker");
+ notifyBroker = new NotificationBrokerImpl(executor);
+ notifyBroker.invokerFactory = generator.invokerFactory;
+
+ log.info("Starting MD-SAL: Binding Aware Data Broker");
dataBroker = new DataBrokerImpl();
+ dataBroker.executor = executor;
+
val brokerProperties = newProperties();
- notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
+
+
+ log.info("Starting MD-SAL: Binding Aware Data Broker");
+ notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
brokerProperties)
- brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
- brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
- brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
+ notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
+ dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
+ dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
+ connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext);
+ connectorActivator.start();
+ log.info("MD-SAL: Binding Aware Broker Started");
}
def initGenerator() {
def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
Hashtable<String, String> properties) {
val proxy = getManagedDirectProxy(type)
- checkState(proxy.delegate === null, "The Service for type {} is already registered", type)
+ checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
val osgiReg = context.bundleContext.registerService(type, service, properties);
proxy.delegate = service;
- return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
- }
-
- def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> type, T service,
- InstanceIdentifier<?> identifier, OsgiProviderContext context) {
- throw new UnsupportedOperationException("TODO: auto-generated method stub")
+ return new RpcServiceRegistrationImpl<T>(type, service, osgiReg,this);
}
def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
// Updating internal structure of registration
routingTable.updateRoute(path, registration.instance)
+ // Update routing table / send announce to message bus
+
val success = paths.put(context, path);
}
val routingTable = router.getRoutingTable(context)
val path = ctxMap.value
routingTable.removeRoute(path)
-
}
}
+
+ protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
+
+ val type = registration.serviceType;
+
+ val proxy = managedProxies.get(type);
+ if(proxy.proxy.delegate === registration.instance) {
+ proxy.proxy.delegate = null;
+ }
+ }
+
+ def createDelegate(Class<? extends RpcService> type) {
+ getManagedDirectProxy(type);
+ }
+
+ def getRpcRouters() {
+ return Collections.unmodifiableMap(rpcRouters);
+ }
+
+ override close() {
+ dataConsumerRegistration.unregister()
+ dataProviderRegistration.unregister()
+ notifyConsumerRegistration.unregister()
+ notifyProviderRegistration.unregister()
+ }
+
}
class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
unregisterPath(context, instance);
}
- override getService() {
- return instance;
- }
-
override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
checkClosed()
broker.registerPath(this, context, path);
checkClosed()
broker.unregisterPath(this, context, path);
}
+
+ override getServiceType() {
+ return router.serviceType;
+ }
private def checkClosed() {
if (closed)
}
}
+class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
+
+ val ServiceRegistration<T> osgiRegistration;
+ private var BindingAwareBrokerImpl broker;
+
+ @Property
+ val Class<T> serviceType;
+
+ public new(Class<T> type, T service, ServiceRegistration<T> osgiReg,BindingAwareBrokerImpl broker) {
+ super(service);
+ this._serviceType = type;
+ this.osgiRegistration = osgiReg;
+ this.broker= broker;
+ }
+
+ override protected removeRegistration() {
+ broker.unregisterRpcService(this);
+ broker = null;
+ }
+
+}