Fix RPC forwarding related bugs in Binding Independent Connector
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Set;
23 import java.util.WeakHashMap;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29
30 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
31 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
32 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
33 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
36 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
37 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
40 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
41 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
42 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
43 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
44 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
45 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
46 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
47 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
48 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
49 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
50 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
51 import org.opendaylight.controller.sal.common.util.Rpcs;
52 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
53 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
54 import org.opendaylight.controller.sal.core.api.Provider;
55 import org.opendaylight.controller.sal.core.api.RpcImplementation;
56 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
57 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
58 import org.opendaylight.yangtools.concepts.ListenerRegistration;
59 import org.opendaylight.yangtools.concepts.Registration;
60 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
61 import org.opendaylight.yangtools.yang.binding.Augmentable;
62 import org.opendaylight.yangtools.yang.binding.Augmentation;
63 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
64 import org.opendaylight.yangtools.yang.binding.BindingMapping;
65 import org.opendaylight.yangtools.yang.binding.DataContainer;
66 import org.opendaylight.yangtools.yang.binding.DataObject;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.binding.RpcService;
69 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.common.RpcError;
72 import org.opendaylight.yangtools.yang.common.RpcResult;
73 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
74 import org.opendaylight.yangtools.yang.data.api.Node;
75 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 import com.google.common.base.Function;
80 import com.google.common.base.Optional;
81 import com.google.common.collect.FluentIterable;
82 import com.google.common.collect.ImmutableList;
83 import com.google.common.collect.ImmutableSet;
84 import com.google.common.collect.ImmutableSet.Builder;
85 import com.google.common.util.concurrent.Futures;
86
87 public class BindingIndependentConnector implements //
88         RuntimeDataProvider, //
89         Provider, //
90         AutoCloseable {
91
92
93
94     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
95
96     @SuppressWarnings( "deprecation")
97     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
98
99     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
100             .builder().toInstance();
101
102     private final static Method EQUALS_METHOD;
103
104
105     private BindingIndependentMappingService mappingService;
106
107     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
108
109     private DataProviderService baDataService;
110
111     private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
112     private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
113
114     private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
115     private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
116
117     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
118
119     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
120
121     private RpcProvisionRegistry biRpcRegistry;
122     private RpcProviderRegistry baRpcRegistry;
123
124     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
125     // private ListenerRegistration<BindingToDomRpcForwardingManager>
126     // bindingToDomRpcManager;
127
128     private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
129
130         @Override
131         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
132             return mappingService.toDataDom(input);
133         }
134
135     };
136
137     private Registration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> baDataReaderRegistration;
138
139     private boolean rpcForwarding = false;
140
141     private boolean dataForwarding = false;
142
143     private boolean notificationForwarding = false;
144
145     private RpcProviderRegistryImpl baRpcRegistryImpl;
146
147     private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
148
149
150     static {
151         try {
152         EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
153         } catch (Exception e) {
154             throw new RuntimeException(e);
155         }
156     }
157
158     @Override
159     public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
160         try {
161             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
162             CompositeNode result = biDataService.readOperationalData(biPath);
163             return potentialAugmentationRead(path, biPath, result);
164         } catch (DeserializationException e) {
165             throw new IllegalStateException(e);
166         }
167     }
168
169     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
170             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, CompositeNode result)
171             throws DeserializationException {
172         Class<? extends DataObject> targetType = path.getTargetType();
173         if (Augmentation.class.isAssignableFrom(targetType)) {
174             path = mappingService.fromDataDom(biPath);
175             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
176             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
177             if (parentTo instanceof Augmentable<?>) {
178                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
179             }
180         }
181         return mappingService.dataObjectFromDataDom(path, result);
182     }
183
184     @Override
185     public DataObject readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
186         try {
187             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
188             CompositeNode result = biDataService.readConfigurationData(biPath);
189             return potentialAugmentationRead(path, biPath, result);
190         } catch (DeserializationException e) {
191             throw new IllegalStateException(e);
192         }
193     }
194
195     private DataModificationTransaction createBindingToDomTransaction(
196             DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
197         DataModificationTransaction target = biDataService.beginTransaction();
198         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
199         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
200                 .entrySet()) {
201             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
202                     .toDataDom(entry);
203             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
204             LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
205         }
206         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
207                 .entrySet()) {
208             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
209                     .toDataDom(entry);
210             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
211             LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
212         }
213         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
214             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
215             target.removeConfigurationData(biEntry);
216             LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
217         }
218         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
219             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
220             target.removeOperationalData(biEntry);
221             LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
222         }
223         return target;
224     }
225
226     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
227             DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
228         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
229                 .beginTransaction();
230         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
231                 .getUpdatedConfigurationData().entrySet()) {
232             try {
233                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
234                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
235                 target.putConfigurationData(baKey, baData);
236             } catch (DeserializationException e) {
237                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
238             }
239         }
240         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
241                 .getUpdatedOperationalData().entrySet()) {
242             try {
243
244                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
245                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
246                 target.putOperationalData(baKey, baData);
247             } catch (DeserializationException e) {
248                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
249             }
250         }
251         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
252             try {
253
254                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
255                 target.removeConfigurationData(baEntry);
256             } catch (DeserializationException e) {
257                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
258             }
259         }
260         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
261             try {
262
263                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
264                 target.removeOperationalData(baEntry);
265             } catch (DeserializationException e) {
266                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
267             }
268         }
269         return target;
270     }
271
272     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
273         return biDataService;
274     }
275
276     protected void setDomDataService(org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
277         this.biDataService = biDataService;
278     }
279
280     public DataProviderService getBaDataService() {
281         return baDataService;
282     }
283
284     protected void setBindingDataService(DataProviderService baDataService) {
285         this.baDataService = baDataService;
286     }
287
288     public RpcProviderRegistry getRpcRegistry() {
289         return baRpcRegistry;
290     }
291
292     protected void setBindingRpcRegistry(RpcProviderRegistry rpcRegistry) {
293         this.baRpcRegistry = rpcRegistry;
294     }
295
296     public void startDataForwarding() {
297         checkState(!dataForwarding, "Connector is already forwarding data.");
298         baDataReaderRegistration = baDataService.registerDataReader(ROOT, this);
299         baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
300         biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
301         baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
302         dataForwarding = true;
303     }
304
305     public void startRpcForwarding() {
306         if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
307             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
308             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
309             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
310                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
311                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
312                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
313             }
314             if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
315                 biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
316             }
317             rpcForwarding = true;
318         }
319     }
320
321     public void startNotificationForwarding() {
322         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
323         notificationForwarding = true;
324     }
325
326     protected void setMappingService(BindingIndependentMappingService mappingService) {
327         this.mappingService = mappingService;
328     }
329
330     @Override
331     public Collection<ProviderFunctionality> getProviderFunctionality() {
332         return Collections.emptyList();
333     }
334
335     @Override
336     public void onSessionInitiated(ProviderSession session) {
337         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
338         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
339
340     }
341
342     public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
343
344     }
345
346     public void setDomRpcRegistry(RpcProvisionRegistry registry) {
347         biRpcRegistry = registry;
348     }
349
350     @Override
351     public void close() throws Exception {
352         if (baCommitHandlerRegistration != null) {
353             baCommitHandlerRegistration.close();
354         }
355         if (biCommitHandlerRegistration != null) {
356             biCommitHandlerRegistration.close();
357         }
358
359     }
360
361     private class DomToBindingTransaction implements
362             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
363
364         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
365         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
366
367         public DomToBindingTransaction(
368                 org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
369                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
370             super();
371             this.backing = backing;
372             this.modification = modification;
373             bindingOpenedTransactions.put(backing.getIdentifier(), this);
374         }
375
376         @Override
377         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
378             return modification;
379         }
380
381         @Override
382         public RpcResult<Void> rollback() throws IllegalStateException {
383             // backing.cancel();
384             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
385         }
386
387         @Override
388         public RpcResult<Void> finish() throws IllegalStateException {
389             Future<RpcResult<TransactionStatus>> result = backing.commit();
390             try {
391                 RpcResult<TransactionStatus> baResult = result.get();
392                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
393             } catch (InterruptedException e) {
394                 throw new IllegalStateException("", e);
395             } catch (ExecutionException e) {
396                 throw new IllegalStateException("", e);
397             }
398         }
399     }
400
401     private class BindingToDomTransaction implements
402             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
403
404         private DataModificationTransaction backing;
405         private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
406
407         public BindingToDomTransaction(DataModificationTransaction backing,
408                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
409             this.backing = backing;
410             this.modification = modification;
411             domOpenedTransactions.put(backing.getIdentifier(), this);
412         }
413
414         @Override
415         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
416             return modification;
417         }
418
419         @Override
420         public RpcResult<Void> finish() throws IllegalStateException {
421             Future<RpcResult<TransactionStatus>> result = backing.commit();
422             try {
423                 RpcResult<TransactionStatus> biResult = result.get();
424                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
425             } catch (InterruptedException e) {
426                 throw new IllegalStateException("", e);
427             } catch (ExecutionException e) {
428                 throw new IllegalStateException("", e);
429             } finally {
430                 domOpenedTransactions.remove(backing.getIdentifier());
431             }
432         }
433
434         @Override
435         public RpcResult<Void> rollback() throws IllegalStateException {
436             domOpenedTransactions.remove(backing.getIdentifier());
437             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
438         }
439     }
440
441     private class BindingToDomCommitHandler implements
442             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
443
444         @Override
445         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
446                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
447
448             /**
449              * Transaction was created as DOM transaction, in that case we do
450              * not need to forward it back.
451              */
452             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
453
454                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
455             }
456             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
457             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
458             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
459                     domTransaction.getIdentifier());
460             return wrapped;
461         }
462     }
463
464     private class DomToBindingCommitHandler implements //
465             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
466             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
467
468         @Override
469         public void onRegister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
470
471             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
472                     .getPath());
473
474         }
475
476         @Override
477         public void onUnregister(DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
478             // NOOP for now
479             // FIXME: do registration based on only active commit handlers.
480         }
481
482         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
483                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
484             Object identifier = domTransaction.getIdentifier();
485
486             /**
487              * We checks if the transcation was originated in this mapper. If it
488              * was originated in this mapper we are returing allways success
489              * commit hanlder to prevent creating loop in two-phase commit and
490              * duplicating data.
491              */
492             if (domOpenedTransactions.containsKey(identifier)) {
493                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
494             }
495
496             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
497             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
498             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
499                     baTransaction.getIdentifier());
500             return forwardedTransaction;
501         }
502     }
503
504     /**
505      * Manager responsible for instantiating forwarders responsible for
506      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
507      *
508      */
509     private class DomToBindingRpcForwardingManager implements
510             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
511             RouterInstantiationListener,
512             GlobalRpcRegistrationListener {
513
514         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
515         private RpcProviderRegistryImpl registryImpl;
516
517         public RpcProviderRegistryImpl getRegistryImpl() {
518             return registryImpl;
519         }
520
521         public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) {
522             this.registryImpl = registryImpl;
523         }
524
525         @Override
526         public void onGlobalRpcRegistered(Class<? extends RpcService> cls) {
527             getRpcForwarder(cls, null);
528         }
529
530         @Override
531         public void onGlobalRpcUnregistered(Class<? extends RpcService> cls) {
532             // NOOP
533         }
534
535         @Override
536         public void onRpcRouterCreated(RpcRouter<?> router) {
537             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
538             getRpcForwarder(router.getServiceType(), ctx);
539         }
540
541         @Override
542         public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
543             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
544                 bindingRoutesAdded(entry);
545             }
546         }
547
548         private void bindingRoutesAdded(Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
549             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
550             Class<? extends RpcService> service = entry.getKey().getRpcService();
551             if (context != null) {
552                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
553             }
554         }
555
556         private DomToBindingRpcForwarder getRpcForwarder(Class<? extends RpcService> service,
557                 Class<? extends BaseIdentity> context) {
558             DomToBindingRpcForwarder potential = forwarders.get(service);
559             if (potential != null) {
560                 return potential;
561             }
562             if (context == null) {
563                 potential = new DomToBindingRpcForwarder(service);
564             } else {
565                 potential = new DomToBindingRpcForwarder(service, context);
566             }
567
568             forwarders.put(service, potential);
569             return potential;
570         }
571
572     }
573
574     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
575
576         private final Set<QName> supportedRpcs;
577         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
578         private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
579         private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
580         private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
581
582         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
583             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
584             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
585             try {
586                 for (QName rpc : supportedRpcs) {
587                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
588                     strategiesByMethod.put(strategy.targetMethod, strategy);
589                     strategiesByQName.put(rpc, strategy);
590                     biRpcRegistry.addRpcImplementation(rpc, this);
591                 }
592
593             } catch (Exception e) {
594                 LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
595             }
596             registrations = ImmutableSet.of();
597         }
598
599         /**
600          * Constructor for Routed RPC Forwareder.
601          *
602          * @param service
603          * @param context
604          */
605         public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
606             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
607             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
608             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
609                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
610             try {
611                 for (QName rpc : supportedRpcs) {
612                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
613                     strategiesByMethod.put(strategy.targetMethod, strategy);
614                     strategiesByQName.put(rpc, strategy);
615                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
616                 }
617                 createDefaultDomForwarder();
618             } catch (Exception e) {
619                 LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
620             }
621             registrations = registrationsBuilder.build();
622         }
623
624         public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
625                 Set<InstanceIdentifier<?>> set) {
626             QName ctx = BindingReflections.findQName(context);
627             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
628                     toDOMInstanceIdentifier)) {
629                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
630                     reg.registerPath(ctx, path);
631                 }
632             }
633         }
634
635
636         @Override
637         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
638             if(EQUALS_METHOD.equals(method)) {
639                 return false;
640             }
641             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
642             checkState(strategy != null);
643             checkArgument(args.length <= 2);
644             if(args.length == 1) {
645                 checkArgument(args[0] instanceof DataObject);
646                 return strategy.forwardToDomBroker((DataObject) args[0]);
647             }
648             return strategy.forwardToDomBroker(null);
649         }
650
651         public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
652                 Set<InstanceIdentifier<?>> set) {
653             QName ctx = BindingReflections.findQName(context);
654             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
655                     toDOMInstanceIdentifier)) {
656                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
657                     reg.unregisterPath(ctx, path);
658                 }
659             }
660         }
661
662         @Override
663         public Set<QName> getSupportedRpcs() {
664             return supportedRpcs;
665         }
666
667         @SuppressWarnings({ "unchecked", "rawtypes" })
668         public void createDefaultDomForwarder() {
669             if (baRpcRegistryImpl != null) {
670                 Class<?> cls = rpcServiceType.get();
671                 ClassLoader clsLoader = cls.getClassLoader();
672                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
673
674                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
675                 rpcRouter.registerDefaultService(proxy);
676             }
677         }
678
679         @Override
680         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
681             checkArgument(rpc != null);
682             checkArgument(domInput != null);
683
684             Class<? extends RpcService> rpcType = rpcServiceType.get();
685             checkState(rpcType != null);
686             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
687             checkState(rpcService != null);
688             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
689             try {
690                 return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
691             } catch (Exception e) {
692                 throw new IllegalStateException(e);
693             }
694         }
695
696         private RpcInvocationStrategy resolveInvocationStrategy(QName rpc) {
697             return strategiesByQName.get(rpc);
698         }
699
700         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
701                 final Class<? extends RpcService> rpcType) throws Exception {
702             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
703                 @Override
704                 public RpcInvocationStrategy call() throws Exception {
705                     String methodName = BindingMapping.getMethodName(rpc);
706                     Method targetMethod = null;
707                     for (Method possibleMethod : rpcType.getMethods()) {
708                         if (possibleMethod.getName().equals(methodName)
709                                 && BindingReflections.isRpcMethod(possibleMethod)) {
710                             targetMethod = possibleMethod;
711                             break;
712                         }
713                     }
714                     checkState(targetMethod != null, "Rpc method not found");
715                     Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
716                     Optional<Class<? extends DataContainer>> inputClass = BindingReflections
717                             .resolveRpcInputClass(targetMethod);
718
719                     RpcInvocationStrategy strategy = null;
720                     if (outputClass.isPresent()) {
721                         if (inputClass.isPresent()) {
722                             strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get());
723                         } else {
724                             strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
725                         }
726                     } else if(inputClass.isPresent()){
727                         strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
728                     } else {
729                         strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
730                     }
731                     return strategy;
732                 }
733
734             });
735         }
736     }
737
738     private abstract class RpcInvocationStrategy {
739
740         protected final Method targetMethod;
741         protected final QName rpc;
742
743         public RpcInvocationStrategy(QName rpc,Method targetMethod) {
744             this.targetMethod = targetMethod;
745             this.rpc = rpc;
746         }
747
748         public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
749
750         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
751                 throws Exception;
752
753         public RpcResult<CompositeNode> invokeOn(RpcService rpcService, CompositeNode domInput) throws Exception {
754             return uncheckedInvoke(rpcService, domInput);
755         }
756     }
757
758     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
759
760         @SuppressWarnings("rawtypes")
761         private WeakReference<Class> inputClass;
762
763         @SuppressWarnings("rawtypes")
764         private WeakReference<Class> outputClass;
765
766         @SuppressWarnings({ "rawtypes", "unchecked" })
767         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
768                 Class<? extends DataContainer> inputClass) {
769             super(rpc,targetMethod);
770             this.outputClass = new WeakReference(outputClass);
771             this.inputClass = new WeakReference(inputClass);
772         }
773
774         @Override
775         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
776             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
777             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
778             if (result == null) {
779                 return Rpcs.getRpcResult(false);
780             }
781             RpcResult<?> bindingResult = result.get();
782             return Rpcs.getRpcResult(true);
783         }
784
785         @Override
786         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
787             if(biRouter != null) {
788                 CompositeNode xml = mappingService.toDataDom(input);
789                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
790                 RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
791                 Object baResultValue = null;
792                 if(result.getResult() != null) {
793                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
794                 }
795                 RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
796                 return Futures.<RpcResult<?>>immediateFuture(baResult);
797             }
798             return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
799         }
800
801     }
802
803     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
804
805         public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) {
806             super(rpc,targetMethod);
807         }
808
809         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
810             @SuppressWarnings("unchecked")
811             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
812             RpcResult<Void> bindingResult = result.get();
813             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
814         }
815
816         @Override
817         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
818             return Futures.immediateFuture(null);
819         }
820     }
821     
822     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
823
824         
825         @SuppressWarnings("rawtypes")
826         private WeakReference<Class> inputClass;
827
828         @SuppressWarnings({ "rawtypes", "unchecked" })
829         public NoOutputInvocationStrategy(QName rpc, Method targetMethod, 
830                 Class<? extends DataContainer> inputClass) {
831             super(rpc,targetMethod);
832             this.inputClass = new WeakReference(inputClass);
833         }
834         
835         
836         @Override
837         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
838             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
839             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
840             if (result == null) {
841                 return Rpcs.getRpcResult(false);
842             }
843             RpcResult<?> bindingResult = result.get();
844             return Rpcs.getRpcResult(true);
845         }
846
847         @Override
848         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
849             if(biRouter != null) {
850                 CompositeNode xml = mappingService.toDataDom(input);
851                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
852                 RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
853                 Object baResultValue = null;
854                 RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
855                 return Futures.<RpcResult<?>>immediateFuture(baResult);
856             }
857             return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
858         }
859
860     }
861
862     public boolean isRpcForwarding() {
863         return rpcForwarding;
864     }
865
866     public boolean isDataForwarding() {
867         return dataForwarding;
868     }
869
870     public boolean isNotificationForwarding() {
871         return notificationForwarding;
872     }
873
874     public BindingIndependentMappingService getMappingService() {
875         return mappingService;
876     }
877 }