Fixed bug when Binding-Aware Data Change Listeners we're not triggered. 20/5320/3
authorTony Tkacik <ttkacik@cisco.com>
Fri, 14 Feb 2014 05:10:59 +0000 (06:10 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Fri, 14 Feb 2014 08:24:31 +0000 (09:24 +0100)
  - Added additional strategy to inspect changes based on registered listeners
  - Fixed normalization of data for changes where parent node was written
      - Extracts data for listener, so listener does not need to
        extract that data manually
  - Splitted AbstractDataBroker.xtend into several classes

Change-Id: I7e5d1d759c40519d164b08678a9dc22743d329bb
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
14 files changed:
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend [deleted file]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java

index 286b0c378ca7183589e3001105db2be973e8222a..7357926b9e6226a612bd0fc453dc28b737cff5b8 100644 (file)
@@ -7,17 +7,17 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
 \r
-import java.util.concurrent.ExecutorService;\r
-\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;\r
-import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;\r
-import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;\r
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;\r
-import org.osgi.framework.BundleContext;\r
-import org.osgi.framework.ServiceReference;\r
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;
+import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 \r
 /**\r
 *\r
@@ -57,14 +57,14 @@ public final class DataBrokerImplModule extends
             dataBindingBroker = createStandAloneBroker(listeningExecutor);\r
         }\r
         dataBindingBroker.registerRuntimeBean(getRootRuntimeBeanRegistratorWrapper());\r
-\r
+        dataBindingBroker.setNotificationExecutor(SingletonHolder.getDefaultChangeEventExecutor());\r
         return dataBindingBroker;\r
     }\r
     private BindingIndependentMappingService resolveMappingServiceDependency() {\r
         if(getMappingService() != null) {\r
             return getMappingServiceDependency();\r
         }\r
-        \r
+\r
         ServiceReference<BindingIndependentMappingService> potentialMappingService = bundleContext.getServiceReference(BindingIndependentMappingService.class);\r
         if(potentialMappingService != null) {\r
             return bundleContext.getService(potentialMappingService);\r
index 291677a79a4fba3a16d27e1929e1fd23e2e14743..a0bbb28d9e07624e23ad0afd094c347c27d2fce2 100644 (file)
@@ -29,6 +29,7 @@ public class SingletonHolder {
     public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
     private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
     private static ListeningExecutorService COMMIT_EXECUTOR = null;
+    private static ListeningExecutorService CHANGE_EVENT_EXECUTOR = null;
 
     public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
         if (NOTIFICATION_EXECUTOR == null) {
@@ -64,4 +65,21 @@ public class SingletonHolder {
         ExecutorService executor = Executors.newCachedThreadPool(factory);
         return MoreExecutors.listeningDecorator(executor);
     }
+
+    public static ExecutorService getDefaultChangeEventExecutor() {
+        if (CHANGE_EVENT_EXECUTOR == null) {
+            ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
+            /*
+             * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
+             *        ordering guarantees, which means that using a concurrent threadpool results
+             *        in application data being committed in random order, potentially resulting
+             *        in inconsistent data being present. Once proper primitives are introduced,
+             *        concurrency can be reintroduced.
+             */
+            ExecutorService executor = Executors.newSingleThreadExecutor(factory);
+            CHANGE_EVENT_EXECUTOR  = MoreExecutors.listeningDecorator(executor);
+        }
+
+        return CHANGE_EVENT_EXECUTOR;
+    }
 }
index ddf67719dc03f8c9311f565fbf78c98f8ba107cd..16d5a24cb5b7c80364edd8dd5dbe4f8b59e6aa97 100644 (file)
@@ -7,7 +7,8 @@
  */
 package org.opendaylight.controller.sal.binding.impl;\r
 \r
-import java.util.Set;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -19,12 +20,46 @@ import org.opendaylight.controller.sal.common.DataStoreIdentifier;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.DataRoot;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.util.DataObjectReadingUtil;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
 \r
 \r
 public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> //\r
        implements DataProviderService, AutoCloseable {\r
 \r
+    private final static class ContainsWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+        private final  InstanceIdentifier<? extends DataObject> key;
+
+        public ContainsWildcarded(InstanceIdentifier<? extends DataObject> key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+            return key.containsWildcarded(input);
+        }
+    }
+
+    private final static class IsContainedWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+        private final  InstanceIdentifier<? extends DataObject> key;
+
+        public IsContainedWildcarded(InstanceIdentifier<? extends DataObject> key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+            return input.containsWildcarded(key);
+        }
+    }
+
     private final AtomicLong nextTransaction = new AtomicLong();\r
     private final AtomicLong createdTransactionsCount = new AtomicLong();\r
 \r
@@ -110,16 +145,33 @@ public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? exte
     }
 
     @Override
-    protected boolean isAffectedBy(InstanceIdentifier<? extends DataObject> key,
-            Set<InstanceIdentifier<? extends DataObject>> paths) {
-        if (paths.contains(key)) {
-            return true;
-        }
-        for (InstanceIdentifier<?> path : paths) {
-            if (key.containsWildcarded(path)) {
-                return true;
+    protected Predicate<InstanceIdentifier<? extends DataObject>> createContainsPredicate(final
+            InstanceIdentifier<? extends DataObject> key) {
+        return new ContainsWildcarded(key);
+    }
+
+    @Override
+    protected Predicate<InstanceIdentifier<? extends DataObject>> createIsContainedPredicate(final
+            InstanceIdentifier<? extends DataObject> key) {
+        return new IsContainedWildcarded(key);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    protected Map<InstanceIdentifier<? extends DataObject>, DataObject> deepGetBySubpath(
+            Map<InstanceIdentifier<? extends DataObject>, DataObject> dataSet,
+            InstanceIdentifier<? extends DataObject> path) {
+        Builder<InstanceIdentifier<? extends DataObject>, DataObject> builder = ImmutableMap.builder();
+        Map<InstanceIdentifier<? extends DataObject>, DataObject> potential = Maps.filterKeys(dataSet, createIsContainedPredicate(path));
+        for(Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : potential.entrySet()) {
+            try {
+                builder.putAll(DataObjectReadingUtil.readData(entry.getValue(),(InstanceIdentifier)entry.getKey(),path));
+            } catch (Exception e) {
+                // FIXME : Log exception;
             }
         }
-        return false;
-    }\r
+        return builder.build();
+
+    }
+\r
 }
index 5630664a678e8387426bb6a2b2ff88957b0654e8..5b947a5922589605abd146183802ffa07b6b84b4 100644 (file)
@@ -45,8 +45,6 @@ import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
@@ -79,6 +77,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,11 +113,11 @@ public class BindingIndependentConnector implements //
 
     private DataProviderService baDataService;
 
-    private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
-    private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
 
-    private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
-    private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
+    private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
+    private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
 
     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
 
@@ -130,7 +130,7 @@ public class BindingIndependentConnector implements //
     // private ListenerRegistration<BindingToDomRpcForwardingManager>
     // bindingToDomRpcManager;
 
-    private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+    private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
 
         @Override
         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
@@ -413,8 +413,8 @@ public class BindingIndependentConnector implements //
     private class BindingToDomTransaction implements
             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
 
-        private DataModificationTransaction backing;
-        private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
+        private final DataModificationTransaction backing;
+        private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
 
         public BindingToDomTransaction(DataModificationTransaction backing,
                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
@@ -491,6 +491,7 @@ public class BindingIndependentConnector implements //
             // FIXME: do registration based on only active commit handlers.
         }
 
+        @Override
         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
             Object identifier = domTransaction.getIdentifier();
@@ -587,9 +588,9 @@ public class BindingIndependentConnector implements //
 
         private final Set<QName> supportedRpcs;
         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
-        private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
-        private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
-        private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+        private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+        private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+        private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
 
         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
@@ -771,10 +772,10 @@ public class BindingIndependentConnector implements //
     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> inputClass;
+        private final WeakReference<Class> inputClass;
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> outputClass;
+        private final WeakReference<Class> outputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
@@ -825,6 +826,7 @@ public class BindingIndependentConnector implements //
             super(rpc, targetMethod);
         }
 
+        @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             @SuppressWarnings("unchecked")
             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
@@ -837,21 +839,21 @@ public class BindingIndependentConnector implements //
             return Futures.immediateFuture(null);
         }
     }
