Merge "BUG-694: Disable DataReader registration for BA Broker"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker;
32 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
33 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
40 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
41 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
42 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
46 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
47 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
48 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
49 import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
50 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
51 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
52 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
53 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
54 import org.opendaylight.controller.sal.common.util.Rpcs;
55 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
56 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
57 import org.opendaylight.controller.sal.core.api.Provider;
58 import org.opendaylight.controller.sal.core.api.RpcImplementation;
59 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
60 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
61 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
62 import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.concepts.Registration;
65 import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
66 import org.opendaylight.yangtools.yang.binding.Augmentable;
67 import org.opendaylight.yangtools.yang.binding.Augmentation;
68 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
69 import org.opendaylight.yangtools.yang.binding.BindingMapping;
70 import org.opendaylight.yangtools.yang.binding.DataContainer;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.binding.Notification;
74 import org.opendaylight.yangtools.yang.binding.RpcService;
75 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
76 import org.opendaylight.yangtools.yang.common.QName;
77 import org.opendaylight.yangtools.yang.common.RpcError;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
80 import org.opendaylight.yangtools.yang.data.api.Node;
81 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
82 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
83 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 import com.google.common.base.Function;
88 import com.google.common.base.Optional;
89 import com.google.common.collect.FluentIterable;
90 import com.google.common.collect.ImmutableList;
91 import com.google.common.collect.ImmutableSet;
92 import com.google.common.collect.ImmutableSet.Builder;
93 import com.google.common.util.concurrent.Futures;
94
95 public class BindingIndependentConnector implements //
96         RuntimeDataProvider, //
97         Provider, //
98         AutoCloseable {
99
100
101
102     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
103
104
105     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
106             .builder().toInstance();
107
108     private final static Method EQUALS_METHOD;
109
110     private BindingIndependentMappingService mappingService;
111
112     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
113
114     private DataProviderService baDataService;
115
116     private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
117     private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
118
119     private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
120     private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
121
122     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
123
124     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
125
126     private RpcProvisionRegistry biRpcRegistry;
127     private RpcProviderRegistry baRpcRegistry;
128
129     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
130     // private ListenerRegistration<BindingToDomRpcForwardingManager>
131     // bindingToDomRpcManager;
132
133     private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
134
135         @Override
136         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier<?> input) {
137             return mappingService.toDataDom(input);
138         }
139
140     };
141
142     private boolean rpcForwarding = false;
143
144     private boolean dataForwarding = false;
145
146     private boolean notificationForwarding = false;
147
148     private RpcProviderRegistryImpl baRpcRegistryImpl;
149
150     private NotificationProviderService baNotifyService;
151
152     private NotificationPublishService domNotificationService;
153
154     static {
155         try {
156             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
157         } catch (Exception e) {
158             throw new RuntimeException(e);
159         }
160     }
161
162     @Override
163     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
164         try {
165             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
166             CompositeNode result = biDataService.readOperationalData(biPath);
167             return potentialAugmentationRead(path, biPath, result);
168         } catch (DeserializationException e) {
169             throw new IllegalStateException(e);
170         }
171     }
172
173     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
174             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, final CompositeNode result)
175             throws DeserializationException {
176         Class<? extends DataObject> targetType = path.getTargetType();
177         if (Augmentation.class.isAssignableFrom(targetType)) {
178             path = mappingService.fromDataDom(biPath);
179             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
180             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
181             if (parentTo instanceof Augmentable<?>) {
182                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
183             }
184         }
185         return mappingService.dataObjectFromDataDom(path, result);
186     }
187
188     @Override
189     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
190         try {
191             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
192             CompositeNode result = biDataService.readConfigurationData(biPath);
193             return potentialAugmentationRead(path, biPath, result);
194         } catch (DeserializationException e) {
195             throw new IllegalStateException(e);
196         }
197     }
198
199     private DataModificationTransaction createBindingToDomTransaction(
200             final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
201         DataModificationTransaction target = biDataService.beginTransaction();
202         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(),source.getIdentifier());
203         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
204             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
205             target.removeConfigurationData(biEntry);
206             LOG.debug("Delete of Binding Configuration Data {} is translated to {}",entry,biEntry);
207         }
208         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
209             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
210             target.removeOperationalData(biEntry);
211             LOG.debug("Delete of Binding Operational Data {} is translated to {}",entry,biEntry);
212         }
213         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
214                 .entrySet()) {
215             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
216                     .toDataDom(entry);
217             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
218             LOG.debug("Update of Binding Configuration Data {} is translated to {}",entry,biEntry);
219         }
220         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
221                 .entrySet()) {
222             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
223                     .toDataDom(entry);
224             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
225             LOG.debug("Update of Binding Operational Data {} is translated to {}",entry,biEntry);
226         }
227
228         return target;
229     }
230
231     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
232             final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
233         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
234                 .beginTransaction();
235         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
236             try {
237
238                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
239                 target.removeConfigurationData(baEntry);
240             } catch (DeserializationException e) {
241                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
242             }
243         }
244         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
245             try {
246
247                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
248                 target.removeOperationalData(baEntry);
249             } catch (DeserializationException e) {
250                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
251             }
252         }
253         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
254                 .getUpdatedConfigurationData().entrySet()) {
255             try {
256                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
257                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
258                 target.putConfigurationData(baKey, baData);
259             } catch (DeserializationException e) {
260                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
261             }
262         }
263         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
264                 .getUpdatedOperationalData().entrySet()) {
265             try {
266
267                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
268                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
269                 target.putOperationalData(baKey, baData);
270             } catch (DeserializationException e) {
271                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
272             }
273         }
274         return target;
275     }
276
277     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
278         return biDataService;
279     }
280
281     protected void setDomDataService(final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
282         this.biDataService = biDataService;
283     }
284
285     public DataProviderService getBaDataService() {
286         return baDataService;
287     }
288
289     protected void setBindingDataService(final DataProviderService baDataService) {
290         this.baDataService = baDataService;
291     }
292
293     public RpcProviderRegistry getRpcRegistry() {
294         return baRpcRegistry;
295     }
296
297     protected void setBindingRpcRegistry(final RpcProviderRegistry rpcRegistry) {
298         this.baRpcRegistry = rpcRegistry;
299     }
300
301     public void startDataForwarding() {
302         if(baDataService instanceof AbstractForwardedDataBroker) {
303             dataForwarding = true;
304             return;
305         }
306
307         final DataProviderService baData;
308         if (baDataService instanceof BindingMountPointImpl) {
309             baData = ((BindingMountPointImpl)baDataService).getDataBrokerImpl();
310             LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
311         } else {
312             baData = baDataService;
313         }
314
315         if (baData instanceof DataBrokerImpl) {
316             checkState(!dataForwarding, "Connector is already forwarding data.");
317             ((DataBrokerImpl) baData).setDataReadDelegate(this);
318             ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
319             biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
320             baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
321         }
322
323         dataForwarding = true;
324     }
325
326     public void startRpcForwarding() {
327         if (baRpcRegistry != null && biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
328             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
329             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
330             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
331                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
332                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
333                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
334             }
335             rpcForwarding = true;
336         }
337     }
338
339     public void startNotificationForwarding() {
340         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
341         if (baNotifyService != null && domNotificationService != null) {
342             baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
343
344             notificationForwarding = true;
345         }
346     }
347
348     protected void setMappingService(final BindingIndependentMappingService mappingService) {
349         this.mappingService = mappingService;
350     }
351
352     @Override
353     public Collection<ProviderFunctionality> getProviderFunctionality() {
354         return Collections.emptyList();
355     }
356
357     @Override
358     public void onSessionInitiated(final ProviderSession session) {
359         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
360         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
361
362     }
363
364     public <T extends RpcService> void onRpcRouterCreated(final Class<T> serviceType, final RpcRouter<T> router) {
365
366     }
367
368     public void setDomRpcRegistry(final RpcProvisionRegistry registry) {
369         biRpcRegistry = registry;
370     }
371
372     @Override
373     public void close() throws Exception {
374         if (baCommitHandlerRegistration != null) {
375             baCommitHandlerRegistration.close();
376         }
377         if (biCommitHandlerRegistration != null) {
378             biCommitHandlerRegistration.close();
379         }
380
381     }
382
383     private class DomToBindingTransaction implements
384             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
385
386         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
387         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
388
389         public DomToBindingTransaction(
390                 final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
391                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
392             super();
393             this.backing = backing;
394             this.modification = modification;
395             bindingOpenedTransactions.put(backing.getIdentifier(), this);
396         }
397
398         @Override
399         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
400             return modification;
401         }
402
403         @Override
404         public RpcResult<Void> rollback() throws IllegalStateException {
405             // backing.cancel();
406             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
407         }
408
409         @Override
410         public RpcResult<Void> finish() throws IllegalStateException {
411             Future<RpcResult<TransactionStatus>> result = backing.commit();
412             try {
413                 RpcResult<TransactionStatus> baResult = result.get();
414                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
415             } catch (InterruptedException e) {
416                 throw new IllegalStateException("", e);
417             } catch (ExecutionException e) {
418                 throw new IllegalStateException("", e);
419             }
420         }
421     }
422
423     private class BindingToDomTransaction implements
424             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
425
426         private final DataModificationTransaction backing;
427         private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
428
429         public BindingToDomTransaction(final DataModificationTransaction backing,
430                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
431             this.backing = backing;
432             this.modification = modification;
433             domOpenedTransactions.put(backing.getIdentifier(), this);
434         }
435
436         @Override
437         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
438             return modification;
439         }
440
441         @Override
442         public RpcResult<Void> finish() throws IllegalStateException {
443             Future<RpcResult<TransactionStatus>> result = backing.commit();
444             try {
445                 RpcResult<TransactionStatus> biResult = result.get();
446                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
447             } catch (InterruptedException e) {
448                 throw new IllegalStateException("", e);
449             } catch (ExecutionException e) {
450                 throw new IllegalStateException("", e);
451             } finally {
452                 domOpenedTransactions.remove(backing.getIdentifier());
453             }
454         }
455
456         @Override
457         public RpcResult<Void> rollback() throws IllegalStateException {
458             domOpenedTransactions.remove(backing.getIdentifier());
459             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
460         }
461     }
462
463     private class BindingToDomCommitHandler implements
464             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
465
466         @Override
467         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
468                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
469
470             /**
471              * Transaction was created as DOM transaction, in that case we do
472              * not need to forward it back.
473              */
474             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
475
476                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
477             }
478             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
479             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
480             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(),
481                     domTransaction.getIdentifier());
482             return wrapped;
483         }
484     }
485
486     private class DomToBindingCommitHandler implements //
487             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
488             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
489
490         @Override
491         public void onRegister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
492
493             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
494                     .getPath());
495
496         }
497
498         @Override
499         public void onUnregister(final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
500             // NOOP for now
501             // FIXME: do registration based on only active commit handlers.
502         }
503
504         @Override
505         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
506                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
507             Object identifier = domTransaction.getIdentifier();
508
509             /**
510              * We checks if the transcation was originated in this mapper. If it
511              * was originated in this mapper we are returing allways success
512              * commit hanlder to prevent creating loop in two-phase commit and
513              * duplicating data.
514              */
515             if (domOpenedTransactions.containsKey(identifier)) {
516                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
517             }
518
519             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
520             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
521             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
522                     baTransaction.getIdentifier());
523             return forwardedTransaction;
524         }
525     }
526
527     /**
528      * Manager responsible for instantiating forwarders responsible for
529      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
530      *
531      */
532     private class DomToBindingRpcForwardingManager implements
533             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>,
534             RouterInstantiationListener,
535             GlobalRpcRegistrationListener {
536
537         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
538         private RpcProviderRegistryImpl registryImpl;
539
540         public RpcProviderRegistryImpl getRegistryImpl() {
541             return registryImpl;
542         }
543
544         public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) {
545             this.registryImpl = registryImpl;
546         }
547
548         @Override
549         public void onGlobalRpcRegistered(final Class<? extends RpcService> cls) {
550             getRpcForwarder(cls, null);
551         }
552
553         @Override
554         public void onGlobalRpcUnregistered(final Class<? extends RpcService> cls) {
555             // NOOP
556         }
557
558         @Override
559         public void onRpcRouterCreated(final RpcRouter<?> router) {
560             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
561             getRpcForwarder(router.getServiceType(), ctx);
562         }
563
564         @Override
565         public void onRouteChange(final RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
566             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
567                 bindingRoutesAdded(entry);
568             }
569         }
570
571         private void bindingRoutesAdded(final Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
572             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
573             Class<? extends RpcService> service = entry.getKey().getRpcService();
574             if (context != null) {
575                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
576             }
577         }
578
579         private DomToBindingRpcForwarder getRpcForwarder(final Class<? extends RpcService> service,
580                 final Class<? extends BaseIdentity> context) {
581             DomToBindingRpcForwarder potential = forwarders.get(service);
582             if (potential != null) {
583                 return potential;
584             }
585             if (context == null) {
586                 potential = new DomToBindingRpcForwarder(service);
587             } else {
588                 potential = new DomToBindingRpcForwarder(service, context);
589             }
590
591             forwarders.put(service, potential);
592             return potential;
593         }
594
595     }
596
597     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
598
599         private final Set<QName> supportedRpcs;
600         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
601         private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
602         private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
603         private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
604
605         public DomToBindingRpcForwarder(final Class<? extends RpcService> service) {
606             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
607             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
608             try {
609                 for (QName rpc : supportedRpcs) {
610                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
611                     strategiesByMethod.put(strategy.targetMethod, strategy);
612                     strategiesByQName.put(rpc, strategy);
613                     biRpcRegistry.addRpcImplementation(rpc, this);
614                 }
615
616             } catch (Exception e) {
617                 LOG.error("Could not forward Rpcs of type {}", service.getName(),e);
618             }
619             registrations = ImmutableSet.of();
620         }
621
622         /**
623          * Constructor for Routed RPC Forwareder.
624          *
625          * @param service
626          * @param context
627          */
628         public DomToBindingRpcForwarder(final Class<? extends RpcService> service, final Class<? extends BaseIdentity> context) {
629             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
630             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
631             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
632                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
633             try {
634                 for (QName rpc : supportedRpcs) {
635                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
636                     strategiesByMethod.put(strategy.targetMethod, strategy);
637                     strategiesByQName.put(rpc, strategy);
638                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
639                 }
640                 createDefaultDomForwarder();
641             } catch (Exception e) {
642                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
643             }
644             registrations = registrationsBuilder.build();
645         }
646
647         public void registerPaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
648                 final Set<InstanceIdentifier<?>> set) {
649             QName ctx = BindingReflections.findQName(context);
650             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
651                     toDOMInstanceIdentifier)) {
652                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
653                     reg.registerPath(ctx, path);
654                 }
655             }
656         }
657
658
659         @Override
660         public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
661             if (EQUALS_METHOD.equals(method)) {
662                 return false;
663             }
664             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
665             checkState(strategy != null);
666             checkArgument(args.length <= 2);
667             if (args.length == 1) {
668                 checkArgument(args[0] instanceof DataObject);
669                 return strategy.forwardToDomBroker((DataObject) args[0]);
670             }
671             return strategy.forwardToDomBroker(null);
672         }
673
674         public void removePaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
675                 final Set<InstanceIdentifier<?>> set) {
676             QName ctx = BindingReflections.findQName(context);
677             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
678                     toDOMInstanceIdentifier)) {
679                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
680                     reg.unregisterPath(ctx, path);
681                 }
682             }
683         }
684
685         @Override
686         public Set<QName> getSupportedRpcs() {
687             return supportedRpcs;
688         }
689
690         @SuppressWarnings({ "unchecked", "rawtypes" })
691         public void createDefaultDomForwarder() {
692             if (baRpcRegistryImpl != null) {
693                 Class<?> cls = rpcServiceType.get();
694                 ClassLoader clsLoader = cls.getClassLoader();
695                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
696
697                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
698                 rpcRouter.registerDefaultService(proxy);
699             }
700         }
701
702         @Override
703         public RpcResult<CompositeNode> invokeRpc(final QName rpc, final CompositeNode domInput) {
704             checkArgument(rpc != null);
705             checkArgument(domInput != null);
706
707             Class<? extends RpcService> rpcType = rpcServiceType.get();
708             checkState(rpcType != null);
709             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
710             checkState(rpcService != null);
711             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
712             try {
713                 return resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput);
714             } catch (Exception e) {
715                 throw new IllegalStateException(e);
716             }
717         }
718
719         private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) {
720             return strategiesByQName.get(rpc);
721         }
722
723         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
724                 final Class<? extends RpcService> rpcType) throws Exception {
725             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
726                 @Override
727                 public RpcInvocationStrategy call() throws Exception {
728                     String methodName = BindingMapping.getMethodName(rpc);
729                     Method targetMethod = null;
730                     for (Method possibleMethod : rpcType.getMethods()) {
731                         if (possibleMethod.getName().equals(methodName)
732                                 && BindingReflections.isRpcMethod(possibleMethod)) {
733                             targetMethod = possibleMethod;
734                             break;
735                         }
736                     }
737                     checkState(targetMethod != null, "Rpc method not found");
738                     Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
739                     Optional<Class<? extends DataContainer>> inputClass = BindingReflections
740                             .resolveRpcInputClass(targetMethod);
741
742                     RpcInvocationStrategy strategy = null;
743                     if (outputClass.isPresent()) {
744                         if (inputClass.isPresent()) {
745                             strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass
746                                     .get());
747                         } else {
748                             strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod);
749                         }
750                     } else if(inputClass.isPresent()){
751                         strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get());
752                     } else {
753                         strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod);
754                     }
755                     return strategy;
756                 }
757
758             });
759         }
760     }
761
762     private abstract class RpcInvocationStrategy {
763
764         protected final Method targetMethod;
765         protected final QName rpc;
766
767         public RpcInvocationStrategy(final QName rpc, final Method targetMethod) {
768             this.targetMethod = targetMethod;
769             this.rpc = rpc;
770         }
771
772         public abstract Future<RpcResult<?>> forwardToDomBroker(DataObject input);
773
774         public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
775                 throws Exception;
776
777         public RpcResult<CompositeNode> invokeOn(final RpcService rpcService, final CompositeNode domInput) throws Exception {
778             return uncheckedInvoke(rpcService, domInput);
779         }
780     }
781
782     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
783
784         @SuppressWarnings("rawtypes")
785         private final WeakReference<Class> inputClass;
786
787         @SuppressWarnings("rawtypes")
788         private final WeakReference<Class> outputClass;
789
790         @SuppressWarnings({ "rawtypes", "unchecked" })
791         public DefaultInvocationStrategy(final QName rpc, final Method targetMethod, final Class<?> outputClass,
792                 final Class<? extends DataContainer> inputClass) {
793             super(rpc, targetMethod);
794             this.outputClass = new WeakReference(outputClass);
795             this.inputClass = new WeakReference(inputClass);
796         }
797
798         @SuppressWarnings("unchecked")
799         @Override
800         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
801             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
802             Future<RpcResult<?>> futureResult = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
803             if (futureResult == null) {
804                 return Rpcs.getRpcResult(false);
805             }
806             RpcResult<?> bindingResult = futureResult.get();
807             final Object resultObj = bindingResult.getResult();
808             if (resultObj instanceof DataObject) {
809                 final CompositeNode output = mappingService.toDataDom((DataObject)resultObj);
810                 return Rpcs.getRpcResult(true, output, Collections.<RpcError>emptySet());
811             }
812             return Rpcs.getRpcResult(true);
813         }
814
815         @Override
816         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
817             if(biRpcRegistry != null) {
818                 CompositeNode xml = mappingService.toDataDom(input);
819                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
820                 RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
821                 Object baResultValue = null;
822                 if (result.getResult() != null) {
823                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
824                 }
825                 RpcResult<?> baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors());
826                 return Futures.<RpcResult<?>> immediateFuture(baResult);
827             }
828             return Futures.<RpcResult<?>> immediateFuture(Rpcs.getRpcResult(false));
829         }
830
831     }
832
833     private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
834
835         public NoInputNoOutputInvocationStrategy(final QName rpc, final Method targetMethod) {
836             super(rpc, targetMethod);
837         }
838
839         @Override
840         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
841             @SuppressWarnings("unchecked")
842             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
843             RpcResult<Void> bindingResult = result.get();
844             return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
845         }
846
847         @Override
848         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
849             return Futures.immediateFuture(null);
850         }
851     }
852
853     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
854
855
856         @SuppressWarnings("rawtypes")
857         private final WeakReference<Class> inputClass;
858
859         @SuppressWarnings({ "rawtypes", "unchecked" })
860         public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod,
861                 final Class<? extends DataContainer> inputClass) {
862             super(rpc,targetMethod);
863             this.inputClass = new WeakReference(inputClass);
864         }
865
866
867         @Override
868         public RpcResult<CompositeNode> uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) throws Exception {
869             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
870             Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
871             if (result == null) {
872                 return Rpcs.getRpcResult(false);
873             }
874             RpcResult<?> bindingResult = result.get();
875             return Rpcs.getRpcResult(true);
876         }
877
878         @Override
879         public Future<RpcResult<?>> forwardToDomBroker(final DataObject input) {
880             if(biRpcRegistry != null) {
881                 CompositeNode xml = mappingService.toDataDom(input);
882                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
883                 RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
884                 Object baResultValue = null;
885                 RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
886                 return Futures.<RpcResult<?>>immediateFuture(baResult);
887             }
888             return Futures.<RpcResult<?>>immediateFuture(Rpcs.getRpcResult(false));
889         }
890
891     }
892
893     public boolean isRpcForwarding() {
894         return rpcForwarding;
895     }
896
897     public boolean isDataForwarding() {
898         return dataForwarding;
899     }
900
901     public boolean isNotificationForwarding() {
902         return notificationForwarding;
903     }
904
905     public BindingIndependentMappingService getMappingService() {
906         return mappingService;
907     }
908
909     public void setBindingNotificationService(final NotificationProviderService baService) {
910         this.baNotifyService = baService;
911
912     }
913
914     public void setDomNotificationService(final NotificationPublishService domService) {
915         this.domNotificationService = domService;
916     }
917
918     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
919
920         private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
921         private final Set<QName> supportedNotifications = new HashSet<>();
922
923         @Override
924         public Set<QName> getSupportedNotifications() {
925             return Collections.unmodifiableSet(supportedNotifications);
926         }
927
928         @Override
929         public void onNotification(final CompositeNode notification) {
930             QName qname = notification.getNodeType();
931             WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
932             if (potential != null) {
933                 Class<? extends Notification> potentialClass = potential.get();
934                 if (potentialClass != null) {
935                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
936                             notification);
937
938                     if (baNotification instanceof Notification) {
939                         baNotifyService.publish((Notification) baNotification);
940                     }
941                 }
942             }
943         }
944
945         @Override
946         public void onNotificationSubscribtion(final Class<? extends Notification> notificationType) {
947             QName qname = BindingReflections.findQName(notificationType);
948             if (qname != null) {
949                 WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
950                         new WeakReference<Class<? extends Notification>>(notificationType));
951                 if (already == null) {
952                     domNotificationService.addNotificationListener(qname, this);
953                     supportedNotifications.add(qname);
954                 }
955             }
956         }
957     }
958 }