Merge "BUG 2509 : Removing all journal entries from a Followers in-memory journal...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / RpcProviderRegistryImpl.java
index e98d5b9942c86afb9b76e006662fe02cdf6147c2..0949d3d7612dfb34a1f8890329653af7efebc35e 100644 (file)
@@ -8,14 +8,21 @@
 package org.opendaylight.controller.sal.binding.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.util.EventListener;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.WeakHashMap;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
@@ -25,26 +32,37 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistr
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
+import org.opendaylight.controller.sal.binding.codegen.RpcIsNotRoutedException;
 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcProviderRegistryImpl implements //
-        RpcProviderRegistry, //
-        RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
 
     private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
 
-    private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
-    private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
+    // cache of proxy objects where each value in the map corresponds to a specific RpcService
+    private final LoadingCache<Class<? extends RpcService>, RpcService> publicProxies = CacheBuilder.newBuilder().weakKeys().
+            build(new CacheLoader<Class<? extends RpcService>, RpcService>() {
+                @Override
+                public RpcService load(final Class<? extends RpcService> type) {
+                    final RpcService proxy = rpcFactory.getDirectProxyFor(type);
+                    LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+                    return proxy;
+                }
+            });
+
+    private final Cache<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = CacheBuilder.newBuilder().weakKeys()
+            .build();
+
     private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
             .create();
     private final ListenerRegistry<RouterInstantiationListener> routerInstantiationListener = ListenerRegistry.create();
@@ -59,26 +77,33 @@ public class RpcProviderRegistryImpl implements //
         return name;
     }
 