-    
+
     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
 
-        
+
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> inputClass;
+        private final WeakReference<Class> inputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
-        public NoOutputInvocationStrategy(QName rpc, Method targetMethod, 
+        public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
                 Class<? extends DataContainer> inputClass) {
             super(rpc,targetMethod);
             this.inputClass = new WeakReference(inputClass);
         }
-        
-        
+
+
         @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
@@ -902,12 +904,12 @@ public class BindingIndependentConnector implements //
     public void setDomNotificationService(NotificationPublishService domService) {
         this.domNotificationService = domService;
     }
-    
+
     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
 
-        private ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
-        private Set<QName> supportedNotifications = new HashSet<>();
-        
+        private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
+        private final Set<QName> supportedNotifications = new HashSet<>();
+
         @Override
         public Set<QName> getSupportedNotifications() {
             return Collections.unmodifiableSet(supportedNotifications);
@@ -922,7 +924,7 @@ public class BindingIndependentConnector implements //
                 if (potentialClass != null) {
                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
                             notification);
-                    
+
                     if (baNotification instanceof Notification) {
                         baNotifyService.publish((Notification) baNotification);
                     }
index 598743af90337cdf2baad9194bcdd0020f6f0d04..90fa2be21103a7bf89ec8191a2a4d9fad4b301f4 100644 (file)
@@ -6,9 +6,11 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 package org.opendaylight.controller.sal.binding.test.bugfix;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
 import java.util.Map;
@@ -52,38 +54,42 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
     private static final InstanceIdentifier<Nodes> NODES_INSTANCE_ID_BA = InstanceIdentifier.builder(Nodes.class) //
             .toInstance();
 
-
     private static final InstanceIdentifier<Node> NODE_INSTANCE_ID_BA = InstanceIdentifier//
             .builder(NODES_INSTANCE_ID_BA) //
             .child(Node.class, NODE_KEY).toInstance();
 
-
     private static final InstanceIdentifier<SupportedActions> SUPPORTED_ACTIONS_INSTANCE_ID_BA = InstanceIdentifier//
             .builder(NODES_INSTANCE_ID_BA) //
             .child(Node.class, NODE_KEY) //
             .augmentation(FlowCapableNode.class) //
-            .child(SupportedActions.class)
-            .toInstance();
+            .child(SupportedActions.class).toInstance();
 
+    private static final InstanceIdentifier<FlowCapableNode> ALL_FLOW_CAPABLE_NODES = InstanceIdentifier //
+            .builder(NODES_INSTANCE_ID_BA) //
+            .child(Node.class) //
+            .augmentation(FlowCapableNode.class) //
+            .build();
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier NODE_INSTANCE_ID_BI = //
     org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
             .node(Nodes.QNAME) //
             .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
             .toInstance();
-    private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME, SupportedActions.QNAME.getLocalName());
-
+    private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME,
+            SupportedActions.QNAME.getLocalName());
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier SUPPORTED_ACTIONS_INSTANCE_ID_BI = //
-            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
-                    .node(Nodes.QNAME) //
-                    .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
-                    .node(SUPPORTED_ACTIONS_QNAME) //
-                    .toInstance();
-
-    private DataChangeEvent<InstanceIdentifier<?>, DataObject> receivedChangeEvent;
-
+    org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
+            .node(Nodes.QNAME) //
+            .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
+            .node(SUPPORTED_ACTIONS_QNAME) //
+            .toInstance();
+    private static final InstanceIdentifier<FlowCapableNode> FLOW_AUGMENTATION_PATH = InstanceIdentifier //
+            .builder(NODE_INSTANCE_ID_BA) //
+            .augmentation(FlowCapableNode.class) //
+            .build();
 
