- override <T extends RpcService> getRpcService(Class<T> service) {
- checkNotNull(service, "Service should not be null");
- return getManagedDirectProxy(service) as T;
- }
-
- private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
-
- val router = rpcRouters.get(type);
- if (router !== null) {
- return router as RpcRouter<T>;
- }
-
- // We created Router
- return withLock(routerGenerationLock) [ |
- val maybeRouter = rpcRouters.get(type);
- if (maybeRouter !== null) {
- return maybeRouter as RpcRouter<T>;
- }
-
- val newRouter = generator.getRouterFor(type);
- checkState(newRouter !== null);
- rpcRouters.put(type, newRouter);
- // We create / update Direct Proxy for router
- val proxy = getManagedDirectProxy(type);
- proxy.delegate = newRouter.invocationProxy
- return newRouter;
- ]
-
- }
-
- protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
- Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-
- val router = registration.router;
- val paths = registration.registeredPaths;
-
- val routingTable = router.getRoutingTable(context)
- checkState(routingTable != null);
-
- // Updating internal structure of registration
- routingTable.updateRoute(path, registration.instance)
-
- // Update routing table / send announce to message bus
- val success = paths.put(context, path);
- }
-
- protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
- Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-
- val router = registration.router;
- val paths = registration.registeredPaths;
-
- val routingTable = router.getRoutingTable(context)
- checkState(routingTable != null);
-
- // Updating internal structure of registration
- val target = routingTable.getRoute(path)
- checkState(target === registration.instance)
- routingTable.removeRoute(path)
- checkState(paths.remove(context, path));
- }
-
- protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
-
- val router = registration.router;
- val paths = registration.registeredPaths;
-
- for (ctxMap : registration.registeredPaths.entries) {
- val context = ctxMap.key
- val routingTable = router.getRoutingTable(context)
- val path = ctxMap.value
- routingTable.removeRoute(path)
- }
- }
-
- protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
-
- val type = registration.serviceType;
-
- val proxy = managedProxies.get(type);
- if (proxy.proxy.delegate === registration.instance) {
- proxy.proxy.delegate = null;
- }
- }
-
- def createDelegate(Class<? extends RpcService> type) {
- getManagedDirectProxy(type);
- }
-
- def getRpcRouters() {
- return Collections.unmodifiableMap(rpcRouters);
- }
-
- override close() {
- dataConsumerRegistration.unregister()
- dataProviderRegistration.unregister()
- notifyConsumerRegistration.unregister()
- notifyProviderRegistration.unregister()
- }
-
-}
-
-class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
-
- @Property
- private val BindingAwareBrokerImpl broker;
-
- @Property
- private val RpcRouter<T> router;
-
- @Property
- private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
-
- private var closed = false;
-
- new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
- super(instance)
- _router = backingRouter;
- _broker = broker;
- }
-
- override protected removeRegistration() {
- closed = true
- broker.unregisterRoutedRpcService(this)
- }
-
- override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
- registerPath(context, instance);
- }
-
- override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
- unregisterPath(context, instance);
- }
-
- override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
- checkClosed()
- broker.registerPath(this, context, path);
- }
-
- override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
- checkClosed()
- broker.unregisterPath(this, context, path);
- }
-
- override getServiceType() {
- return router.serviceType;
- }
-
- private def checkClosed() {
- if (closed)
- throw new IllegalStateException("Registration was closed.");
- }
-
-}
-
-class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
-
- private var BindingAwareBrokerImpl broker;
-
- @Property
- val Class<T> serviceType;
-
- public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
- super(service);
- this._serviceType = type;
- this.broker = broker;
- }
-
- override protected removeRegistration() {
- broker.unregisterRpcService(this);
- broker = null;