+ return new RpcServiceRegistrationImpl<T>(type, service, this);
+ }
+
+ override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
+ checkNotNull(type, "Service type should not be null")
+ checkNotNull(service, "Service type should not be null")
+
+ val router = resolveRpcRouter(type);
+ checkState(router !== null)
+ return new RoutedRpcRegistrationImpl<T>(service, router, this)
+ }
+
+ override <T extends RpcService> getRpcService(Class<T> service) {
+ checkNotNull(service, "Service should not be null");
+ return getManagedDirectProxy(service) as T;
+ }
+
+ private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
+
+ val router = rpcRouters.get(type);
+ if (router !== null) {
+ return router as RpcRouter<T>;
+ }
+
+ // We created Router
+ return withLock(routerGenerationLock) [ |
+ val maybeRouter = rpcRouters.get(type);
+ if (maybeRouter !== null) {
+ return maybeRouter as RpcRouter<T>;
+ }
+
+ val newRouter = generator.getRouterFor(type);
+ checkState(newRouter !== null);
+ rpcRouters.put(type, newRouter);
+ // We create / update Direct Proxy for router
+ val proxy = getManagedDirectProxy(type);
+ proxy.delegate = newRouter.invocationProxy
+ return newRouter;
+ ]
+
+ }
+
+ protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
+ Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ val routingTable = router.getRoutingTable(context)
+ checkState(routingTable != null);
+
+ // Updating internal structure of registration
+ routingTable.updateRoute(path, registration.instance)
+
+ // Update routing table / send announce to message bus
+ val success = paths.put(context, path);
+ }
+
+ protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
+ Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ val routingTable = router.getRoutingTable(context)
+ checkState(routingTable != null);
+
+ // Updating internal structure of registration
+ val target = routingTable.getRoute(path)
+ checkState(target === registration.instance)
+ routingTable.removeRoute(path)
+ checkState(paths.remove(context, path));
+ }
+
+ protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
+
+ val router = registration.router;
+ val paths = registration.registeredPaths;
+
+ for (ctxMap : registration.registeredPaths.entries) {
+ val context = ctxMap.key
+ val routingTable = router.getRoutingTable(context)
+ val path = ctxMap.value
+ routingTable.removeRoute(path)
+ }
+ }
+
+ protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
+
+ val type = registration.serviceType;
+
+ val proxy = managedProxies.get(type);
+ if (proxy.proxy.delegate === registration.instance) {
+ proxy.proxy.delegate = null;
+ }
+ }
+
+ def createDelegate(Class<? extends RpcService> type) {
+ getManagedDirectProxy(type);
+ }
+
+ def getRpcRouters() {
+ return Collections.unmodifiableMap(rpcRouters);
+ }
+
+ override close() {
+ dataConsumerRegistration.unregister()
+ dataProviderRegistration.unregister()
+ notifyConsumerRegistration.unregister()
+ notifyProviderRegistration.unregister()
+ }
+
+}
+
+class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
+
+ @Property
+ private val BindingAwareBrokerImpl broker;
+
+ @Property
+ private val RpcRouter<T> router;
+
+ @Property
+ private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
+
+ private var closed = false;
+
+ new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
+ super(instance)
+ _router = backingRouter;
+ _broker = broker;
+ }
+
+ override protected removeRegistration() {
+ closed = true
+ broker.unregisterRoutedRpcService(this)
+ }
+
+ override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+ registerPath(context, instance);
+ }
+
+ override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
+ unregisterPath(context, instance);
+ }
+
+ override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
+ checkClosed()
+ broker.registerPath(this, context, path);