5bbebe6f1b00544021bb64d5cdf0ea4bf77c82a5
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker;
32 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
33 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
40 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
41 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
42 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
46 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
47 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
48 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
49 import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
50 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
51 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
52 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
53 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
54 import org.opendaylight.controller.sal.common.util.Rpcs;
55 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
56 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
57 import org.opendaylight.controller.sal.core.api.Provider;
58 import org.opendaylight.controller.sal.core.api.RpcImplementation;
59 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
60 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
61 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
62 import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.concepts.Registration;
65 import org.opendaylight.yangtools.yang.binding.Augmentable;
66 import org.opendaylight.yangtools.yang.binding.Augmentation;
67 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
68 import org.opendaylight.yangtools.yang.binding.BindingMapping;
69 import org.opendaylight.yangtools.yang.binding.DataContainer;
70 import org.opendaylight.yangtools.yang.binding.DataObject;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.opendaylight.yangtools.yang.binding.Notification;
73 import org.opendaylight.yangtools.yang.binding.RpcService;
74 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
75 import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils;
76 import org.opendaylight.yangtools.yang.common.QName;
77 import org.opendaylight.yangtools.yang.common.RpcError;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
80 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
81 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import com.google.common.base.Function;
86 import com.google.common.base.Optional;
87 import com.google.common.collect.FluentIterable;
88 import com.google.common.collect.ImmutableSet;
89 import com.google.common.collect.ImmutableSet.Builder;
90 import com.google.common.util.concurrent.Futures;
91 import com.google.common.util.concurrent.ListenableFuture;
92
93 public class BindingIndependentConnector implements //
94         RuntimeDataProvider, //
95         Provider, //
96         AutoCloseable {
97
98     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
99
100     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
101             .builder().toInstance();
102
103     private final static Method EQUALS_METHOD;
104
105     private BindingIndependentMappingService mappingService;
106
107     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
108
109     private DataProviderService baDataService;
110
111     private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
112     private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
113
114     private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
115     private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
116
117     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
118
119     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
120
121     private RpcProvisionRegistry biRpcRegistry;
122     private RpcProviderRegistry baRpcRegistry;
123
124     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
125     // private ListenerRegistration<BindingToDomRpcForwardingManager>
126     // bindingToDomRpcManager;
127
128     private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
129
130         @Override
131         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier<?> input) {
132             return mappingService.toDataDom(input);
133         }
134
135     };
136
137     private boolean rpcForwarding = false;
138
139     private boolean dataForwarding = false;
140
141     private boolean notificationForwarding = false;
142
143     private RpcProviderRegistryImpl baRpcRegistryImpl;
144
145     private NotificationProviderService baNotifyService;
146
147     private NotificationPublishService domNotificationService;
148
149     static {
150         try {
151             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
152         } catch (Exception e) {
153             throw new RuntimeException(e);
154         }
155     }
156
157     @Override
158     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
159         try {
160             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
161             CompositeNode result = biDataService.readOperationalData(biPath);
162             return potentialAugmentationRead(path, biPath, result);
163         } catch (DeserializationException e) {
164             throw new IllegalStateException(e);
165         }
166     }
167
168     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
169             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, final CompositeNode result)
170             throws DeserializationException {
171         Class<? extends DataObject> targetType = path.getTargetType();
172         if (Augmentation.class.isAssignableFrom(targetType)) {
173             path = mappingService.fromDataDom(biPath);
174             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
175             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
176             if (parentTo instanceof Augmentable<?>) {
177                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
178             }
179         }
180         return mappingService.dataObjectFromDataDom(path, result);
181     }
182
183     @Override
184     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
185         try {
186             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
187             CompositeNode result = biDataService.readConfigurationData(biPath);
188             return potentialAugmentationRead(path, biPath, result);
189         } catch (DeserializationException e) {
190             throw new IllegalStateException(e);
191         }
192     }
193
194     private DataModificationTransaction createBindingToDomTransaction(
195             final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
196         DataModificationTransaction target = biDataService.beginTransaction();
197         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier());
198         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
199             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
200             target.removeConfigurationData(biEntry);
201             LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry);
202         }
203         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
204             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
205             target.removeOperationalData(biEntry);
206             LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry);
207         }
208         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
209                 .entrySet()) {
210             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
211                     .toDataDom(entry);
212             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
213             LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry);
214         }
215         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
216                 .entrySet()) {
217             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
218                     .toDataDom(entry);
219             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
220             LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry);
221         }
222
223         return target;
224     }
225
226     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
227             final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
228         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
229                 .beginTransaction();
230         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
231             try {
232
233                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
234                 target.removeConfigurationData(baEntry);
235             } catch (DeserializationException e) {
236                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
237             }
238         }
239         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
240             try {
241
242                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
243                 target.removeOperationalData(baEntry);
244             } catch (DeserializationException e) {
245                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
246             }
247         }
248         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
249                 .getUpdatedConfigurationData().entrySet()) {
250             try {
251                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
252                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
253                 target.putConfigurationData(baKey, baData);
254             } catch (DeserializationException e) {
255                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
256             }
257         }
258         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
259                 .getUpdatedOperationalData().entrySet()) {
260             try {
261
262                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
263                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
264                 target.putOperationalData(baKey, baData);
265             } catch (DeserializationException e) {
266                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
267             }
268         }
269         return target;
270     }
271
272     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
273         return biDataService;
274     }
275
276     protected void setDomDataService(
277             final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
278         this.biDataService = biDataService;
279     }
280
281     public DataProviderService getBaDataService() {
282         return baDataService;
283     }
284
285     protected void setBindingDataService(final DataProviderService baDataService) {
286         this.baDataService = baDataService;
287     }
288
289     public RpcProviderRegistry getRpcRegistry() {
290         return baRpcRegistry;
291     }
292
293     protected void setBindingRpcRegistry(final RpcProviderRegistry rpcRegistry) {
294         this.baRpcRegistry = rpcRegistry;
295     }
296
297     public void startDataForwarding() {
298         if (baDataService instanceof AbstractForwardedDataBroker) {
299             dataForwarding = true;
300             return;
301         }
302
303         final DataProviderService baData;
304         if (baDataService instanceof BindingMountPointImpl) {
305             baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl();
306             LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
307         } else {
308             baData = baDataService;
309         }
310
311         if (baData instanceof DataBrokerImpl) {
312             checkState(!dataForwarding, "Connector is already forwarding data.");
313             ((DataBrokerImpl) baData).setDataReadDelegate(this);
314             ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
315             biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
316             baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
317         }
318
319         dataForwarding = true;
320     }
321
322     public void startRpcForwarding() {
323         if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
324             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
325             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
326             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
327                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
328                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
329                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
330             }
331             rpcForwarding = true;
332         }
333     }
334
335     public void startNotificationForwarding() {
336         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
337         if (baNotifyService != null && domNotificationService != null) {
338             baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
339
340             notificationForwarding = true;
341         }
342     }
343
344     protected void setMappingService(final BindingIndependentMappingService mappingService) {
345         this.mappingService = mappingService;
346     }
347
348     @Override
349     public Collection<ProviderFunctionality> getProviderFunctionality() {
350         return Collections.emptyList();
351     }
352
353     @Override
354     public void onSessionInitiated(final ProviderSession session) {
355         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
356         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
357
358     }
359
360     public <T extends RpcService> void onRpcRouterCreated(final Class<T> serviceType, final RpcRouter<T> router) {
361
362     }
363
364     public void setDomRpcRegistry(final RpcProvisionRegistry registry) {
365         biRpcRegistry = registry;
366     }
367
368     @Override
369     public void close() throws Exception {
370         if (baCommitHandlerRegistration != null) {
371             baCommitHandlerRegistration.close();
372         }
373         if (biCommitHandlerRegistration != null) {
374             biCommitHandlerRegistration.close();
375         }
376
377     }
378
379     private class DomToBindingTransaction implements
380             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
381
382         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
383         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
384
385         public DomToBindingTransaction(
386                 final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
387                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
388             super();
389             this.backing = backing;
390             this.modification = modification;
391             bindingOpenedTransactions.put(backing.getIdentifier(), this);
392         }
393
394         @Override
395         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
396             return modification;
397         }
398
399         @Override
400         public RpcResult<Void> rollback() throws IllegalStateException {
401             // backing.cancel();
402             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
403         }
404
405         @Override
406         public RpcResult<Void> finish() throws IllegalStateException {
407             Future<RpcResult<TransactionStatus>> result = backing.commit();
408             try {
409                 RpcResult<TransactionStatus> baResult = result.get();
410                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
411             } catch (InterruptedException e) {
412                 throw new IllegalStateException("", e);
413             } catch (ExecutionException e) {
414                 throw new IllegalStateException("", e);
415             }
416         }
417     }
418
419     private class BindingToDomTransaction implements
420             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
421
422         private final DataModificationTransaction backing;
423         private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
424
425         public BindingToDomTransaction(final DataModificationTransaction backing,
426                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
427             this.backing = backing;
428             this.modification = modification;
429             domOpenedTransactions.put(backing.getIdentifier(), this);
430         }
431
432         @Override
433         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
434             return modification;
435         }
436
437         @Override
438         public RpcResult<Void> finish() throws IllegalStateException {
439             Future<RpcResult<TransactionStatus>> result = backing.commit();
440             try {
441                 RpcResult<TransactionStatus> biResult = result.get();
442                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
443             } catch (InterruptedException e) {
444                 throw new IllegalStateException("", e);
445             } catch (ExecutionException e) {
446                 throw new IllegalStateException("", e);
447             } finally {
448                 domOpenedTransactions.remove(backing.getIdentifier());
449             }
450         }
451
452         @Override
453         public RpcResult<Void> rollback() throws IllegalStateException {
454             domOpenedTransactions.remove(backing.getIdentifier());
455             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
456         }
457     }
458
459     private class BindingToDomCommitHandler implements
460             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
461
462         @Override
463         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
464                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
465
466             /**
467              * Transaction was created as DOM transaction, in that case we do
468              * not need to forward it back.
469              */
470             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
471
472                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
473             }
474             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
475             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
476             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .",
477                     bindingTransaction.getIdentifier(), domTransaction.getIdentifier());
478             return wrapped;
479         }
480     }
481
482     private class DomToBindingCommitHandler implements //
483             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
484             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
485
486         @Override
487         public void onRegister(
488                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
489
490             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration
491                     .getPath());
492
493         }
494
495         @Override
496         public void onUnregister(
497                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
498             // NOOP for now
499             // FIXME: do registration based on only active commit handlers.
500         }
501
502         @Override
503         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
504                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
505             Object identifier = domTransaction.getIdentifier();
506
507             /**
508              * We checks if the transcation was originated in this mapper. If it
509              * was originated in this mapper we are returing allways success
510              * commit hanlder to prevent creating loop in two-phase commit and
511              * duplicating data.
512              */
513             if (domOpenedTransactions.containsKey(identifier)) {
514                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
515             }
516
517             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
518             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
519             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
520                     baTransaction.getIdentifier());
521             return forwardedTransaction;
522         }
523     }
524
525     /**
526      * Manager responsible for instantiating forwarders responsible for
527      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
528      *
529      */
530     private class DomToBindingRpcForwardingManager implements
531             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
532             GlobalRpcRegistrationListener {
533
534         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
535         private RpcProviderRegistryImpl registryImpl;
536
537         public RpcProviderRegistryImpl getRegistryImpl() {
538             return registryImpl;
539         }
540
541         public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) {
542             this.registryImpl = registryImpl;
543         }
544
545         @Override
546         public void onGlobalRpcRegistered(final Class<? extends RpcService> cls) {
547             getRpcForwarder(cls, null);
548         }
549
550         @Override
551         public void onGlobalRpcUnregistered(final Class<? extends RpcService> cls) {
552             // NOOP
553         }
554
555         @Override
556         public void onRpcRouterCreated(final RpcRouter<?> router) {
557             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
558             getRpcForwarder(router.getServiceType(), ctx);
559         }
560
561         @Override
562         public void onRouteChange(final RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
563             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
564                 bindingRoutesAdded(entry);
565             }
566         }
567
568         private void bindingRoutesAdded(final Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
569             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
570             Class<? extends RpcService> service = entry.getKey().getRpcService();
571             if (context != null) {
572                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
573             }
574         }
575
576         private DomToBindingRpcForwarder getRpcForwarder(final Class<? extends RpcService> service,
577                 final Class<? extends BaseIdentity> context) {
578             DomToBindingRpcForwarder potential = forwarders.get(service);
579             if (potential != null) {
580                 return potential;
581             }
582             if (context == null) {
583                 potential = new DomToBindingRpcForwarder(service);
584             } else {
585                 potential = new DomToBindingRpcForwarder(service, context);
586             }
587
588             forwarders.put(service, potential);
589             return potential;
590         }
591
592     }
593
594     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
595
596         private final Set<QName> supportedRpcs;
597         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
598         private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
599         private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
600         private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
601
602         public DomToBindingRpcForwarder(final Class<? extends RpcService> service) {
603             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
604             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
605             try {
606                 for (QName rpc : supportedRpcs) {
607                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
608                     strategiesByMethod.put(strategy.targetMethod, strategy);
609                     strategiesByQName.put(rpc, strategy);
610                     biRpcRegistry.addRpcImplementation(rpc, this);
611                 }
612
613             } catch (Exception e) {
614                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
615             }
616             registrations = ImmutableSet.of();
617         }
618
619         /**
620          * Constructor for Routed RPC Forwareder.
621          *
622          * @param service
623          * @param context
624          */
625         public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
626                 final Class<? extends BaseIdentity> context) {
627             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
628             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
629             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
630                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
631             try {
632                 for (QName rpc : supportedRpcs) {
633                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service);
634                     strategiesByMethod.put(strategy.targetMethod, strategy);
635                     strategiesByQName.put(rpc, strategy);
636                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
637                 }
638                 createDefaultDomForwarder();
639             } catch (Exception e) {
640                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
641             }
642             registrations = registrationsBuilder.build();
643         }
644
645         public void registerPaths(final Class<? extends BaseIdentity> context,
646                 final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
647             QName ctx = BindingReflections.findQName(context);
648             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
649                     toDOMInstanceIdentifier)) {
650                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
651                     reg.registerPath(ctx, path);
652                 }
653             }
654         }
655
656         @Override
657         public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
658             if (EQUALS_METHOD.equals(method)) {
659                 return false;
660             }
661             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
662             checkState(strategy != null);
663             checkArgument(args.length <= 2);
664             if (args.length == 1) {
665                 checkArgument(args[0] instanceof DataObject);
666                 return strategy.forwardToDomBroker((DataObject) args[0]);
667             }
668             return strategy.forwardToDomBroker(null);
669         }
670
671         public void removePaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
672                 final Set<InstanceIdentifier<?>> set) {
673             QName ctx = BindingReflections.findQName(context);
674             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
675                     toDOMInstanceIdentifier)) {
676                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
677                     reg.unregisterPath(ctx, path);
678                 }
679             }
680         }
681
682         @Override
683         public Set<QName> getSupportedRpcs() {
684             return supportedRpcs;
685         }
686
687         @SuppressWarnings({ "unchecked", "rawtypes" })
688         public void createDefaultDomForwarder() {
689             if (baRpcRegistryImpl != null) {
690                 Class<?> cls = rpcServiceType.get();
691                 ClassLoader clsLoader = cls.getClassLoader();
692                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
693
694                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
695                 rpcRouter.registerDefaultService(proxy);
696             }
697         }
698
699         @Override
700         public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
701             checkArgument(rpc != null);
702             checkArgument(domInput != null);
703
704             Class<? extends RpcService> rpcType = rpcServiceType.get();
705             checkState(rpcType != null);
706             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
707             checkState(rpcService != null);
708             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
709
710             try {
711                 return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
712             } catch (Exception e) {
713                 return Futures.immediateFailedFuture(e);
714             }
715         }
716
717         private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) {
718             return strategiesByQName.get(rpc);
719         }
720
721         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
722                 final Class<? extends RpcService> rpcType) throws Exception {
723             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
724                 @Override
725                 public RpcInvocationStrategy call() throws Exception {
726                     String methodName = BindingMapping.getMethodName(rpc);
727                     Method targetMethod = null;
728                     for (Method possibleMethod : rpcType.getMethods()) {
729                         if (possibleMethod.getName().equals(methodName)
730                                 && BindingReflections.isRpcMethod(possibleMethod)) {
731                             targetMethod = possibleMethod;
732                             break;
733                         }
734                     }
735                     checkState(targetMethod != null, "Rpc method not found");
736                     return  new RpcInvocationStrategy(rpc,targetMethod, mappingService, biRpcRegistry);
737                 }
738
739             });
740         }
741     }
742
743     public boolean isRpcForwarding() {
744         return rpcForwarding;
745     }
746
747     public boolean isDataForwarding() {
748         return dataForwarding;
749     }
750
751     public boolean isNotificationForwarding() {
752         return notificationForwarding;
753     }
754
755     public BindingIndependentMappingService getMappingService() {
756         return mappingService;
757     }
758
759     public void setBindingNotificationService(final NotificationProviderService baService) {
760         this.baNotifyService = baService;
761
762     }
763
764     public void setDomNotificationService(final NotificationPublishService domService) {
765         this.domNotificationService = domService;
766     }
767
768     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
769
770         private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
771         private final Set<QName> supportedNotifications = new HashSet<>();
772
773         @Override
774         public Set<QName> getSupportedNotifications() {
775             return Collections.unmodifiableSet(supportedNotifications);
776         }
777
778         @Override
779         public void onNotification(final CompositeNode notification) {
780             QName qname = notification.getNodeType();
781             WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
782             if (potential != null) {
783                 Class<? extends Notification> potentialClass = potential.get();
784                 if (potentialClass != null) {
785                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
786                             notification);
787
788                     if (baNotification instanceof Notification) {
789                         baNotifyService.publish((Notification) baNotification);
790                     }
791                 }
792             }
793         }
794
795         @Override
796         public void onNotificationSubscribtion(final Class<? extends Notification> notificationType) {
797             QName qname = BindingReflections.findQName(notificationType);
798             if (qname != null) {
799                 WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
800                         new WeakReference<Class<? extends Notification>>(notificationType));
801                 if (already == null) {
802                     domNotificationService.addNotificationListener(qname, this);
803                     supportedNotifications.add(qname);
804                 }
805             }
806         }
807     }
808 }