Merge "Resolve Bug:707 - ConfigPusher should wait for netconf-impl to register JMX...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / RpcProviderRegistryImpl.java
index bc862886d7abc548efc680d8f029c89a6d98f8d6..542dfa7e7bca61a0fc4ff33e31239c3c3872e73b 100644 (file)
@@ -1,34 +1,41 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.binding.impl;
 
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.WeakHashMap;
-
-import javax.swing.tree.ExpandVetoException;
-
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.spi.RpcContextIdentifier;
-import org.opendaylight.controller.sal.binding.spi.RpcRouter;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.*;
+import java.util.EventListener;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.WeakHashMap;
+
+import static com.google.common.base.Preconditions.checkState;
 
 public class RpcProviderRegistryImpl implements //
         RpcProviderRegistry, //
@@ -36,10 +43,27 @@ public class RpcProviderRegistryImpl implements //
 
     private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
 
+    // publicProxies is a cache of proxy objects where each value in the map corresponds to a specific RpcService
     private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
     private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
     private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
             .create();
+    private final ListenerRegistry<RouterInstantiationListener> routerInstantiationListener = ListenerRegistry.create();
+
+    private final static Logger LOG = LoggerFactory.getLogger(RpcProviderRegistryImpl.class);
+
+    private final String name;
+
+    private final ListenerRegistry<GlobalRpcRegistrationListener> globalRpcListeners = ListenerRegistry.create();
+
+    public String getName() {
+        return name;
+    }
+
+    public RpcProviderRegistryImpl(String name) {
+        super();
+        this.name = name;
+    }
 
     @Override
     public final <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type,
@@ -50,6 +74,7 @@ public class RpcProviderRegistryImpl implements //
     @Override
     public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
             throws IllegalStateException {
+        @SuppressWarnings("unchecked")
         RpcRouter<T> potentialRouter = (RpcRouter<T>) rpcRouters.get(type);
         if (potentialRouter != null) {
             checkState(potentialRouter.getDefaultService() == null,
@@ -59,34 +84,99 @@ public class RpcProviderRegistryImpl implements //
         T publicProxy = getRpcService(type);
         RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
         checkState(currentDelegate == null, "Rpc service is already registered");
+        LOG.debug("Registering {} as global implementation of {} in {}", implementation, type.getSimpleName(), this);
         RuntimeCodeHelper.setDelegate(publicProxy, implementation);
+        notifyGlobalRpcAdded(type);
         return new RpcProxyRegistration<T>(type, implementation, this);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public final <T extends RpcService> T getRpcService(Class<T> type) {
 
-        RpcService potentialProxy = publicProxies.get(type);
+        T potentialProxy = (T) publicProxies.get(type);
         if (potentialProxy != null) {
-            return (T) potentialProxy;
+            return potentialProxy;
+        }
+        synchronized (this) {
+            /**
+             * Potential proxy could be instantiated by other thread while we
+             * were waiting for the lock.
+             */
+
+            potentialProxy = (T) publicProxies.get(type);
+            if (potentialProxy != null) {
+                return potentialProxy;
+            }
+            T proxy = rpcFactory.getDirectProxyFor(type);
+            LOG.debug("Created {} as public proxy for {} in {}", proxy, type.getSimpleName(), this);
+            publicProxies.put(type, proxy);
+            return proxy;
         }
-        T proxy = rpcFactory.getDirectProxyFor(type);
-        publicProxies.put(type, proxy);
-        return proxy;
     }
 
-    private <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
+    @SuppressWarnings("unchecked")
+    public <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
         RpcRouter<?> potentialRouter = rpcRouters.get(type);
         if (potentialRouter != null) {
             return (RpcRouter<T>) potentialRouter;
         }
-        RpcRouter<T> router = rpcFactory.getRouterFor(type);
-        router.registerRouteChangeListener(new RouteChangeForwarder(type));
-        RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
-        rpcRouters.put(type, router);
-        return router;
+        synchronized (this) {
+            /**
+             * Potential Router could be instantiated by other thread while we
+             * were waiting for the lock.
+             */
+            potentialRouter = rpcRouters.get(type);
+            if (potentialRouter != null) {
+                return (RpcRouter<T>) potentialRouter;
+            }
+            RpcRouter<T> router = rpcFactory.getRouterFor(type, name);
+            router.registerRouteChangeListener(new RouteChangeForwarder(type));
+            LOG.debug("Registering router {} as global implementation of {} in {}", router, type.getSimpleName(), this);
+            RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
+            rpcRouters.put(type, router);
+            notifyListenersRoutedCreated(router);
+            return router;
+        }
+    }
+
+    private void notifyGlobalRpcAdded(Class<? extends RpcService> type) {
+        for(ListenerRegistration<GlobalRpcRegistrationListener> listener : globalRpcListeners) {
+            try {
+                listener.getInstance().onGlobalRpcRegistered(type);
+            } catch (Exception e) {
+                LOG.error("Unhandled exception during invoking listener {}", e);
+            }
+        }
+
+    }
+
+    private void notifyListenersRoutedCreated(RpcRouter<?> router) {
+
+        for (ListenerRegistration<RouterInstantiationListener> listener : routerInstantiationListener) {
+            try {
+                listener.getInstance().onRpcRouterCreated(router);
+            } catch (Exception e) {
+                LOG.error("Unhandled exception during invoking listener {}", e);
+            }
+        }
+
     }
 
+    public ListenerRegistration<RouterInstantiationListener> registerRouterInstantiationListener(
+            RouterInstantiationListener listener) {
+        ListenerRegistration<RouterInstantiationListener> reg = routerInstantiationListener.register(listener);
+        try {
+            for (RpcRouter<?> router : rpcRouters.values()) {
+                listener.onRpcRouterCreated(router);
+            }
+        } catch (Exception e) {
+            LOG.error("Unhandled exception during invoking listener {}", e);
+        }
+        return reg;
+    }
+
+    @Override
     public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
             L listener) {
         return (ListenerRegistration<L>) routeChangeListeners.register(listener);
@@ -100,6 +190,20 @@ public class RpcProviderRegistryImpl implements //
         this.rpcFactory = rpcFactory;
     }
 
+    public interface RouterInstantiationListener extends EventListener {
+        void onRpcRouterCreated(RpcRouter<?> router);
+    }
+
+    public ListenerRegistration<GlobalRpcRegistrationListener> registerGlobalRpcRegistrationListener(GlobalRpcRegistrationListener listener) {
+        return globalRpcListeners.register(listener);
+    }
+
+    public interface GlobalRpcRegistrationListener extends EventListener {
+        void onGlobalRpcRegistered(Class<? extends RpcService> cls);
+        void onGlobalRpcUnregistered(Class<? extends RpcService> cls);
+
+    }
+
     private class RouteChangeForwarder<T extends RpcService> implements
             RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
 
@@ -143,7 +247,8 @@ public class RpcProviderRegistryImpl implements //
 
         public RpcProxyRegistration(Class<T> type, T service, RpcProviderRegistryImpl registry) {
             super(service);
-            serviceType = type;
+            this.serviceType = type;
+            this.registry =  registry;
         }
 
         @Override