Merge "On openflow plugin restart, NPE in tx poller"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / RpcProviderRegistryImpl.java
1 package org.opendaylight.controller.sal.binding.impl;
2
3 import java.util.Map;
4 import java.util.Map.Entry;
5 import java.util.HashMap;
6 import java.util.Set;
7 import java.util.WeakHashMap;
8
9 import javax.swing.tree.ExpandVetoException;
10
11 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
12 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
13 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
14 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
15 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
16 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
17 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
18 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
19 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
20 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
21 import org.opendaylight.controller.sal.binding.spi.RpcContextIdentifier;
22 import org.opendaylight.controller.sal.binding.spi.RpcRouter;
23 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
24 import org.opendaylight.yangtools.concepts.Identifiable;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
27 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
28 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
29 import org.opendaylight.yangtools.yang.binding.RpcService;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import static com.google.common.base.Preconditions.*;
34
35 public class RpcProviderRegistryImpl implements //
36         RpcProviderRegistry, //
37         RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
38
39     private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
40
41     private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
42     private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
43     private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
44             .create();
45
46     private final static Logger LOG = LoggerFactory.getLogger(RpcProviderRegistryImpl.class);
47     
48     private final String name;
49
50     public String getName() {
51         return name;
52     }
53
54     public RpcProviderRegistryImpl(String name) {
55         super();
56         this.name = name;
57     }
58
59     @Override
60     public final <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type,
61             T implementation) throws IllegalStateException {
62         return getRpcRouter(type).addRoutedRpcImplementation(implementation);
63     }
64
65     @Override
66     public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
67             throws IllegalStateException {
68         @SuppressWarnings("unchecked")
69         RpcRouter<T> potentialRouter = (RpcRouter<T>) rpcRouters.get(type);
70         if (potentialRouter != null) {
71             checkState(potentialRouter.getDefaultService() == null,
72                     "Default service for routed RPC already registered.");
73             return potentialRouter.registerDefaultService(implementation);
74         }
75         T publicProxy = getRpcService(type);
76         RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
77         checkState(currentDelegate == null, "Rpc service is already registered");
78         LOG.debug("Registering {} as global implementation of {} in {}",implementation,type.getSimpleName(),this);
79         RuntimeCodeHelper.setDelegate(publicProxy, implementation);
80         return new RpcProxyRegistration<T>(type, implementation, this);
81     }
82
83     @SuppressWarnings("unchecked")
84     @Override
85     public final <T extends RpcService> T getRpcService(Class<T> type) {
86
87         @SuppressWarnings("unchecked")
88         T potentialProxy = (T) publicProxies.get(type);
89         if (potentialProxy != null) {
90             return potentialProxy;
91         }
92         synchronized(this) {
93             /**
94              * Potential proxy could be instantiated by other thread while we were
95              * waiting for the lock.
96              */
97             
98             potentialProxy = (T) publicProxies.get(type);
99             if (potentialProxy != null) {
100                 return (T) potentialProxy;
101             }
102             T proxy = rpcFactory.getDirectProxyFor(type);
103             LOG.debug("Created {} as public proxy for {} in {}",proxy,type.getSimpleName(),this);
104             publicProxies.put(type, proxy);
105             return proxy;
106         }
107     }
108
109     private <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
110         RpcRouter<?> potentialRouter = rpcRouters.get(type);
111         if (potentialRouter != null) {
112             return (RpcRouter<T>) potentialRouter;
113         }
114         synchronized(this) {
115             /**
116              * Potential Router could be instantiated by other thread while we were
117              * waiting for the lock.
118              */
119             potentialRouter = rpcRouters.get(type); 
120             if (potentialRouter != null) {
121                 return (RpcRouter<T>) potentialRouter;
122             }
123             RpcRouter<T> router = rpcFactory.getRouterFor(type,name);
124             router.registerRouteChangeListener(new RouteChangeForwarder(type));
125             LOG.debug("Registering router {} as global implementation of {} in {}",router,type.getSimpleName(),this);
126             RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
127             rpcRouters.put(type, router);
128             return router;
129         }
130     }
131
132     @Override
133     public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
134             L listener) {
135         return (ListenerRegistration<L>) routeChangeListeners.register(listener);
136     }
137
138     public RuntimeCodeGenerator getRpcFactory() {
139         return rpcFactory;
140     }
141
142     public void setRpcFactory(RuntimeCodeGenerator rpcFactory) {
143         this.rpcFactory = rpcFactory;
144     }
145
146     private class RouteChangeForwarder<T extends RpcService> implements
147             RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
148
149         private final Class<T> type;
150
151         public RouteChangeForwarder(Class<T> type) {
152             this.type = type;
153         }
154
155         @Override
156         public void onRouteChange(RouteChange<Class<? extends BaseIdentity>, InstanceIdentifier<?>> change) {
157             Map<RpcContextIdentifier, Set<InstanceIdentifier<?>>> announcements = new HashMap<>();
158             for (Entry<Class<? extends BaseIdentity>, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements()
159                     .entrySet()) {
160                 RpcContextIdentifier key = RpcContextIdentifier.contextFor(type, entry.getKey());
161                 announcements.put(key, entry.getValue());
162             }
163             Map<RpcContextIdentifier, Set<InstanceIdentifier<?>>> removals = new HashMap<>();
164             for (Entry<Class<? extends BaseIdentity>, Set<InstanceIdentifier<?>>> entry : change.getRemovals()
165                     .entrySet()) {
166                 RpcContextIdentifier key = RpcContextIdentifier.contextFor(type, entry.getKey());
167                 removals.put(key, entry.getValue());
168             }
169             RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> toPublish = RoutingUtils
170                     .<RpcContextIdentifier, InstanceIdentifier<?>> change(announcements, removals);
171             for (ListenerRegistration<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> listener : routeChangeListeners) {
172                 try {
173                     listener.getInstance().onRouteChange(toPublish);
174                 } catch (Exception e) {
175                     e.printStackTrace();
176                 }
177             }
178         }
179     }
180
181     public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
182             RpcRegistration<T> {
183
184         private final Class<T> serviceType;
185         private RpcProviderRegistryImpl registry;
186
187         public RpcProxyRegistration(Class<T> type, T service, RpcProviderRegistryImpl registry) {
188             super(service);
189             serviceType = type;
190         }
191
192         @Override
193         public Class<T> getServiceType() {
194             return serviceType;
195         }
196
197         @Override
198         protected void removeRegistration() {
199             if (registry != null) {
200                 T publicProxy = registry.getRpcService(serviceType);
201                 RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
202                 if (currentDelegate == getInstance()) {
203                     RuntimeCodeHelper.setDelegate(publicProxy, null);
204                 }
205                 registry = null;
206             }
207         }
208     }
209 }