Bug 1339: Added doublecheck for RPC registration
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / connect / dom / BindingIndependentConnector.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl.connect.dom;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.InvocationHandler;
15 import java.lang.reflect.Method;
16 import java.lang.reflect.Proxy;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30
31 import org.opendaylight.controller.md.sal.binding.impl.AbstractForwardedDataBroker;
32 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
33 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
34 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
38 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
39 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
40 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
41 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
42 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener;
43 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
44 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
45 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
46 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
47 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
48 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
49 import org.opendaylight.controller.sal.binding.impl.MountPointManagerImpl.BindingMountPointImpl;
50 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
51 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
52 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
53 import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions;
54 import org.opendaylight.controller.sal.common.util.Rpcs;
55 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
56 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
57 import org.opendaylight.controller.sal.core.api.Provider;
58 import org.opendaylight.controller.sal.core.api.RpcImplementation;
59 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
60 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
61 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
62 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
63 import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
64 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
65 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration.CompositeObjectRegistrationBuilder;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.concepts.ObjectRegistration;
68 import org.opendaylight.yangtools.concepts.Registration;
69 import org.opendaylight.yangtools.yang.binding.Augmentable;
70 import org.opendaylight.yangtools.yang.binding.Augmentation;
71 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
72 import org.opendaylight.yangtools.yang.binding.BindingMapping;
73 import org.opendaylight.yangtools.yang.binding.DataContainer;
74 import org.opendaylight.yangtools.yang.binding.DataObject;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.opendaylight.yangtools.yang.binding.Notification;
77 import org.opendaylight.yangtools.yang.binding.RpcService;
78 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
79 import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils;
80 import org.opendaylight.yangtools.yang.common.QName;
81 import org.opendaylight.yangtools.yang.common.RpcError;
82 import org.opendaylight.yangtools.yang.common.RpcResult;
83 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
84 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
85 import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89 import com.google.common.base.Function;
90 import com.google.common.base.Optional;
91 import com.google.common.collect.FluentIterable;
92 import com.google.common.collect.ImmutableSet;
93 import com.google.common.collect.ImmutableSet.Builder;
94 import com.google.common.util.concurrent.Futures;
95 import com.google.common.util.concurrent.ListenableFuture;
96
97 public class BindingIndependentConnector implements //
98         RuntimeDataProvider, //
99         Provider, //
100         AutoCloseable {
101
102     private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
103
104     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
105             .builder().toInstance();
106
107     private final static Method EQUALS_METHOD;
108
109     private BindingIndependentMappingService mappingService;
110
111     private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService;
112
113     private DataProviderService baDataService;
114
115     private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
116     private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
117
118     private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
119     private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
120
121     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
122
123     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
124
125     private RpcProvisionRegistry biRpcRegistry;
126     private RpcProviderRegistry baRpcRegistry;
127
128     private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
129     // private ListenerRegistration<BindingToDomRpcForwardingManager>
130     // bindingToDomRpcManager;
131
132     private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
133
134         @Override
135         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(final InstanceIdentifier<?> input) {
136             return mappingService.toDataDom(input);
137         }
138
139     };
140
141     private boolean rpcForwarding = false;
142
143     private boolean dataForwarding = false;
144
145     private boolean notificationForwarding = false;
146
147     private RpcProviderRegistryImpl baRpcRegistryImpl;
148
149     private NotificationProviderService baNotifyService;
150
151     private NotificationPublishService domNotificationService;
152
153     static {
154         try {
155             EQUALS_METHOD = Object.class.getMethod("equals", Object.class);
156         } catch (Exception e) {
157             throw new RuntimeException(e);
158         }
159     }
160
161     @Override
162     public DataObject readOperationalData(final InstanceIdentifier<? extends DataObject> path) {
163         try {
164             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
165             CompositeNode result = biDataService.readOperationalData(biPath);
166             return potentialAugmentationRead(path, biPath, result);
167         } catch (DeserializationException e) {
168             throw new IllegalStateException(e);
169         }
170     }
171
172     private DataObject potentialAugmentationRead(InstanceIdentifier<? extends DataObject> path,
173             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath, final CompositeNode result)
174             throws DeserializationException {
175         Class<? extends DataObject> targetType = path.getTargetType();
176         if (Augmentation.class.isAssignableFrom(targetType)) {
177             path = mappingService.fromDataDom(biPath);
178             Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
179             DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
180             if (parentTo instanceof Augmentable<?>) {
181                 return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
182             }
183         }
184         return mappingService.dataObjectFromDataDom(path, result);
185     }
186
187     @Override
188     public DataObject readConfigurationData(final InstanceIdentifier<? extends DataObject> path) {
189         try {
190             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
191             CompositeNode result = biDataService.readConfigurationData(biPath);
192             return potentialAugmentationRead(path, biPath, result);
193         } catch (DeserializationException e) {
194             throw new IllegalStateException(e);
195         }
196     }
197
198     private DataModificationTransaction createBindingToDomTransaction(
199             final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> source) {
200         DataModificationTransaction target = biDataService.beginTransaction();
201         LOG.debug("Created DOM Transaction {} for {},", target.getIdentifier(), source.getIdentifier());
202         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedConfigurationData()) {
203             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
204             target.removeConfigurationData(biEntry);
205             LOG.debug("Delete of Binding Configuration Data {} is translated to {}", entry, biEntry);
206         }
207         for (InstanceIdentifier<? extends DataObject> entry : source.getRemovedOperationalData()) {
208             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biEntry = mappingService.toDataDom(entry);
209             target.removeOperationalData(biEntry);
210             LOG.debug("Delete of Binding Operational Data {} is translated to {}", entry, biEntry);
211         }
212         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedConfigurationData()
213                 .entrySet()) {
214             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
215                     .toDataDom(entry);
216             target.putConfigurationData(biEntry.getKey(), biEntry.getValue());
217             LOG.debug("Update of Binding Configuration Data {} is translated to {}", entry, biEntry);
218         }
219         for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : source.getUpdatedOperationalData()
220                 .entrySet()) {
221             Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> biEntry = mappingService
222                     .toDataDom(entry);
223             target.putOperationalData(biEntry.getKey(), biEntry.getValue());
224             LOG.debug("Update of Binding Operational Data {} is translated to {}", entry, biEntry);
225         }
226
227         return target;
228     }
229
230     private org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction createDomToBindingTransaction(
231             final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> source) {
232         org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction target = baDataService
233                 .beginTransaction();
234         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedConfigurationData()) {
235             try {
236
237                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
238                 target.removeConfigurationData(baEntry);
239             } catch (DeserializationException e) {
240                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
241             }
242         }
243         for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry : source.getRemovedOperationalData()) {
244             try {
245
246                 InstanceIdentifier<?> baEntry = mappingService.fromDataDom(entry);
247                 target.removeOperationalData(baEntry);
248             } catch (DeserializationException e) {
249                 LOG.error("Ommiting from BA transaction: {}.", entry, e);
250             }
251         }
252         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
253                 .getUpdatedConfigurationData().entrySet()) {
254             try {
255                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
256                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
257                 target.putConfigurationData(baKey, baData);
258             } catch (DeserializationException e) {
259                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
260             }
261         }
262         for (Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> entry : source
263                 .getUpdatedOperationalData().entrySet()) {
264             try {
265
266                 InstanceIdentifier<?> baKey = mappingService.fromDataDom(entry.getKey());
267                 DataObject baData = mappingService.dataObjectFromDataDom(baKey, entry.getValue());
268                 target.putOperationalData(baKey, baData);
269             } catch (DeserializationException e) {
270                 LOG.error("Ommiting from BA transaction: {}.", entry.getKey(), e);
271             }
272         }
273         return target;
274     }
275
276     public org.opendaylight.controller.sal.core.api.data.DataProviderService getBiDataService() {
277         return biDataService;
278     }
279
280     protected void setDomDataService(
281             final org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService) {
282         this.biDataService = biDataService;
283     }
284
285     public DataProviderService getBaDataService() {
286         return baDataService;
287     }
288
289     protected void setBindingDataService(final DataProviderService baDataService) {
290         this.baDataService = baDataService;
291     }
292
293     public RpcProviderRegistry getRpcRegistry() {
294         return baRpcRegistry;
295     }
296
297     protected void setBindingRpcRegistry(final RpcProviderRegistry rpcRegistry) {
298         this.baRpcRegistry = rpcRegistry;
299     }
300
301     public void startDataForwarding() {
302         if (baDataService instanceof AbstractForwardedDataBroker) {
303             dataForwarding = true;
304             return;
305         }
306
307         final DataProviderService baData;
308         if (baDataService instanceof BindingMountPointImpl) {
309             baData = ((BindingMountPointImpl) baDataService).getDataBrokerImpl();
310             LOG.debug("Extracted BA Data provider {} from mount point {}", baData, baDataService);
311         } else {
312             baData = baDataService;
313         }
314
315         if (baData instanceof DataBrokerImpl) {
316             checkState(!dataForwarding, "Connector is already forwarding data.");
317             ((DataBrokerImpl) baData).setDataReadDelegate(this);
318             ((DataBrokerImpl) baData).setRootCommitHandler(bindingToDomCommitHandler);
319             biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
320             baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
321         }
322
323         dataForwarding = true;
324     }
325
326     public void startRpcForwarding() {
327         if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher<?, ?>) {
328             checkState(!rpcForwarding, "Connector is already forwarding RPCs");
329             final DomToBindingRpcForwardingManager biFwdManager = new DomToBindingRpcForwardingManager();
330
331             domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(biFwdManager);
332             biRpcRegistry.addRpcRegistrationListener(biFwdManager);
333             if (baRpcRegistry instanceof RpcProviderRegistryImpl) {
334                 baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry;
335                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
336                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
337             }
338             rpcForwarding = true;
339         }
340     }
341
342     public void startNotificationForwarding() {
343         checkState(!notificationForwarding, "Connector is already forwarding notifications.");
344         if (baNotifyService != null && domNotificationService != null) {
345             baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder());
346
347             notificationForwarding = true;
348         }
349     }
350
351     protected void setMappingService(final BindingIndependentMappingService mappingService) {
352         this.mappingService = mappingService;
353     }
354
355     @Override
356     public Collection<ProviderFunctionality> getProviderFunctionality() {
357         return Collections.emptyList();
358     }
359
360     @Override
361     public void onSessionInitiated(final ProviderSession session) {
362         setDomDataService(session.getService(org.opendaylight.controller.sal.core.api.data.DataProviderService.class));
363         setDomRpcRegistry(session.getService(RpcProvisionRegistry.class));
364
365     }
366
367     public <T extends RpcService> void onRpcRouterCreated(final Class<T> serviceType, final RpcRouter<T> router) {
368
369     }
370
371     public void setDomRpcRegistry(final RpcProvisionRegistry registry) {
372         biRpcRegistry = registry;
373     }
374
375     @Override
376     public void close() throws Exception {
377         if (baCommitHandlerRegistration != null) {
378             baCommitHandlerRegistration.close();
379         }
380         if (biCommitHandlerRegistration != null) {
381             biCommitHandlerRegistration.close();
382         }
383
384     }
385
386     private class DomToBindingTransaction implements
387             DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
388
389         private final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing;
390         private final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification;
391
392         public DomToBindingTransaction(
393                 final org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction backing,
394                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> modification) {
395             super();
396             this.backing = backing;
397             this.modification = modification;
398             bindingOpenedTransactions.put(backing.getIdentifier(), this);
399         }
400
401         @Override
402         public DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> getModification() {
403             return modification;
404         }
405
406         @Override
407         public RpcResult<Void> rollback() throws IllegalStateException {
408             // backing.cancel();
409             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
410         }
411
412         @Override
413         public RpcResult<Void> finish() throws IllegalStateException {
414             Future<RpcResult<TransactionStatus>> result = backing.commit();
415             try {
416                 RpcResult<TransactionStatus> baResult = result.get();
417                 return Rpcs.<Void> getRpcResult(baResult.isSuccessful(), null, baResult.getErrors());
418             } catch (InterruptedException e) {
419                 throw new IllegalStateException("", e);
420             } catch (ExecutionException e) {
421                 throw new IllegalStateException("", e);
422             }
423         }
424     }
425
426     private class BindingToDomTransaction implements
427             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
428
429         private final DataModificationTransaction backing;
430         private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
431
432         public BindingToDomTransaction(final DataModificationTransaction backing,
433                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
434             this.backing = backing;
435             this.modification = modification;
436             domOpenedTransactions.put(backing.getIdentifier(), this);
437         }
438
439         @Override
440         public DataModification<InstanceIdentifier<? extends DataObject>, DataObject> getModification() {
441             return modification;
442         }
443
444         @Override
445         public RpcResult<Void> finish() throws IllegalStateException {
446             Future<RpcResult<TransactionStatus>> result = backing.commit();
447             try {
448                 RpcResult<TransactionStatus> biResult = result.get();
449                 return Rpcs.<Void> getRpcResult(biResult.isSuccessful(), null, biResult.getErrors());
450             } catch (InterruptedException e) {
451                 throw new IllegalStateException("", e);
452             } catch (ExecutionException e) {
453                 throw new IllegalStateException("", e);
454             } finally {
455                 domOpenedTransactions.remove(backing.getIdentifier());
456             }
457         }
458
459         @Override
460         public RpcResult<Void> rollback() throws IllegalStateException {
461             domOpenedTransactions.remove(backing.getIdentifier());
462             return Rpcs.<Void> getRpcResult(true, null, Collections.<RpcError> emptySet());
463         }
464     }
465
466     private class BindingToDomCommitHandler implements
467             DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> {
468
469         @Override
470         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> requestCommit(
471                 final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> bindingTransaction) {
472
473             /**
474              * Transaction was created as DOM transaction, in that case we do
475              * not need to forward it back.
476              */
477             if (bindingOpenedTransactions.containsKey(bindingTransaction.getIdentifier())) {
478
479                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(bindingTransaction);
480             }
481             DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction);
482             BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction);
483             LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .",
484                     bindingTransaction.getIdentifier(), domTransaction.getIdentifier());
485             return wrapped;
486         }
487     }
488
489     private class DomToBindingCommitHandler implements //
490             RegistrationListener<DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject>>, //
491             DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> {
492
493         @Override
494         public void onRegister(
495                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
496
497             mappingService.toDataDom(registration
498                     .getPath());
499
500         }
501
502         @Override
503         public void onUnregister(
504                 final DataCommitHandlerRegistration<InstanceIdentifier<? extends DataObject>, DataObject> registration) {
505             // NOOP for now
506             // FIXME: do registration based on only active commit handlers.
507         }
508
509         @Override
510         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
511                 final DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
512             Object identifier = domTransaction.getIdentifier();
513
514             /**
515              * We checks if the transcation was originated in this mapper. If it
516              * was originated in this mapper we are returing allways success
517              * commit hanlder to prevent creating loop in two-phase commit and
518              * duplicating data.
519              */
520             if (domOpenedTransactions.containsKey(identifier)) {
521                 return CommitHandlerTransactions.allwaysSuccessfulTransaction(domTransaction);
522             }
523
524             org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction);
525             DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction);
526             LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(),
527                     baTransaction.getIdentifier());
528             return forwardedTransaction;
529         }
530     }
531
532     /**
533      * Manager responsible for instantiating forwarders responsible for
534      * forwarding of RPC invocations from DOM Broker to Binding Aware Broker
535      *
536      */
537     private class DomToBindingRpcForwardingManager implements
538             RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>, RouterInstantiationListener,
539             GlobalRpcRegistrationListener, RpcRegistrationListener {
540
541         private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
542         private RpcProviderRegistryImpl registryImpl;
543
544         public RpcProviderRegistryImpl getRegistryImpl() {
545             return registryImpl;
546         }
547
548         public void setRegistryImpl(final RpcProviderRegistryImpl registryImpl) {
549             this.registryImpl = registryImpl;
550         }
551
552         @Override
553         public void onGlobalRpcRegistered(final Class<? extends RpcService> cls) {
554             getRpcForwarder(cls, null).registerToDOMBroker();
555         }
556
557         @Override
558         public void onGlobalRpcUnregistered(final Class<? extends RpcService> cls) {
559             // NOOP
560         }
561
562         @Override
563         public void onRpcRouterCreated(final RpcRouter<?> router) {
564             Class<? extends BaseIdentity> ctx = router.getContexts().iterator().next();
565             getRpcForwarder(router.getServiceType(), ctx);
566         }
567
568         @Override
569         public void onRouteChange(final RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
570             for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
571                 bindingRoutesAdded(entry);
572             }
573         }
574
575         private void bindingRoutesAdded(final Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
576             Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
577             Class<? extends RpcService> service = entry.getKey().getRpcService();
578             if (context != null) {
579                 getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
580             }
581         }
582
583         private DomToBindingRpcForwarder getRpcForwarder(final Class<? extends RpcService> service,
584                 final Class<? extends BaseIdentity> context) {
585             DomToBindingRpcForwarder potential = forwarders.get(service);
586             if (potential != null) {
587                 return potential;
588             }
589             if (context == null) {
590                 potential = new DomToBindingRpcForwarder(service);
591             } else {
592                 potential = new DomToBindingRpcForwarder(service, context);
593             }
594
595             forwarders.put(service, potential);
596             return potential;
597         }
598
599         @Override
600         public void onRpcImplementationAdded(final QName name) {
601
602             final Optional<Class<? extends RpcService>> rpcInterface = mappingService.getRpcServiceClassFor(
603                     name.getNamespace().toString(), name.getFormattedRevision());
604             if (rpcInterface.isPresent()) {
605                 getRpcForwarder(rpcInterface.get(), null).registerToBindingBroker();
606             }
607         }
608
609         @Override
610         public void onRpcImplementationRemoved(final QName name) {
611
612         }
613     }
614
615     private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler {
616
617         private final Set<QName> supportedRpcs;
618         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
619         private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
620         private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
621         private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
622         private final RpcService proxy;
623         private ObjectRegistration<?> forwarderRegistration;
624         private boolean registrationInProgress = false;
625
626         public DomToBindingRpcForwarder(final Class<? extends RpcService> service) {
627             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
628             this.supportedRpcs = mappingService.getRpcQNamesFor(service);
629
630             Class<?> cls = rpcServiceType.get();
631             ClassLoader clsLoader = cls.getClassLoader();
632             proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
633             createStrategies();
634         }
635
636         /**
637          * Constructor for Routed RPC Forwareder.
638          *
639          * @param service
640          * @param context
641          */
642         public DomToBindingRpcForwarder(final Class<? extends RpcService> service,
643                                         final Class<? extends BaseIdentity> context) {
644             this(service);
645             Builder<RoutedRpcRegistration> registrationsBuilder = ImmutableSet
646                     .<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> builder();
647             try {
648                 for (QName rpc : supportedRpcs) {
649                     registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
650                 }
651                 createDefaultDomForwarder();
652             } catch (Exception e) {
653                 LOG.error("Could not forward Rpcs of type {}", service.getName(), e);
654             }
655             registrations = registrationsBuilder.build();
656         }
657
658
659
660         private void createStrategies() {
661             try {
662                 for (QName rpc : supportedRpcs) {
663                     RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get());
664                     strategiesByMethod.put(strategy.targetMethod, strategy);
665                     strategiesByQName.put(rpc, strategy);
666                 }
667             } catch (Exception e) {
668                 LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
669             }
670
671         }
672
673         /**
674          * Registers RPC Forwarder to DOM Broker,
675          * this means Binding Aware Broker has implementation of RPC
676          * which is registered to it.
677          *
678          * If RPC Forwarder was previously registered to DOM Broker
679          * or to Bidning Broker this method is noop to prevent
680          * creating forwarding loop.
681          *
682          */
683         public void registerToDOMBroker() {
684             if(!registrationInProgress && forwarderRegistration == null) {
685                 registrationInProgress = true;
686                 CompositeObjectRegistrationBuilder<DomToBindingRpcForwarder> builder = CompositeObjectRegistration.builderFor(this);
687                 try {
688                     for (QName rpc : supportedRpcs) {
689                         builder.add(biRpcRegistry.addRpcImplementation(rpc, this));
690                     }
691                 } catch (Exception e) {
692                     LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e);
693                 }
694                 this.forwarderRegistration = builder.toInstance();
695                 registrationInProgress = false;
696             }
697         }
698
699
700         public void registerPaths(final Class<? extends BaseIdentity> context,
701                 final Class<? extends RpcService> service, final Set<InstanceIdentifier<?>> set) {
702             QName ctx = BindingReflections.findQName(context);
703             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
704                     toDOMInstanceIdentifier)) {
705                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
706                     reg.registerPath(ctx, path);
707                 }
708             }
709         }
710
711         @Override
712         public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
713             if (EQUALS_METHOD.equals(method)) {
714                 return false;
715             }
716             RpcInvocationStrategy strategy = strategiesByMethod.get(method);
717             checkState(strategy != null);
718             checkArgument(args.length <= 2);
719             if (args.length == 1) {
720                 checkArgument(args[0] instanceof DataObject);
721                 return strategy.forwardToDomBroker((DataObject) args[0]);
722             }
723             return strategy.forwardToDomBroker(null);
724         }
725
726         public void removePaths(final Class<? extends BaseIdentity> context, final Class<? extends RpcService> service,
727                 final Set<InstanceIdentifier<?>> set) {
728             QName ctx = BindingReflections.findQName(context);
729             for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
730                     toDOMInstanceIdentifier)) {
731                 for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
732                     reg.unregisterPath(ctx, path);
733                 }
734             }
735         }
736
737         @Override
738         public Set<QName> getSupportedRpcs() {
739             return supportedRpcs;
740         }
741
742         @SuppressWarnings({ "unchecked", "rawtypes" })
743         public void createDefaultDomForwarder() {
744             if (baRpcRegistryImpl != null) {
745                 Class<?> cls = rpcServiceType.get();
746                 ClassLoader clsLoader = cls.getClassLoader();
747                 RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class<?>[] { cls }, this);
748
749                 RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get());
750                 rpcRouter.registerDefaultService(proxy);
751             }
752         }
753
754         @Override
755         public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode domInput) {
756             checkArgument(rpc != null);
757             checkArgument(domInput != null);
758
759             Class<? extends RpcService> rpcType = rpcServiceType.get();
760             checkState(rpcType != null);
761             RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
762             checkState(rpcService != null);
763             CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
764
765             try {
766                 return Futures.immediateFuture(resolveInvocationStrategy(rpc).invokeOn(rpcService, domUnwrappedInput));
767             } catch (Exception e) {
768                 return Futures.immediateFailedFuture(e);
769             }
770         }
771
772         private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc) {
773             return strategiesByQName.get(rpc);
774         }
775
776         private RpcInvocationStrategy createInvocationStrategy(final QName rpc,
777                 final Class<? extends RpcService> rpcType) throws Exception {
778             return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
779                 @Override
780                 public RpcInvocationStrategy call() throws Exception {
781                     String methodName = BindingMapping.getMethodName(rpc);
782                     Method targetMethod = null;
783                     for (Method possibleMethod : rpcType.getMethods()) {
784                         if (possibleMethod.getName().equals(methodName)
785                                 && BindingReflections.isRpcMethod(possibleMethod)) {
786                             targetMethod = possibleMethod;
787                             break;
788                         }
789                     }
790                     checkState(targetMethod != null, "Rpc method not found");
791                     return  new RpcInvocationStrategy(rpc,targetMethod, mappingService, biRpcRegistry);
792                 }
793
794             });
795         }
796
797         /**
798          * Registers RPC Forwarder to Binding Broker,
799          * this means DOM Broekr has implementation of RPC
800          * which is registered to it.
801          *
802          * If RPC Forwarder was previously registered to DOM Broker
803          * or to Bidning Broker this method is noop to prevent
804          * creating forwarding loop.
805          *
806          */
807         public void registerToBindingBroker() {
808             if(!registrationInProgress && forwarderRegistration == null) {
809                try {
810                    registrationInProgress = true;
811                    this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy);
812                } catch (Exception e) {
813                    LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e);
814                } finally {
815                    registrationInProgress = false;
816                }
817             }
818         }
819     }
820
821     public boolean isRpcForwarding() {
822         return rpcForwarding;
823     }
824
825     public boolean isDataForwarding() {
826         return dataForwarding;
827     }
828
829     public boolean isNotificationForwarding() {
830         return notificationForwarding;
831     }
832
833     public BindingIndependentMappingService getMappingService() {
834         return mappingService;
835     }
836
837     public void setBindingNotificationService(final NotificationProviderService baService) {
838         this.baNotifyService = baService;
839
840     }
841
842     public void setDomNotificationService(final NotificationPublishService domService) {
843         this.domNotificationService = domService;
844     }
845
846     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
847
848         private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
849         private final Set<QName> supportedNotifications = new HashSet<>();
850
851         @Override
852         public Set<QName> getSupportedNotifications() {
853             return Collections.unmodifiableSet(supportedNotifications);
854         }
855
856         @Override
857         public void onNotification(final CompositeNode notification) {
858             QName qname = notification.getNodeType();
859             WeakReference<Class<? extends Notification>> potential = notifications.get(qname);
860             if (potential != null) {
861                 Class<? extends Notification> potentialClass = potential.get();
862                 if (potentialClass != null) {
863                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
864                             notification);
865
866                     if (baNotification instanceof Notification) {
867                         baNotifyService.publish((Notification) baNotification);
868                     }
869                 }
870             }
871         }
872
873         @Override
874         public void onNotificationSubscribtion(final Class<? extends Notification> notificationType) {
875             QName qname = BindingReflections.findQName(notificationType);
876             if (qname != null) {
877                 WeakReference<Class<? extends Notification>> already = notifications.putIfAbsent(qname,
878                         new WeakReference<Class<? extends Notification>>(notificationType));
879                 if (already == null) {
880                     domNotificationService.addNotificationListener(qname, this);
881                     supportedNotifications.add(qname);
882                 }
883             }
884         }
885     }
886 }