-    public RpcProviderRegistryImpl(String name) {
+    public RpcProviderRegistryImpl(final String name) {
         super();
         this.name = name;
     }
 
     @Override
-    public final <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type,
-            T implementation) throws IllegalStateException {
+    public final <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(final Class<T> type,
+            final T implementation) throws IllegalStateException {
         return getRpcRouter(type).addRoutedRpcImplementation(implementation);
     }
 
     @Override
-    public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
-            throws IllegalStateException {
-        @SuppressWarnings("unchecked")
-        RpcRouter<T> potentialRouter = (RpcRouter<T>) rpcRouters.get(type);
-        if (potentialRouter != null) {
+    public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T implementation) {
+
+        // FIXME: This should be well documented - addRpcImplementation for
+        // routed RPCs
+        try {
+            // Note: If RPC is really global, expected count of registrations
+            // of this method is really low.
+            RpcRouter<T> potentialRouter = getRpcRouter(type);
             checkState(potentialRouter.getDefaultService() == null,
-                    "Default service for routed RPC already registered.");
+                        "Default service for routed RPC already registered.");
             return potentialRouter.registerDefaultService(implementation);
+        } catch (RpcIsNotRoutedException e) {
+            // NOOP - we could safely continue, since RPC is not routed
+            // so we fallback to global routing.
+            LOG.debug("RPC is not routed. Using global registration.",e);
         }
         T publicProxy = getRpcService(type);
         RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
@@ -91,55 +116,45 @@ public class RpcProviderRegistryImpl implements //
 
     @SuppressWarnings("unchecked")
     @Override
-    public final <T extends RpcService> T getRpcService(Class<T> type) {
-
-        T potentialProxy = (T) publicProxies.get(type);
-        if (potentialProxy != null) {
-            return potentialProxy;
-        }
-        synchronized (this) {
-            /**
-             * Potential proxy could be instantiated by other thread while we
-             * were waiting for the lock.
-             */
-
-            potentialProxy = (T) publicProxies.get(type);
-            if (potentialProxy != null) {
-                return potentialProxy;
-            }
-            T proxy = rpcFactory.getDirectProxyFor(type);
-            LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
-            publicProxies.put(type, proxy);
-            return proxy;
-        }
+    public final <T extends RpcService> T getRpcService(final Class<T> type) {
+        return (T) publicProxies.getUnchecked(type);
     }
 
-    @SuppressWarnings("unchecked")
-    public <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
-        RpcRouter<?> potentialRouter = rpcRouters.get(type);
-        if (potentialRouter != null) {
-            return (RpcRouter<T>) potentialRouter;
-        }
-        synchronized (this) {
-            /**
-             * Potential Router could be instantiated by other thread while we
-             * were waiting for the lock.
-             */
-            potentialRouter = rpcRouters.get(type);
-            if (potentialRouter != null) {
-                return (RpcRouter<T>) potentialRouter;
+
+    public <T extends RpcService> RpcRouter<T> getRpcRouter(final Class<T> type) {
+        try {
+            final AtomicBoolean created = new AtomicBoolean(false);
+            @SuppressWarnings( "unchecked")
+            // LoadingCache is unsuitable for RpcRouter since we need to distinguish
+            // first creation of RPC Router, so that is why
+            // we are using normal cache with load API and shared AtomicBoolean
+            // for this call, which will be set to true if router was created.
+            RpcRouter<T> router = (RpcRouter<T>) rpcRouters.get(type,new Callable<RpcRouter<?>>() {
+
+                @Override
+                public org.opendaylight.controller.sal.binding.api.rpc.RpcRouter<?> call()  {
+                    RpcRouter<?> router = rpcFactory.getRouterFor(type, name);
+                    router.registerRouteChangeListener(new RouteChangeForwarder<T>(type));
+                    LOG.debug("Registering router {} as global implementation of {} in {}", router, type.getSimpleName(), this);
+                    RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
+                    created.set(true);
+                    return router;
+                }
+            });
+            if(created.get()) {
+                notifyListenersRoutedCreated(router);
             }
-            RpcRouter<T> router = rpcFactory.getRouterFor(type, name);
-            router.registerRouteChangeListener(new RouteChangeForwarder(type));
-            LOG.debug("Registering router {} as global implementation of {} in {}", router, type.getSimpleName(), this);
-            RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
-            rpcRouters.put(type, router);
-            notifyListenersRoutedCreated(router);
             return router;
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            // We rethrow Runtime Exceptions which were wrapped by
+            // Execution Exceptions
+            // otherwise we throw IllegalStateException with original
+            Throwables.propagateIfPossible(e.getCause());
+            throw new IllegalStateException("Could not load RPC Router for "+type.getName(),e);
         }
     }
 
-    private void notifyGlobalRpcAdded(Class<? extends RpcService> type) {
+    private void notifyGlobalRpcAdded(final Class<? extends RpcService> type) {
         for(ListenerRegistration<GlobalRpcRegistrationListener> listener : globalRpcListeners) {
             try {
                 listener.getInstance().onGlobalRpcRegistered(type);
@@ -150,7 +165,7 @@ public class RpcProviderRegistryImpl implements //
 
     }
 
-    private void notifyListenersRoutedCreated(RpcRouter<?> router) {
+    private void notifyListenersRoutedCreated(final RpcRouter<?> router) {
 
         for (ListenerRegistration<RouterInstantiationListener> listener : routerInstantiationListener) {
             try {
@@ -163,10 +178,10 @@ public class RpcProviderRegistryImpl implements //
     }
 
     public ListenerRegistration<RouterInstantiationListener> registerRouterInstantiationListener(
-            RouterInstantiationListener listener) {
+            final RouterInstantiationListener listener) {
         ListenerRegistration<RouterInstantiationListener> reg = routerInstantiationListener.register(listener);
         try {
-            for (RpcRouter<?> router : rpcRouters.values()) {
+            for (RpcRouter<?> router : rpcRouters.asMap().values()) {
                 listener.onRpcRouterCreated(router);
             }
         } catch (Exception e) {
@@ -175,9 +190,10 @@ public class RpcProviderRegistryImpl implements //
         return reg;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
-            L listener) {
+            final L listener) {
         return (ListenerRegistration<L>) routeChangeListeners.register(listener);
     }
 
@@ -185,7 +201,7 @@ public class RpcProviderRegistryImpl implements //
         return rpcFactory;
     }
 
-    public void setRpcFactory(RuntimeCodeGenerator rpcFactory) {
+    public void setRpcFactory(final RuntimeCodeGenerator rpcFactory) {
         this.rpcFactory = rpcFactory;
     }
 
@@ -193,7 +209,7 @@ public class RpcProviderRegistryImpl implements //
         void onRpcRouterCreated(RpcRouter<?> router);
     }
 
-    public ListenerRegistration<GlobalRpcRegistrationListener> registerGlobalRpcRegistrationListener(GlobalRpcRegistrationListener listener) {
+    public ListenerRegistration<GlobalRpcRegistrationListener> registerGlobalRpcRegistrationListener(final GlobalRpcRegistrationListener listener) {
         return globalRpcListeners.register(listener);
     }
 
@@ -203,17 +219,15 @@ public class RpcProviderRegistryImpl implements //
 
     }
 
-    private class RouteChangeForwarder<T extends RpcService> implements
-            RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
-
+    private final class RouteChangeForwarder<T extends RpcService> implements RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
         private final Class<T> type;
 
-        public RouteChangeForwarder(Class<T> type) {
+        RouteChangeForwarder(final Class<T> type) {
             this.type = type;
         }
 
         @Override
-        public void onRouteChange(RouteChange<Class<? extends BaseIdentity>, InstanceIdentifier<?>> change) {
+        public void onRouteChange(final RouteChange<Class<? extends BaseIdentity>, InstanceIdentifier<?>> change) {
             Map<RpcContextIdentifier, Set<InstanceIdentifier<?>>> announcements = new HashMap<>();
             for (Entry<Class<? extends BaseIdentity>, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements()
                     .entrySet()) {
@@ -232,21 +246,20 @@ public class RpcProviderRegistryImpl implements //
                 try {
                     listener.getInstance().onRouteChange(toPublish);
                 } catch (Exception e) {
-                    e.printStackTrace();
+                    LOG.error("Unhandled exception during invoking listener",listener.getInstance(),e);
                 }
             }
         }
     }
 
-    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
-            RpcRegistration<T> {
-
+    private static final class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
+        private final RpcProviderRegistryImpl registry;
         private final Class<T> serviceType;
-        private RpcProviderRegistryImpl registry;
 
-        public RpcProxyRegistration(Class<T> type, T service, RpcProviderRegistryImpl registry) {
+        RpcProxyRegistration(final Class<T> type, final T service, final RpcProviderRegistryImpl registry) {
             super(service);
-            serviceType = type;
+            this.registry =  Preconditions.checkNotNull(registry);
+            this.serviceType = type;
         }
 
         @Override
@@ -256,13 +269,10 @@ public class RpcProviderRegistryImpl implements //
 
         @Override
         protected void removeRegistration() {
-            if (registry != null) {
-                T publicProxy = registry.getRpcService(serviceType);
-                RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
-                if (currentDelegate == getInstance()) {
-                    RuntimeCodeHelper.setDelegate(publicProxy, null);
-                }
-                registry = null;
+            T publicProxy = registry.getRpcService(serviceType);
+            RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
+            if (currentDelegate == getInstance()) {
+                RuntimeCodeHelper.setDelegate(publicProxy, null);
             }
         }
     }