Merge "BUG 2509 : Removing all journal entries from a Followers in-memory journal...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / DomToBindingRpcForwarder.java
1 package org.opendaylight.controller.sal.binding.impl.connect.dom;
2
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
5 import com.google.common.base.Function;
6 import com.google.common.collect.Collections2;
7 import com.google.common.collect.ImmutableSet;
8 import com.google.common.util.concurrent.Futures;
9 import com.google.common.util.concurrent.ListenableFuture;
10 import java.lang.ref.WeakReference;
11 import java.lang.reflect.InvocationHandler;
12 import java.lang.reflect.Method;
13 import java.lang.reflect.Proxy;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.WeakHashMap;
18 import java.util.concurrent.Callable;
19 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
20 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
21 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
22 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
25 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
26 import org.opendaylight.yangtools.concepts.ObjectRegistration;
27 import org.opendaylight.yangtools.util.ClassLoaderUtils;
28 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
29 import org.opendaylight.yangtools.yang.binding.BindingMapping;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.opendaylight.yangtools.yang.binding.RpcService;
33 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
34 import org.opendaylight.yangtools.yang.common.QName;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
43
44     private final Logger LOG = LoggerFactory.getLogger(DomToBindingRpcForwarder.class);
45
46     private final Set<QName> supportedRpcs;
47     private final WeakReference<Class<? extends RpcService>> rpcServiceType;
48     private Set<RoutedRpcRegistration> registrations;
49     private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
50     private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
51     private final RpcService proxy;
52     private ObjectRegistration<?> forwarderRegistration;
53     private boolean registrationInProgress = false;
54
55     private final RpcProvisionRegistry biRpcRegistry;
56     private final RpcProviderRegistry baRpcRegistry;
57     private final RpcProviderRegistryImpl baRpcRegistryImpl;
58
59     private final Function<InstanceIdentifier<?>, YangInstanceIdentifier> toDOMInstanceIdentifier;
60
61     private final static Method EQUALS_METHOD;
62
63     static {
64         try {
65             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
66         } catch (NoSuchMethodException | SecurityException e) {
67             throw new ExceptionInInitializerError(e);
68         }
69     }
70
71     public DomToBindingRpcForwarder(final Class<? extends RpcService> service, final BindingIndependentMappingService mappingService,
72         final RpcProvisionRegistry biRpcRegistry, final RpcProviderRegistry baRpcRegistry, final RpcProviderRegistryImpl registryImpl) {
73         this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
74         this.supportedRpcs = mappingService.getRpcQNamesFor(service);
75
76         this.toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, YangInstanceIdentifier>() {
77             @Override
78             public YangInstanceIdentifier apply(final InstanceIdentifier<?> input) {
79                 return mappingService.toDataDom(input);
80             }
81         };
82
83         this.biRpcRegistry = biRpcRegistry;
84         this.baRpcRegistry = baRpcRegistry;
85         this.baRpcRegistryImpl = registryImpl;
86
87         Class<?> cls = rpcServiceType.get();
88         ClassLoader clsLoader = cls.getClassLoader();
89         proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
90         createStrategies(mappingService);
91     }
92
93     /**
94      * Constructor for Routed RPC Forwarder.
95      *
96      * @param service
97      * @param context
98      * @param registryImpl
99      */
100     public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
101         final Class<? extends BaseIdentity> context, final BindingIndependentMappingService mappingService,
102         final RpcProvisionRegistry biRpcRegistry, final RpcProviderRegistry baRpcRegistry, final RpcProviderRegistryImpl registryImpl) {
103         this(service, mappingService, biRpcRegistry, baRpcRegistry,registryImpl);
104
105         final ImmutableSet.Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet.builder();
106         try {
107             for (QName rpc : supportedRpcs) {
108                 registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
109             }
110             createDefaultDomForwarder();
111         } catch (Exception e) {
112             LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
113         }
114         registrations = registrationsBuilder.build();
115     }
116
117
118
119     private void createStrategies(final BindingIndependentMappingService mappingService) {
120         try {
121             for (QName rpc : supportedRpcs) {
122                 RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get(), mappingService);
123                 strategiesByMethod.put(strategy.targetMethod, strategy);
124                 strategiesByQName.put(rpc, strategy);
125             }
126         } catch (Exception e) {
127             LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
128         }
129
130     }
131
132     /**
133      * Registers RPC Forwarder to DOM Broker,
134      * this means Binding Aware Broker has implementation of RPC
135      * which is registered to it.
136      *
137      * If RPC Forwarder was previously registered to DOM Broker
138      * or to Bidning Broker this method is noop to prevent
139      * creating forwarding loop.
140      *
141      */
142     public void registerToDOMBroker() {
143         if(!registrationInProgress && forwarderRegistration == null) {
144             registrationInProgress = true;
145             CompositeObjectRegistration.CompositeObjectRegistrationBuilder<DomToBindingRpcForwarder> builder = CompositeObjectRegistration.builderFor(this);
146             try {
147                 for (QName rpc : supportedRpcs) {
148                     builder.add(biRpcRegistry.addRpcImplementation(rpc, this));
149                 }
150             } catch (Exception e) {
151                 LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
152             }
153             this.forwarderRegistration = builder.toInstance();
154             registrationInProgress = false;
155         }
156     }
157
158
159     public void registerPaths(final Class<? extends BaseIdentity> context,
160         final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
161         QName ctx = BindingReflections.findQName(context);
162         for (YangInstanceIdentifier path : Collections2.transform(set, toDOMInstanceIdentifier)) {
163             for (RoutedRpcRegistration reg : registrations) {
164                 reg.registerPath(ctx, path);
165             }
166         }
167     }
168
169     @Override
170     public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
171         if (EQUALS_METHOD.equals(method)) {
172             return false;
173         }
174         RpcInvocationStrategy strategy = strategiesByMethod.get(method);
175         checkState(strategy != null);
176         checkArgument(args.length <= 2);
177         if (args.length == 1) {
178             checkArgument(args[0] instanceof DataObject);
179             return strategy.forwardToDomBroker((DataObject) args[0]);
180         }
181         return strategy.forwardToDomBroker(null);
182     }
183
184     public void removePaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
185         final Set<InstanceIdentifier<?>> set) {
186         QName ctx = BindingReflections.findQName(context);
187         for (YangInstanceIdentifier path : Collections2.transform(set, toDOMInstanceIdentifier)) {
188             for (RoutedRpcRegistration reg : registrations) {
189                 reg.unregisterPath(ctx, path);
190             }
191         }
192     }
193
194     @Override
195     public Set<QName> getSupportedRpcs() {
196         return supportedRpcs;
197     }
198
199     @SuppressWarnings({ "unchecked", "rawtypes" })
200     public void createDefaultDomForwarder() {
201         if (baRpcRegistryImpl != null) {
202             Class<?> cls = rpcServiceType.get();
203             ClassLoader clsLoader = cls.getClassLoader();
204             RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
205
206             RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
207             rpcRouter.registerDefaultService(proxy);
208         }
209     }
210
211     @Override
212     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
213         checkArgument(rpc != null);
214         checkArgument(domInput != null);
215
216         Class<? extends RpcService> rpcType = rpcServiceType.get();
217         checkState(rpcType != null);
218         RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
219         checkState(rpcService != null);
220         CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
221
222         try {
223             return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
224         } catch (Exception e) {
225             return Futures.immediateFailedFuture(e);
226         }
227     }
228
229     private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) {
230         return strategiesByQName.get(rpc);
231     }
232
233     private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
234         final Class<? extends RpcService> rpcType, final BindingIndependentMappingService mappingService) throws Exception {
235         return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
236             @Override
237             public RpcInvocationStrategy call() throws Exception {
238                 String methodName = BindingMapping.getMethodName(rpc);
239                 Method targetMethod = null;
240                 for (Method possibleMethod : rpcType.getMethods()) {
241                     if (possibleMethod.getName().equals(methodName)
242                         && BindingReflections.isRpcMethod(possibleMethod)) {
243                         targetMethod = possibleMethod;
244                         break;
245                     }
246                 }
247                 checkState(targetMethod != null, "Rpc method not found");
248                 return new RpcInvocationStrategy(rpc, targetMethod, mappingService, biRpcRegistry);
249             }
250
251         });
252     }
253
254     /**
255      * Registers RPC Forwarder to Binding Broker,
256      * this means DOM Broekr has implementation of RPC
257      * which is registered to it.
258      *
259      * If RPC Forwarder was previously registered to DOM Broker
260      * or to Bidning Broker this method is noop to prevent
261      * creating forwarding loop.
262      *
263      */
264     public void registerToBindingBroker() {
265         if(!registrationInProgress && forwarderRegistration == null) {
266             try {
267                 registrationInProgress = true;
268                 this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy);
269             } catch (Exception e) {
270                 LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e);
271             } finally {
272                 registrationInProgress = false;
273             }
274         }
275     }
276 }