+    private DataChangeEvent<InstanceIdentifier<?>, DataObject> lastReceivedChangeEvent;
 
     /**
      * Test for Bug 148
@@ -93,7 +99,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
     @Test
     public void putNodeAndAugmentation() throws Exception {
 
-        baDataService.registerDataChangeListener(NODES_INSTANCE_ID_BA, this);
+        baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
 
         NodeBuilder nodeBuilder = new NodeBuilder();
         nodeBuilder.setId(new NodeId(NODE_ID));
@@ -102,7 +109,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
-        assertNotNull(receivedChangeEvent);
+
         Node node = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
         assertNotNull(node);
         assertEquals(NODE_KEY, node.getKey());
@@ -114,13 +121,16 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         fnub.setDescription("Description Foo");
         fnub.setSoftware("JUnit emulated");
         FlowCapableNode fnu = fnub.build();
-        InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance();
+        InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
+                .augmentation(FlowCapableNode.class).toInstance();
         DataModificationTransaction augmentedTransaction = baDataService.beginTransaction();
         augmentedTransaction.putOperationalData(augmentIdentifier, fnu);
 
         result = augmentedTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
+        assertNotNull(lastReceivedChangeEvent);
+        assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
 
         Node augmentedNode = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
         assertNotNull(node);
@@ -131,11 +141,14 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         assertEquals(fnu.getDescription(), readedAugmentation.getDescription());
         assertBindingIndependentVersion(NODE_INSTANCE_ID_BI);
         testNodeRemove();
+        assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
     }
 
     @Test
     public void putNodeWithAugmentation() throws Exception {
 
+        baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
         NodeBuilder nodeBuilder = new NodeBuilder();
         nodeBuilder.setId(new NodeId(NODE_ID));
         nodeBuilder.setKey(NODE_KEY);
@@ -151,23 +164,31 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         DataModificationTransaction baseTransaction = baDataService.beginTransaction();
         baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
+
+        assertNotNull(lastReceivedChangeEvent);
+        assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
+        lastReceivedChangeEvent = null;
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
-        FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
+        FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier
+                .builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
         assertNotNull(readedAugmentation);
+
         assertEquals(fnu.getHardware(), readedAugmentation.getHardware());
 
         testPutNodeConnectorWithAugmentation();
+        lastReceivedChangeEvent = null;
         testNodeRemove();
-    }
 
+        assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
+    }
 
     private void testPutNodeConnectorWithAugmentation() throws Exception {
         NodeConnectorKey ncKey = new NodeConnectorKey(new NodeConnectorId("test:0:0"));
         InstanceIdentifier<NodeConnector> ncPath = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
-        .child(NodeConnector.class, ncKey).toInstance();
+                .child(NodeConnector.class, ncKey).toInstance();
         InstanceIdentifier<FlowCapableNodeConnector> ncAugmentPath = InstanceIdentifier.builder(ncPath)
-        .augmentation(FlowCapableNodeConnector.class).toInstance();
+                .augmentation(FlowCapableNodeConnector.class).toInstance();
 
         NodeConnectorBuilder nc = new NodeConnectorBuilder();
         nc.setKey(ncKey);
@@ -181,7 +202,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
-        FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService.readOperationalData(ncAugmentPath);
+        FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService
+                .readOperationalData(ncAugmentPath);
         assertNotNull(readedAugmentation);
         assertEquals(fncb.getName(), readedAugmentation.getName());
     }
@@ -196,7 +218,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         assertNull(node);
     }
 
-    private void verifyNodes(Nodes nodes,Node original) {
+    private void verifyNodes(Nodes nodes, Node original) {
         assertNotNull(nodes);
         assertNotNull(nodes.getNode());
         assertEquals(1, nodes.getNode().size());
@@ -212,8 +234,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
 
     }
 
-    private void assertBindingIndependentVersion(
-            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
+    private void assertBindingIndependentVersion(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
         CompositeNode node = biDataService.readOperationalData(nodeId);
         assertNotNull(node);
     }
@@ -224,7 +245,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
 
     @Override
     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-        receivedChangeEvent = change;
+        lastReceivedChangeEvent = change;
     }
 
 }
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java
new file mode 100644 (file)
index 0000000..bfffb59
--- /dev/null
@@ -0,0 +1,441 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.xtext.xbase.lib.Exceptions;
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
+        implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
+        DataProvisionService<P, D> {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
+
+    private ExecutorService executor;
+
+    public ExecutorService getExecutor() {
+        return this.executor;
+    }
+
+    public void setExecutor(final ExecutorService executor) {
+        this.executor = executor;
+    }
+
+    private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
+
+    public ExecutorService getNotificationExecutor() {
+        return this.notificationExecutor;
+    }
+
+    public void setNotificationExecutor(final ExecutorService notificationExecutor) {
+        this.notificationExecutor = notificationExecutor;
+    }
+
+    private AbstractDataReadRouter<P, D> dataReadRouter;
+
+    private final AtomicLong submittedTransactionsCount = new AtomicLong();
+
+    private final AtomicLong failedTransactionsCount = new AtomicLong();
+
+    private final AtomicLong finishedTransactionsCount = new AtomicLong();
+
+    public AbstractDataReadRouter<P, D> getDataReadRouter() {
+        return this.dataReadRouter;
+    }
+
+    public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
+        this.dataReadRouter = dataReadRouter;
+    }
+
+    public AtomicLong getSubmittedTransactionsCount() {
+        return this.submittedTransactionsCount;
+    }
+
+    public AtomicLong getFailedTransactionsCount() {
+        return this.failedTransactionsCount;
+    }
+
+    public AtomicLong getFinishedTransactionsCount() {
+        return this.finishedTransactionsCount;
+    }
+
+    private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
+            .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
+
+    private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
+            .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
+
+    private final Lock registrationLock = new ReentrantLock();
+
+    private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
+
+    public AbstractDataBroker() {
+    }
+
+    protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
+        final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+            @Override
+            public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+                Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+                Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+                        .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+                final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+                    @Override
+                    public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        P _key = it.getKey();
+                        boolean _isAffectedBy = isAffectedBy(_key, paths);
+                        return _isAffectedBy;
+                    }
+                };
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+                        .filter(_function);
+                final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+                    @Override
+                    public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+                            final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+                        return _value;
+                    }
+                };
+                FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+                        .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+                final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+                    @Override
+                    public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+                        DataCommitHandler<P, D> _instance = it.getInstance();
+                        return _instance;
+                    }
+                };
+                FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+                        .<DataCommitHandler<P, D>> transform(_function_2);
+                return _transform.toList();
+            }
+        };
+        return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+    }
+
+    protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
+        final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+            @Override
+            public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+                Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+                Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+                        .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+                final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+                    @Override
+                    public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        P _key = it.getKey();
+                        boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
+                        return _isProbablyAffectedBy;
+                    }
+                };
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+                        .filter(_function);
+                final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+                    @Override
+                    public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+                            final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+                        return _value;
+                    }
+                };
+                FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+                        .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+                final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+                    @Override
+                    public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+                        DataCommitHandler<P, D> _instance = it.getInstance();
+                        return _instance;
+                    }
+                };
+                FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+                        .<DataCommitHandler<P, D>> transform(_function_2);
+                return _transform.toList();
+            }
+        };
+        return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+    }
+
+    protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
+        return Collections.<P, D> emptyMap();
+    }
+
+    @Override
+    public final D readConfigurationData(final P path) {
+        AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+        return _dataReadRouter.readConfigurationData(path);
+    }
+
+    @Override
+    public final D readOperationalData(final P path) {
+        AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+        return _dataReadRouter.readOperationalData(path);
+    }
+
+    private static <T extends Object> T withLock(final Lock lock, final Callable<T> method) {
+        lock.lock();
+        try {
+            return method.call();
+        } catch (Exception e) {
+            throw Exceptions.sneakyThrow(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
+            final DataCommitHandler<P, D> commitHandler) {
+        synchronized (commitHandler) {
+            final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
+                    path, commitHandler, this);
+            commitHandlers.put(path, registration);
+            LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
+            for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+                try {
+                    listener.getInstance().onRegister(registration);
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
+                            e);
+                }
+            }
+            return registration;
+        }
+    }
+
+    @Override
+    public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
+        synchronized (listeners) {
+            final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
+                    listener, AbstractDataBroker.this);
+            listeners.put(path, reg);
+            final D initialConfig = getDataReadRouter().readConfigurationData(path);
+            final D initialOperational = getDataReadRouter().readOperationalData(path);
+            final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
+            listener.onDataChanged(event);
+            return reg;
+        }
+    }
+
+    public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
+            final DataReader<P, D> reader) {
+
+        final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
+        final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
+        return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
+    }
+
+    @Override
+    public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
+            final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+        final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
+                .register(commitHandlerListener);
+        return ret;
+    }
+
+    protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
+            final D initialOperational) {
+        InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
+                initialConfig, initialOperational);
+        return _initialDataChangeEventImpl;
+    }
+
+    protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
+        synchronized (listeners) {
+            listeners.remove(registration.getPath(), registration);
+        }
+    }
+
+    protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
+        synchronized (commitHandlers) {
+
+            commitHandlers.remove(registration.getPath(), registration);
+            LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
+            for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+                try {
+                    listener.getInstance().onUnregister(registration);
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",
+                            listener.getInstance(), e);
+                }
+            }
+        }
+
+    }
+
+    protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
+        return commitHandlers.entries();
+    }
+
+    protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
+
+        synchronized (listeners) {
+            return FluentIterable //
+                    .from(listeners.asMap().entrySet()) //
+                    .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+                        @Override
+                        public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                            return isAffectedBy(it.getKey(), paths);
+                        }
+                    }) //
+                    .transform(
+                            new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+                                @Override
+                                public ListenerStateCapture<P, D, DCL> apply(
+                                        final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                                    return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+                                            createContainsPredicate(it.getKey()));
+                                }
+                            }) //
+                    .toList();
+        }
+    }
+
+    protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
+        synchronized (listeners) {
+            return FluentIterable //
+                    .from(listeners.asMap().entrySet()) //
+                    .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+                        @Override
+                        public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                            return isProbablyAffectedBy(it.getKey(), paths);
+                        }
+                    }) //
+                    .transform(
+                            new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+                                @Override
+                                public ListenerStateCapture<P, D, DCL> apply(
+                                        final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                                    return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+                                            createIsContainedPredicate(it.getKey()));
+                                }
+                            }) //
+                    .toList();
+        }
+    }
+
+    protected Predicate<P> createContainsPredicate(final P key) {
+        return new Predicate<P>() {
+            @Override
+            public boolean apply(final P other) {
+                return key.contains(other);
+            }
+        };
+    }
+
+    protected Predicate<P> createIsContainedPredicate(final P key) {
+        return new Predicate<P>() {
+            @Override
+            public boolean apply(final P other) {
+                return other.contains(key);
+            }
+        };
+    }
+
+    protected boolean isAffectedBy(final P key, final Set<P> paths) {
+        final Predicate<P> contains = this.createContainsPredicate(key);
+        if (paths.contains(key)) {
+            return true;
+        }
+        for (final P path : paths) {
+            if (contains.apply(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
+        final Predicate<P> isContained = this.createIsContainedPredicate(key);
+        for (final P path : paths) {
+            if (isContained.apply(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
+        Preconditions.checkNotNull(transaction);
+        transaction.changeStatus(TransactionStatus.SUBMITED);
+        final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
+        ;
+        this.getSubmittedTransactionsCount().getAndIncrement();
+        return this.getExecutor().submit(task);
+    }
+
+    private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
+            extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+            implements DataCommitHandlerRegistration<P, D> {
+
+        private AbstractDataBroker<P, D, ? extends Object> dataBroker;
+        private final P path;
+
+        @Override
+        public P getPath() {
+            return this.path;
+        }
+
+        public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
+                final AbstractDataBroker<P, D, ? extends Object> broker) {
+            super(instance);
+            this.dataBroker = broker;
+            this.path = path;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            this.dataBroker.removeCommitHandler(this);
+            this.dataBroker = null;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend
deleted file mode 100644 (file)
index 7c6f52f..0000000
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service\r
-\r
-import com.google.common.collect.FluentIterable\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableList\r
-import com.google.common.collect.Multimap\r
-import java.util.ArrayList\r
-import java.util.Arrays\r
-import java.util.Collection\r
-import java.util.Collections\r
-import java.util.HashSet\r
-import java.util.List\r
-import java.util.Set\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.concurrent.atomic.AtomicLong\r
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
-import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
-import org.opendaylight.controller.sal.common.util.Rpcs\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Path\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.common.RpcResult\r
-import org.slf4j.LoggerFactory\r
-\r
-import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-import com.google.common.collect.Multimaps
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
-DataReader<P, D>, //\r
-DataChangePublisher<P, D, DCL>, //\r
-DataProvisionService<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
-\r
-    @Property\r
-    var ExecutorService executor;\r
-\r
-    @Property\r
-    var AbstractDataReadRouter<P, D> dataReadRouter;\r
-    \r
-    @Property\r
-    private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
-    \r
-    @Property\r
-    private val AtomicLong failedTransactionsCount = new AtomicLong\r
-    \r
-    @Property\r
-    private val AtomicLong finishedTransactionsCount = new AtomicLong\r
-\r
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    
-    private val Lock registrationLock = new ReentrantLock;
-    \r
-    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
-    public new() {\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
-                .transformAndConcat[value] //\r
-                .transform[instance].toList()
-        ]\r
-    }\r
-\r
-    override final readConfigurationData(P path) {\r
-        return dataReadRouter.readConfigurationData(path);\r
-    }\r
-\r
-    override final readOperationalData(P path) {\r
-        return dataReadRouter.readOperationalData(path);\r
-    }
-    
-    private static def <T> withLock(Lock lock,Callable<T> method) {
-        lock.lock
-        try {
-            return method.call
-        } finally {
-            lock.unlock
-        }
-    } \r
-\r
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        return withLock(registrationLock) [|\r
-            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
-            commitHandlers.put(path, registration)\r
-            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onRegister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
-                }\r
-            }
-            return registration;
-        ]\r
-    }\r
-\r
-    override final def registerDataChangeListener(P path, DCL listener) {\r
-        return withLock(registrationLock) [|
-            val reg = new DataChangeListenerRegistration(path, listener, this);\r
-            listeners.put(path, reg);\r
-            val initialConfig = dataReadRouter.readConfigurationData(path);\r
-            val initialOperational = dataReadRouter.readOperationalData(path);\r
-            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
-            listener.onDataChanged(event);\r
-            return reg;
-        ]\r
-    }\r
-\r
-    final def registerDataReader(P path, DataReader<P, D> reader) {\r
-        return withLock(registrationLock) [|\r
-            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
-            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-    \r
-            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
-        ]\r
-    }\r
-    \r
-    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
-        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
-        return ret;\r
-    }\r
-    \r
-    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
-        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
-        \r
-    }\r
-\r
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
-        return withLock(registrationLock) [|\r
-            listeners.remove(registration.path, registration);
-        ]\r
-    }\r
-\r
-    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
-        return withLock(registrationLock) [|
-            commitHandlers.remove(registration.path, registration);\r
-             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onUnregister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
-                }\r
-            }
-            return null;
-        ]\r
-    }\r
-\r
-    protected final def getActiveCommitHandlers() {\r
-        return commitHandlers.entries;\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
-                val operationalState = readOperationalData(key)\r
-                val configurationState = readConfigurationData(key)\r
-                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
-            ].toList()
-        ]\r
-    }\r
-\r
-    protected def boolean isAffectedBy(P key, Set<P> paths) {\r
-        if (paths.contains(key)) {\r
-            return true;\r
-        }\r
-        for (path : paths) {\r
-            if (key.contains(path)) {\r
-                return true;\r
-            }\r
-        }\r
-\r
-        return false;\r
-    }\r
-\r
-    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
-        checkNotNull(transaction);\r
-        transaction.changeStatus(TransactionStatus.SUBMITED);\r
-        val task = new TwoPhaseCommit(transaction, this);\r
-        submittedTransactionsCount.andIncrement;\r
-        return executor.submit(task);\r
-    }\r
-\r
-}\r
-\r
-@Data\r
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
-\r
-    @Property\r
-    P path;\r
-\r
-    @Property\r
-    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
-\r
-    @Property\r
-    D initialOperationalState;\r
-\r
-    @Property\r
-    D initialConfigurationState;\r
-}\r
-\r
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
-\r
-    AbstractDataBroker<P, D, DCL> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeListener(this);\r
-        dataBroker = null;\r
-    }\r
-\r
-}\r
-\r
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
-implements DataCommitHandlerRegistration<P, D> {\r
-\r
-    AbstractDataBroker<P, D, ?> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeCommitHandler(this);\r
-        dataBroker = null;\r
-    }\r
-}\r
-\r
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
-\r
-    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
-\r
-    val AbstractDataTransaction<P, D> transaction;\r
-    val AbstractDataBroker<P, D, DCL> dataBroker;\r
-    \r
-    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
-        this.transaction = transaction;\r
-        this.dataBroker = broker;\r
-    }\r
-\r
-    override call() throws Exception {\r
-\r
-        // get affected paths\r
-        val affectedPaths = new HashSet<P>();\r
-\r
-        affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.removedConfigurationData);\r
-\r
-        affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.removedOperationalData);\r
-
-        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
-\r
-        val transactionId = transaction.identifier;\r
-\r
-        log.trace("Transaction: {} Started.",transactionId);\r
-        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
-        // requesting commits\r
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
-        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
-        try {\r
-            for (handler : commitHandlers) {\r
-                handlerTransactions.add(handler.requestCommit(transaction));\r
-            }\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Request Commit failed", transactionId,e);\r
-            dataBroker.failedTransactionsCount.andIncrement\r
-            transaction.changeStatus(TransactionStatus.FAILED)
-            return rollback(handlerTransactions, e);\r
-        }\r
-        val List<RpcResult<Void>> results = new ArrayList();\r
-        try {\r
-            for (subtransaction : handlerTransactions) {\r
-                results.add(subtransaction.finish());\r
-            }\r
-            listeners.publishDataChangeEvent();\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
-            dataBroker.failedTransactionsCount.andIncrement
-            transaction.changeStatus(TransactionStatus.FAILED)\r
-            return rollback(handlerTransactions, e);\r
-        }\r
-        log.trace("Transaction: {} Finished successfully.",transactionId);\r
-        dataBroker.finishedTransactionsCount.andIncrement;
-        transaction.changeStatus(TransactionStatus.COMMITED)\r
-        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
-\r
-    }\r
-\r
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-        dataBroker.executor.submit [|\r
-            for (listenerSet : listeners) {
-                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
-                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
-                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
-                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
-                for (listener : listenerSet.listeners) {
-                    try {
-                        listener.instance.onDataChanged(changeEvent);
-
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }        \r
-        ]\r
-    }\r
-\r
-    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
-        for (transaction : transactions) {\r
-            transaction.rollback()\r
-        }\r
-\r
-        // FIXME return encountered error.\r
-        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
-    }\r
-}\r
-\r
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
-
-    @Property\r
-    private val Object identifier;\r
-\r
-    var TransactionStatus status;\r
-\r
-    var AbstractDataBroker<P, D, ?> broker;\r
-\r
-    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
-        super(dataBroker);\r
-        _identifier = identifier;\r
-        broker = dataBroker;\r
-        status = TransactionStatus.NEW;\r
-        LOG.debug("Transaction {} Allocated.", identifier);
-\r
-    //listeners = new ListenerRegistry<>();\r
-    }\r
-\r
-    override commit() {\r
-        return broker.commit(this);\r
-    }\r
-\r
-    override readConfigurationData(P path) {\r
-        val local = this.updatedConfigurationData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        \r
-        return broker.readConfigurationData(path);\r
-    }\r
-\r
-    override readOperationalData(P path) {\r
-        val local = this.updatedOperationalData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        return broker.readOperationalData(path);\r
-    }\r
-\r
-    override hashCode() {\r
-        return identifier.hashCode;\r
-    }\r
-\r
-    override equals(Object obj) {\r
-        if (this === obj)\r
-            return true;\r
-        if (obj == null)\r
-            return false;\r
-        if (getClass() != obj.getClass())\r
-            return false;\r
-        val other = (obj as AbstractDataTransaction<P,D>);\r
-        if (broker == null) {\r
-            if (other.broker != null)\r
-                return false;\r
-        } else if (!broker.equals(other.broker))\r
-            return false;\r
-        if (identifier == null) {\r
-            if (other.identifier != null)\r
-                return false;\r
-        } else if (!identifier.equals(other.identifier))\r
-            return false;\r
-        return true;\r
-    }\r
-\r
-    override TransactionStatus getStatus() {\r
-        return status;\r
-    }\r
-\r
-    protected abstract def void onStatusChange(TransactionStatus status);\r
-\r
-    public def changeStatus(TransactionStatus status) {\r
-        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
-        this.status = status;\r
-        onStatusChange(status);\r
-    }\r
-\r
-}\r
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java
new file mode 100644 (file)
index 0000000..c73a627
--- /dev/null
@@ -0,0 +1,108 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all")
+public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
+        AbstractDataModification<P, D> {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractDataTransaction.class);
+
+    private final Object identifier;
+
+    @Override
+    public Object getIdentifier() {
+        return this.identifier;
+    }
+
+    private TransactionStatus status;
+
+    private final AbstractDataBroker<P, D, ? extends Object> broker;
+
+    protected AbstractDataTransaction(final Object identifier,
+            final AbstractDataBroker<P, D, ? extends Object> dataBroker) {
+        super(dataBroker);
+        this.identifier = identifier;
+        this.broker = dataBroker;
+        this.status = TransactionStatus.NEW;
+        AbstractDataTransaction.LOG.debug("Transaction {} Allocated.", identifier);
+    }
+
+    @Override
+    public Future<RpcResult<TransactionStatus>> commit() {
+        return this.broker.commit(this);
+    }
+
+    @Override
+    public D readConfigurationData(final P path) {
+        final D local = getUpdatedConfigurationData().get(path);
+        if (local != null) {
+            return local;
+        }
+        return this.broker.readConfigurationData(path);
+    }
+
+    @Override
+    public D readOperationalData(final P path) {
+        final D local = this.getUpdatedOperationalData().get(path);
+        if (local != null) {
+            return local;
+        }
+        return this.broker.readOperationalData(path);
+    }
+
+
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((identifier == null) ? 0 : identifier.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        AbstractDataTransaction other = (AbstractDataTransaction) obj;
+        if (identifier == null) {
+            if (other.identifier != null)
+                return false;
+        } else if (!identifier.equals(other.identifier))
+            return false;
+        return true;
+    }
+
+    @Override
+    public TransactionStatus getStatus() {
+        return this.status;
+    }
+
+    protected abstract void onStatusChange(final TransactionStatus status);
+
+    public void changeStatus(final TransactionStatus status) {
+        Object _identifier = this.getIdentifier();
+        AbstractDataTransaction.LOG
+                .debug("Transaction {} transitioned from {} to {}", _identifier, this.status, status);
+        this.status = status;
+        this.onStatusChange(status);
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..57d511e
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+
+@SuppressWarnings("all")
+class DataChangeListenerRegistration<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> extends
+        AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+    private AbstractDataBroker<P, D, DCL> dataBroker;
+
+    private final P path;
+
+    public P getPath() {
+        return this.path;
+    }
+
+    public DataChangeListenerRegistration(final P path, final DCL instance, final AbstractDataBroker<P, D, DCL> broker) {
+        super(instance);
+        this.dataBroker = broker;
+        this.path = path;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        this.dataBroker.removeListener(this);
+        this.dataBroker = null;
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java
new file mode 100644 (file)
index 0000000..776ff7b
--- /dev/null
@@ -0,0 +1,226 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+final class ImmutableDataChangeEvent<P extends Path<P>, D> implements DataChangeEvent<P,D> {
+
+    private final D updatedOperationalSubtree;
+    private final Map<P, D> updatedOperational;
+    private final D updatedConfigurationSubtree;
+    private final Map<P, D> updatedConfiguration;
+    private final Set<P> removedOperational;
+    private final Set<P> removedConfiguration;
+    private final D originalOperationalSubtree;
+    private final Map<P, D> originalOperational;
+    private final D originalConfigurationSubtree;
+    private final Map<P, D> originalConfiguration;
+    private final Map<P, D> createdOperational;
+    private final Map<P, D> createdConfiguration;
+
+
+    public ImmutableDataChangeEvent(Builder<P, D> builder) {
+
+        createdConfiguration = builder.getCreatedConfiguration().build();
+        createdOperational = builder.getCreatedOperational().build();
+        originalConfiguration = builder.getOriginalConfiguration().build();
+        originalConfigurationSubtree = builder.getOriginalConfigurationSubtree();
+        originalOperational = builder.getOriginalOperational().build();
+        originalOperationalSubtree = builder.getOriginalOperationalSubtree();
+        removedConfiguration = builder.getRemovedConfiguration().build();
+        removedOperational = builder.getRemovedOperational().build();
+        updatedConfiguration = builder.getUpdatedConfiguration().build();
+        updatedConfigurationSubtree = builder.getUpdatedConfigurationSubtree();
+        updatedOperational = builder.getUpdatedOperational().build();
+        updatedOperationalSubtree = builder.getUpdatedOperationalSubtree();
+    }
+
+    @Override
+    public Map<P, D> getCreatedConfigurationData() {
+        return createdConfiguration;
+    }
+
+    @Override
+    public Map<P, D> getCreatedOperationalData() {
+        return createdOperational;
+    }
+
+    @Override
+    public Map<P, D> getOriginalConfigurationData() {
+        return originalConfiguration;
+    }
+    @Override
+    public D getOriginalConfigurationSubtree() {
+        return originalConfigurationSubtree;
+    }
+    @Override
+    public Map<P, D> getOriginalOperationalData() {
+        return originalOperational;
+    }
+    @Override
+    public D getOriginalOperationalSubtree() {
+        return originalOperationalSubtree;
+    }
+    @Override
+    public Set<P> getRemovedConfigurationData() {
+        return removedConfiguration;
+    }
+    @Override
+    public Set<P> getRemovedOperationalData() {
+        return removedOperational;
+    }
+    @Override
+    public Map<P, D> getUpdatedConfigurationData() {
+        return updatedConfiguration;
+    }
+    @Override
+    public D getUpdatedConfigurationSubtree() {
+        return updatedConfigurationSubtree;
+    }
+    @Override
+    public Map<P, D> getUpdatedOperationalData() {
+        return updatedOperational;
+    }
+    @Override
+    public D getUpdatedOperationalSubtree() {
+        return updatedOperationalSubtree;
+    }
+
+    static final <P extends Path<P>,D> Builder<P, D> builder() {
+        return new Builder<>();
+    }
+
+    static final class Builder<P extends Path<P>,D> {
+
+        private  D updatedOperationalSubtree;
+        private  D originalOperationalSubtree;
+        private  D originalConfigurationSubtree;
+        private  D updatedConfigurationSubtree;
+
+        private final ImmutableMap.Builder<P, D> updatedOperational = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> updatedConfiguration = ImmutableMap.builder();
+        private final ImmutableSet.Builder<P> removedOperational = ImmutableSet.builder();
+        private final ImmutableSet.Builder<P> removedConfiguration = ImmutableSet.builder();
+        private final ImmutableMap.Builder<P, D> originalOperational = ImmutableMap.builder();
+
+        private final ImmutableMap.Builder<P, D> originalConfiguration = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> createdOperational = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> createdConfiguration = ImmutableMap.builder();
+
+
+        protected Builder<P,D> addTransaction(DataModification<P, D> data, Predicate<P> keyFilter) {
+            updatedOperational.putAll(Maps.filterKeys(data.getUpdatedOperationalData(), keyFilter));
+            updatedConfiguration.putAll(Maps.filterKeys(data.getUpdatedConfigurationData(), keyFilter));
+            originalConfiguration.putAll(Maps.filterKeys(data.getOriginalConfigurationData(), keyFilter));
+            originalOperational.putAll(Maps.filterKeys(data.getOriginalOperationalData(), keyFilter));
+            createdOperational.putAll(Maps.filterKeys(data.getCreatedOperationalData(), keyFilter));
+            createdConfiguration.putAll(Maps.filterKeys(data.getCreatedConfigurationData(), keyFilter));
+            return this;
+        }
+
+        protected Builder<P, D> addConfigurationChangeSet(RootedChangeSet<P, D> changeSet) {
+            if(changeSet == null) {
+                return this;
+            }
+
+            originalConfiguration.putAll(changeSet.getOriginal());
+            createdConfiguration.putAll(changeSet.getCreated());
+            updatedConfiguration.putAll(changeSet.getUpdated());
+            removedConfiguration.addAll(changeSet.getRemoved());
+            return this;
+        }
+
+        protected Builder<P, D> addOperationalChangeSet(RootedChangeSet<P, D> changeSet) {
+            if(changeSet == null) {
+                return this;
+            }
+            originalOperational.putAll(changeSet.getOriginal());
+            createdOperational.putAll(changeSet.getCreated());
+            updatedOperational.putAll(changeSet.getUpdated());
+            removedOperational.addAll(changeSet.getRemoved());
+            return this;
+        }
+
+        protected ImmutableDataChangeEvent<P, D> build() {
+            return new ImmutableDataChangeEvent<P,D>(this);
+        }
+
+        protected D getUpdatedOperationalSubtree() {
+            return updatedOperationalSubtree;
+        }
+
+        protected Builder<P, D> setUpdatedOperationalSubtree(D updatedOperationalSubtree) {
+            this.updatedOperationalSubtree = updatedOperationalSubtree;
+            return this;
+        }
+
+        protected D getOriginalOperationalSubtree() {
+            return originalOperationalSubtree;
+        }
+
+        protected Builder<P,D> setOriginalOperationalSubtree(D originalOperationalSubtree) {
+            this.originalOperationalSubtree = originalOperationalSubtree;
+            return this;
+        }
+
+        protected D getOriginalConfigurationSubtree() {
+            return originalConfigurationSubtree;
+        }
+
+        protected Builder<P, D> setOriginalConfigurationSubtree(D originalConfigurationSubtree) {
+            this.originalConfigurationSubtree = originalConfigurationSubtree;
+            return this;
+        }
+
+        protected D getUpdatedConfigurationSubtree() {
+            return updatedConfigurationSubtree;
+        }
+
+        protected Builder<P,D> setUpdatedConfigurationSubtree(D updatedConfigurationSubtree) {
+            this.updatedConfigurationSubtree = updatedConfigurationSubtree;
+            return this;
+        }
+
+        protected ImmutableMap.Builder<P, D> getUpdatedOperational() {
+            return updatedOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getUpdatedConfiguration() {
+            return updatedConfiguration;
+        }
+
+        protected ImmutableSet.Builder<P> getRemovedOperational() {
+            return removedOperational;
+        }
+
+        protected ImmutableSet.Builder<P> getRemovedConfiguration() {
+            return removedConfiguration;
+        }
+
+        protected ImmutableMap.Builder<P, D> getOriginalOperational() {
+            return originalOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getOriginalConfiguration() {
+            return originalConfiguration;
+        }
+
+        protected ImmutableMap.Builder<P, D> getCreatedOperational() {
+            return createdOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getCreatedConfiguration() {
+            return createdConfiguration;
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java
new file mode 100644 (file)
index 0000000..502ca90
--- /dev/null
@@ -0,0 +1,118 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+
+public final class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
+
+    final P path;
+
+    final Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+    D initialOperationalState;
+
+    D initialConfigurationState;
+
+    D finalConfigurationState;
+
+    D finalOperationalState;
+
+    Map<P, D> additionalConfigOriginal;
+    Map<P, D> additionalConfigCreated;
+    Map<P, D> additionalConfigUpdated;
+    Map<P, D> additionalConfigDeleted;
+
+    Map<P, D> additionalOperOriginal;
+    Map<P, D> additionalOperCreated;
+    Map<P, D> additionalOperUpdated;
+    Map<P, D> additionalOperDeleted;
+
+    RootedChangeSet<P, D> normalizedConfigurationChanges;
+    RootedChangeSet<P, D> normalizedOperationalChanges;
+
+    private final Predicate<P> containsPredicate;
+
+    public ListenerStateCapture(P path, Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners,
+            Predicate<P> containsPredicate) {
+        super();
+        this.path = path;
+        this.listeners = listeners;
+        this.containsPredicate = containsPredicate;
+    }
+
+    protected D getInitialOperationalState() {
+        return initialOperationalState;
+    }
+
+    protected void setInitialOperationalState(D initialOperationalState) {
+        this.initialOperationalState = initialOperationalState;
+    }
+
+    protected D getInitialConfigurationState() {
+        return initialConfigurationState;
+    }
+
+    protected void setInitialConfigurationState(D initialConfigurationState) {
+        this.initialConfigurationState = initialConfigurationState;
+    }
+
+    protected P getPath() {
+        return path;
+    }
+
+    protected Iterable<DataChangeListenerRegistration<P, D, DCL>> getListeners() {
+        return listeners;
+    }
+
+    protected D getFinalConfigurationState() {
+        return finalConfigurationState;
+    }
+
+    protected void setFinalConfigurationState(D finalConfigurationState) {
+        this.finalConfigurationState = finalConfigurationState;
+    }
+
+    protected D getFinalOperationalState() {
+        return finalOperationalState;
+    }
+
+    protected void setFinalOperationalState(D finalOperationalState) {
+        this.finalOperationalState = finalOperationalState;
+    }
+
+    protected RootedChangeSet<P, D> getNormalizedConfigurationChanges() {
+        return normalizedConfigurationChanges;
+    }
+
+    protected void setNormalizedConfigurationChanges(RootedChangeSet<P, D> normalizedConfigurationChanges) {
+        this.normalizedConfigurationChanges = normalizedConfigurationChanges;
+    }
+
+    protected RootedChangeSet<P, D> getNormalizedOperationalChanges() {
+        return normalizedOperationalChanges;
+    }
+
+    protected void setNormalizedOperationalChanges(RootedChangeSet<P, D> normalizedOperationalChange) {
+        this.normalizedOperationalChanges = normalizedOperationalChange;
+    }
+
+    protected DataChangeEvent<P, D> createEvent(DataModification<P, D> modification) {
+        return ImmutableDataChangeEvent.<P, D> builder()//
+                .addTransaction(modification, containsPredicate) //
+                .addConfigurationChangeSet(normalizedConfigurationChanges) //
+                .addOperationalChangeSet(normalizedOperationalChanges) //
+                .setOriginalConfigurationSubtree(initialConfigurationState) //
+                .setOriginalOperationalSubtree(initialOperationalState) //
+                .setUpdatedConfigurationSubtree(finalConfigurationState) //
+                .setUpdatedOperationalSubtree(finalOperationalState) //
+                .build();
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java
new file mode 100644 (file)
index 0000000..e052565
--- /dev/null
@@ -0,0 +1,66 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.opendaylight.yangtools.concepts.Path;
+
+public class RootedChangeSet<P extends Path<P>,D> {
+
+    private final P root;
+    private final Map<P,D> original;
+    private final Map<P,D> created = new HashMap<>();
+    private final Map<P,D> updated = new HashMap<>();
+    private final Set<P> removed = new HashSet<>();
+
+
+
+    public RootedChangeSet(P root,Map<P, D> original) {
+        super();
+        this.root = root;
+        this.original = original;
+    }
+
+    protected P getRoot() {
+        return root;
+    }
+
+    protected Map<P, D> getOriginal() {
+        return original;
+    }
+
+    protected Map<P, D> getCreated() {
+        return created;
+    }
+
+    protected Map<P, D> getUpdated() {
+        return updated;
+    }
+
+    protected Set<P> getRemoved() {
+        return removed;
+    }
+
+    public void addCreated(Map<P,D> created) {
+        this.created.putAll(created);
+    }
+
+    public void addCreated(Entry<P,D> entry) {
+        created.put(entry.getKey(), entry.getValue());
+    }
+
+    public void addUpdated(Entry<P,D> entry) {
+        updated.put(entry.getKey(), entry.getValue());
+    }
+
+    public void addRemoval(P path) {
+        removed.add(path);
+    }
+
+    public boolean isChange() {
+        return !created.isEmpty() || !updated.isEmpty() || !removed.isEmpty();
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java
new file mode 100644 (file)
index 0000000..e99fc0f
--- /dev/null
@@ -0,0 +1,237 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
+        Callable<RpcResult<TransactionStatus>> {
+    private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
+
+    private final AbstractDataTransaction<P, D> transaction;
+
+    private final AbstractDataBroker<P, D, DCL> dataBroker;
+
+    public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
+        this.transaction = transaction;
+        this.dataBroker = broker;
+    }
+
+    @Override
+    public RpcResult<TransactionStatus> call() throws Exception {
+        final Object transactionId = this.transaction.getIdentifier();
+
+        Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
+                .addAll(transaction.getCreatedConfigurationData().keySet())
+                .addAll(transaction.getRemovedConfigurationData())
+                .addAll(transaction.getUpdatedOperationalData().keySet())
+                .addAll(transaction.getCreatedOperationalData().keySet())
+                .addAll(transaction.getRemovedOperationalData()).build();
+
+        log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
+
+        final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
+        listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
+        filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
+
+
+
+        final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
+        final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
+        captureInitialState(listeners);
+
+
+        log.trace("Transaction: {} Starting Request Commit.",transactionId);
+        final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
+        try {
+            for (final DataCommitHandler<P, D> handler : commitHandlers) {
+                DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
+                if (requestCommit != null) {
+                    handlerTransactions.add(requestCommit);
+                } else {
+                    log.debug("Transaction: {}, Handler {}  is not participating in transaction.", transactionId,
+                            handler);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Transaction: {} Request Commit failed", transactionId, e);
+            dataBroker.getFailedTransactionsCount().getAndIncrement();
+            this.transaction.changeStatus(TransactionStatus.FAILED);
+            return this.rollback(handlerTransactions, e);
+
+        }
+
+        log.trace("Transaction: {} Starting Finish.",transactionId);
+        final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
+        try {
+            for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
+                results.add(subtransaction.finish());
+            }
+        } catch (Exception e) {
+            log.error("Transaction: {} Finish Commit failed", transactionId, e);
+            dataBroker.getFailedTransactionsCount().getAndIncrement();
+            transaction.changeStatus(TransactionStatus.FAILED);
+            return this.rollback(handlerTransactions, e);
+        }
+
+
+        dataBroker.getFinishedTransactionsCount().getAndIncrement();
+        transaction.changeStatus(TransactionStatus.COMMITED);
+
+        log.trace("Transaction: {} Finished successfully.", transactionId);
+
+        captureFinalState(listeners);
+
+        log.trace("Transaction: {} Notifying listeners.");
+
+        publishDataChangeEvent(listeners);
+        return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
+                Collections.<RpcError> emptySet());
+    }
+
+    private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (ListenerStateCapture<P, D, DCL> state : listeners) {
+            state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+            state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
+        }
+    }
+
+
+    private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (ListenerStateCapture<P, D, DCL> state : listeners) {
+            state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+            state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
+        }
+    }
+
+    private void filterProbablyAffectedListeners(
+            ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
+
+        for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
+            P affectedPath = listenerSet.getPath();
+            Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
+            Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
+
+            if(configChange.isPresent() || operChange.isPresent()) {
+                reallyAffected.add(listenerSet);
+                if(configChange.isPresent()) {
+                    listenerSet.setNormalizedConfigurationChanges(configChange.get());
+                }
+
+                if(operChange.isPresent()) {
+                    listenerSet.setNormalizedOperationalChanges(operChange.get());
+                }
+            }
+        }
+    }
+
+    private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
+        Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
+        Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
+        Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
+        Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
+        return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
+    }
+
+    private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
+        Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
+        Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
+        Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
+        Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
+        return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
+    }
+
+    private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
+        Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
+
+        if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
+            return Optional.absent();
+        }
+        RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
+        changeSet.addCreated(createdConfig);
+
+        for(Entry<P, D> entry : updatedConfig.entrySet()) {
+            if(originalConfig.containsKey(entry.getKey())) {
+                changeSet.addUpdated(entry);
+            } else {
+                changeSet.addCreated(entry);
+            }
+        }
+
+        for(Entry<P,D> entry : originalConfig.entrySet()) {
+            for(P deletion : potentialDeletions) {
+                if(isContained.apply(deletion)) {
+                    changeSet.addRemoval(entry.getKey());
+                }
+            }
+        }
+
+        if(changeSet.isChange()) {
+            return Optional.of(changeSet);
+        } else {
+            return Optional.absent();
+        }
+
+    }
+
+    public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        ExecutorService executor = this.dataBroker.getExecutor();
+        final Runnable notifyTask = new Runnable() {
+            @Override
+            public void run() {
+                for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
+                    {
+                        DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
+                        for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
+                            try {
+                                listener.getInstance().onDataChanged(changeEvent);
+                            } catch (Exception e) {
+                                log.error("Unhandled exception when invoking listener {}", listener);
+                            }
+                        }
+                    }
+                }
+            }
+        };
+        executor.submit(notifyTask);
+    }
+
+    public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
+        for (final DataCommitTransaction<P, D> transaction : transactions) {
+            transaction.rollback();
+        }
+        Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
+        return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);
+    }
+}
index b32d906d1e70cc8dc5997fea3c57c3379943b6b5..b02a37c3003e594b728554f6c5a0124961872ce3 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +42,7 @@ import com.google.common.collect.ImmutableSet;
 public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataStore> implements //
         DataStore, //
         SchemaServiceListener, //
+        SchemaContextListener, //
         AutoCloseable {
 
     private final static Logger LOG = LoggerFactory.getLogger(SchemaAwareDataStoreAdapter.class);