Merge "Fix issue where NodeConnector ADDED events were propagated as NodeConnector...
authorTony Tkacik <ttkacik@cisco.com>
Sat, 15 Feb 2014 01:32:32 +0000 (01:32 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 15 Feb 2014 01:32:32 +0000 (01:32 +0000)
27 files changed:
opendaylight/configuration/api/src/main/java/org/opendaylight/controller/configuration/IConfigurationContainerService.java
opendaylight/configuration/implementation/src/main/java/org/opendaylight/controller/configuration/internal/ConfigurationService.java
opendaylight/configuration/implementation/src/main/java/org/opendaylight/controller/configuration/internal/ContainerConfigurationService.java
opendaylight/containermanager/implementation/src/main/java/org/opendaylight/controller/containermanager/internal/ContainerManager.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/RoutingUtils.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend [deleted file]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ConsumerContextImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/samples/simpleforwarding/src/main/java/org/opendaylight/controller/samples/simpleforwarding/internal/SimpleBroadcastHandlerImpl.java
opendaylight/switchmanager/implementation/src/main/java/org/opendaylight/controller/switchmanager/internal/SwitchManager.java

index 2123f6b9eb9c1004d6dcc2dd34cf26726168c153..ee571b83e1c9c4e919eb688f0f965ab633219b21 100644 (file)
@@ -13,4 +13,12 @@ package org.opendaylight.controller.configuration;
  * Container configuration service
  */
 public interface IConfigurationContainerService extends IConfigurationServiceCommon {
+
+    /**
+     * Bundle will call this function to ask ContainerConfigurationService to provide the
+     * directory location of container
+     *
+     * @return The path to active container directory
+     */
+    String getConfigurationRoot();
 }
index e4d55d11fb39e126d2dbb99f0ceb0cb6672d7069..4c0f3a2da5f08b409e6c751e9e8828359ce7e850 100644 (file)
@@ -9,6 +9,7 @@
 
 package org.opendaylight.controller.configuration.internal;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -25,6 +26,7 @@ import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.configuration.ConfigurationEvent;
 import org.opendaylight.controller.configuration.ConfigurationObject;
 import org.opendaylight.controller.configuration.IConfigurationAware;
+import org.opendaylight.controller.configuration.IConfigurationContainerService;
 import org.opendaylight.controller.configuration.IConfigurationService;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.IObjectReader;
@@ -46,7 +48,7 @@ public class ConfigurationService implements IConfigurationService, ICacheUpdate
     private static final Logger logger = LoggerFactory
             .getLogger(ConfigurationService.class);
     public static final String SAVE_EVENT_CACHE = "config.event.save";
-    private static final Object ROOT = GlobalConstants.STARTUPHOME.toString();
+    private static final String ROOT = GlobalConstants.STARTUPHOME.toString();
     private IClusterGlobalServices clusterServices;
     private ConcurrentMap <ConfigurationEvent, String> configEvent;
     private Set<IConfigurationAware> configurationAwareList = Collections
@@ -105,21 +107,66 @@ public class ConfigurationService implements IConfigurationService, ICacheUpdate
         return saveConfigurationsInternal();
     }
 
+
+    private List<String> getContainerDirectoryList() {
+        List<String> containerList = new ArrayList<String>();
+        for (IConfigurationAware configurationAware : this.configurationAwareList) {
+            if (configurationAware instanceof IConfigurationContainerService) {
+                String containerFilePath = ((ContainerConfigurationService)configurationAware).getConfigurationRoot();
+                containerList.add(containerFilePath);
+            }
+        }
+        return containerList;
+    }
+
+    private void createContainerDirectory(IConfigurationAware configurationAware) {
+        String containerFilePath = ((ContainerConfigurationService) configurationAware).getConfigurationRoot();
+        if (!new File(containerFilePath).exists()) {
+            boolean created = new File(containerFilePath).mkdir();
+            if (!created) {
+               logger.error("Failed to create startup config directory: {}", containerFilePath);
+            }
+        }
+    }
+
+    private void clearStaleContainerDirectories() {
+        List<String> activeContainers = getContainerDirectoryList();
+        for (File file : new File(ROOT).listFiles()) {
+            if (file.isDirectory() && !activeContainers.contains(file.toPath() + File.separator)) {
+                logger.trace("Removing directory for container {}", file.getName());
+                for (File innerFile : file.listFiles()) {
+                      innerFile.delete();
+                }
+                boolean removed = file.delete();
+                if (!removed) {
+                   logger.warn("Failed to remove stale directory: {}", file.getName());
+                }
+            }
+        }
+    }
+
+
     private Status saveConfigurationsInternal() {
         boolean success = true;
         for (IConfigurationAware configurationAware : configurationAwareList) {
+            if (configurationAware instanceof IConfigurationContainerService) {
+                // Create directory for new containers
+                createContainerDirectory(configurationAware);
+            }
             Status status = configurationAware.saveConfiguration();
             if (!status.isSuccess()) {
                 success = false;
-                logger.warn("Failed to save config for {}",
-                        configurationAware.getClass().getName());
+                logger.warn("Failed to save config for {}", configurationAware.getClass().getName());
             }
         }
+        // Remove startup directories of containers that were removed from
+        // the configuration but not saved
+        clearStaleContainerDirectories();
+
         if (success) {
             return new Status(StatusCode.SUCCESS);
         } else {
-            return new Status(StatusCode.INTERNALERROR,
-                    "Failed to Save All Configurations");
+            return new Status(StatusCode.INTERNALERROR, "Failed to Save All Configurations");
         }
     }
 
index 9c1d391daa7b8e80eec73ce454e30a6b768efe4d..3e067254edb721cd6e5b7adb109c1e96badbadb2 100644 (file)
@@ -9,7 +9,6 @@
 
 package org.opendaylight.controller.configuration.internal;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Dictionary;
@@ -52,14 +51,10 @@ public class ContainerConfigurationService implements IConfigurationContainerSer
     private static final Logger logger = LoggerFactory.getLogger(ContainerConfigurationService.class);
     private IClusterContainerServices clusterServices;
     private ConcurrentMap <ConfigurationEvent, String> containerConfigEvent;
-    /*
-     * Collection containing the configuration objects.
-     * This is configuration world: container names (also the map key)
-     * are maintained as they were configured by user, same case
-     */
+    // Directory which contains the startup files for this container
+    private String root;
     private Set<IConfigurationContainerAware> configurationAwareList = Collections
             .synchronizedSet(new HashSet<IConfigurationContainerAware>());
-    private String root;
     private ObjectReader objReader;
     private ObjectWriter objWriter;
 
@@ -93,14 +88,9 @@ public class ContainerConfigurationService implements IConfigurationContainerSer
 
     void init(Component c) {
         Dictionary<?, ?> props = c.getServiceProperties();
-        String containerName = (props != null) ? (String) props.get("containerName") : GlobalConstants.DEFAULT.toString();
-        root = String.format("%s%s/", GlobalConstants.STARTUPHOME.toString(), containerName);
-        if (!new File(root).exists()) {
-            boolean created = new File(root).mkdir();
-            if (!created) {
-                logger.error("Failed to create startup config directory for container {}", containerName);
-            }
-        }
+        String containerName = (props != null) ? (String) props.get("containerName") :
+            GlobalConstants.DEFAULT.toString();
+        root =  String.format("%s%s/", GlobalConstants.STARTUPHOME.toString(), containerName);
     }
 
     public void start() {
@@ -119,17 +109,18 @@ public class ContainerConfigurationService implements IConfigurationContainerSer
      * Function called by the dependency manager before Container is Stopped and Destroyed.
      */
     public void containerStop() {
-        // Remove container directory along with its startup files
-        File[] files = new File(root).listFiles();
-        for (File file : files) {
-            file.delete();
-        }
-        new File(root).delete();
+        // Do nothing
+    }
+
+    @Override
+    public String getConfigurationRoot() {
+        return root;
     }
 
     @Override
     public Status saveConfiguration() {
         boolean success = true;
+
         for (IConfigurationContainerAware configurationAware : configurationAwareList) {
             logger.trace("Save Config triggered for {}", configurationAware.getClass().getSimpleName());
 
index ad897fd6899b01a2645aee018198fd0b8742129a..0fee183b67b8c1be06725b310c4111fbf3f9f8a8 100644 (file)
@@ -9,7 +9,6 @@
 
 package org.opendaylight.controller.containermanager.internal;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -751,26 +750,6 @@ public class ContainerManager extends Authorization<String> implements IContaine
         return status;
     }
 
-    private void removeComponentsStartUpfiles(String containerName) {
-        String startupLocation = String.format("./%s", GlobalConstants.STARTUPHOME.toString());
-        String containerPrint = String.format("_%s.", containerName.toLowerCase(Locale.ENGLISH));
-
-        File directory = new File(startupLocation);
-        String[] fileList = directory.list();
-
-        logger.trace("Deleting startup configuration files for container {}", containerName);
-        if (fileList != null) {
-            for (String fileName : fileList) {
-                if (fileName.contains(containerPrint)) {
-                    String fullPath = String.format("%s/%s", startupLocation, fileName);
-                    File file = new File(fullPath);
-                    boolean done = file.delete();
-                    logger.trace("{} {}", (done ? "Deleted: " : "Failed to delete: "), fileName);
-                }
-            }
-        }
-    }
-
     /**
      * Create and initialize default all resource group and create association
      * with default well known users and profiles, if not already learnt from
@@ -1013,19 +992,6 @@ public class ContainerManager extends Authorization<String> implements IContaine
         notifyContainerModeChange(delete, notifyLocal);
         // Notify listeners
         notifyContainerAwareListeners(container, delete);
-
-        /*
-         * This is a quick fix until configuration service becomes the
-         * centralized configuration management place. Here container manager
-         * will remove the startup files for all the bundles that are present in
-         * the container being deleted. Do the cleanup here in Container manger
-         * as do not want to put this temporary code in Configuration manager
-         * yet which is ODL.
-         */
-        if (delete) {
-            // TODO: remove when Config Mgr takes over
-            removeComponentsStartUpfiles(containerName);
-        }
     }
 
     private void notifyContainerEntryChangeInternal(String containerName, List<NodeConnector> ncList, UpdateType update, boolean notifyLocal) {
index 286b0c378ca7183589e3001105db2be973e8222a..7357926b9e6226a612bd0fc453dc28b737cff5b8 100644 (file)
@@ -7,17 +7,17 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
 \r
-import java.util.concurrent.ExecutorService;\r
-\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;\r
-import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;\r
-import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;\r
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;\r
-import org.osgi.framework.BundleContext;\r
-import org.osgi.framework.ServiceReference;\r
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;
+import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 \r
 /**\r
 *\r
@@ -57,14 +57,14 @@ public final class DataBrokerImplModule extends
             dataBindingBroker = createStandAloneBroker(listeningExecutor);\r
         }\r
         dataBindingBroker.registerRuntimeBean(getRootRuntimeBeanRegistratorWrapper());\r
-\r
+        dataBindingBroker.setNotificationExecutor(SingletonHolder.getDefaultChangeEventExecutor());\r
         return dataBindingBroker;\r
     }\r
     private BindingIndependentMappingService resolveMappingServiceDependency() {\r
         if(getMappingService() != null) {\r
             return getMappingServiceDependency();\r
         }\r
-        \r
+\r
         ServiceReference<BindingIndependentMappingService> potentialMappingService = bundleContext.getServiceReference(BindingIndependentMappingService.class);\r
         if(potentialMappingService != null) {\r
             return bundleContext.getService(potentialMappingService);\r
index 291677a79a4fba3a16d27e1929e1fd23e2e14743..a0bbb28d9e07624e23ad0afd094c347c27d2fce2 100644 (file)
@@ -29,6 +29,7 @@ public class SingletonHolder {
     public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
     private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
     private static ListeningExecutorService COMMIT_EXECUTOR = null;
+    private static ListeningExecutorService CHANGE_EVENT_EXECUTOR = null;
 
     public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
         if (NOTIFICATION_EXECUTOR == null) {
@@ -64,4 +65,21 @@ public class SingletonHolder {
         ExecutorService executor = Executors.newCachedThreadPool(factory);
         return MoreExecutors.listeningDecorator(executor);
     }
+
+    public static ExecutorService getDefaultChangeEventExecutor() {
+        if (CHANGE_EVENT_EXECUTOR == null) {
+            ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
+            /*
+             * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
+             *        ordering guarantees, which means that using a concurrent threadpool results
+             *        in application data being committed in random order, potentially resulting
+             *        in inconsistent data being present. Once proper primitives are introduced,
+             *        concurrency can be reintroduced.
+             */
+            ExecutorService executor = Executors.newSingleThreadExecutor(factory);
+            CHANGE_EVENT_EXECUTOR  = MoreExecutors.listeningDecorator(executor);
+        }
+
+        return CHANGE_EVENT_EXECUTOR;
+    }
 }
index ddf67719dc03f8c9311f565fbf78c98f8ba107cd..16d5a24cb5b7c80364edd8dd5dbe4f8b59e6aa97 100644 (file)
@@ -7,7 +7,8 @@
  */
 package org.opendaylight.controller.sal.binding.impl;\r
 \r
-import java.util.Set;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -19,12 +20,46 @@ import org.opendaylight.controller.sal.common.DataStoreIdentifier;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.DataRoot;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.util.DataObjectReadingUtil;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
 \r
 \r
 public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> //\r
        implements DataProviderService, AutoCloseable {\r
 \r
+    private final static class ContainsWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+        private final  InstanceIdentifier<? extends DataObject> key;
+
+        public ContainsWildcarded(InstanceIdentifier<? extends DataObject> key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+            return key.containsWildcarded(input);
+        }
+    }
+
+    private final static class IsContainedWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+        private final  InstanceIdentifier<? extends DataObject> key;
+
+        public IsContainedWildcarded(InstanceIdentifier<? extends DataObject> key) {
+            this.key = key;
+        }
+
+        @Override
+        public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+            return input.containsWildcarded(key);
+        }
+    }
+
     private final AtomicLong nextTransaction = new AtomicLong();\r
     private final AtomicLong createdTransactionsCount = new AtomicLong();\r
 \r
@@ -110,16 +145,33 @@ public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? exte
     }
 
     @Override
-    protected boolean isAffectedBy(InstanceIdentifier<? extends DataObject> key,
-            Set<InstanceIdentifier<? extends DataObject>> paths) {
-        if (paths.contains(key)) {
-            return true;
-        }
-        for (InstanceIdentifier<?> path : paths) {
-            if (key.containsWildcarded(path)) {
-                return true;
+    protected Predicate<InstanceIdentifier<? extends DataObject>> createContainsPredicate(final
+            InstanceIdentifier<? extends DataObject> key) {
+        return new ContainsWildcarded(key);
+    }
+
+    @Override
+    protected Predicate<InstanceIdentifier<? extends DataObject>> createIsContainedPredicate(final
+            InstanceIdentifier<? extends DataObject> key) {
+        return new IsContainedWildcarded(key);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    protected Map<InstanceIdentifier<? extends DataObject>, DataObject> deepGetBySubpath(
+            Map<InstanceIdentifier<? extends DataObject>, DataObject> dataSet,
+            InstanceIdentifier<? extends DataObject> path) {
+        Builder<InstanceIdentifier<? extends DataObject>, DataObject> builder = ImmutableMap.builder();
+        Map<InstanceIdentifier<? extends DataObject>, DataObject> potential = Maps.filterKeys(dataSet, createIsContainedPredicate(path));
+        for(Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : potential.entrySet()) {
+            try {
+                builder.putAll(DataObjectReadingUtil.readData(entry.getValue(),(InstanceIdentifier)entry.getKey(),path));
+            } catch (Exception e) {
+                // FIXME : Log exception;
             }
         }
-        return false;
-    }\r
+        return builder.build();
+
+    }
+\r
 }
index 5630664a678e8387426bb6a2b2ff88957b0654e8..e48ebbc0577f1b6101a772284ba9e07b3e6580cf 100644 (file)
@@ -45,8 +45,6 @@ import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
 import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
 import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
@@ -79,6 +77,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,11 +113,11 @@ public class BindingIndependentConnector implements //
 
     private DataProviderService baDataService;
 
-    private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
-    private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
 
-    private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
-    private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
+    private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
+    private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
 
     private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
 
@@ -130,7 +130,7 @@ public class BindingIndependentConnector implements //
     // private ListenerRegistration<BindingToDomRpcForwardingManager>
     // bindingToDomRpcManager;
 
-    private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+    private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
 
         @Override
         public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
@@ -149,8 +149,6 @@ public class BindingIndependentConnector implements //
 
     private RpcProviderRegistryImpl baRpcRegistryImpl;
 
-    private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter;
-
     private NotificationProviderService baNotifyService;
 
     private NotificationPublishService domNotificationService;
@@ -319,9 +317,6 @@ public class BindingIndependentConnector implements //
                 baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance());
                 baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance());
             }
-            if (biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) {
-                biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry;
-            }
             rpcForwarding = true;
         }
     }
@@ -413,8 +408,8 @@ public class BindingIndependentConnector implements //
     private class BindingToDomTransaction implements
             DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
 
-        private DataModificationTransaction backing;
-        private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
+        private final DataModificationTransaction backing;
+        private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
 
         public BindingToDomTransaction(DataModificationTransaction backing,
                 DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
@@ -491,6 +486,7 @@ public class BindingIndependentConnector implements //
             // FIXME: do registration based on only active commit handlers.
         }
 
+        @Override
         public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
                 DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
             Object identifier = domTransaction.getIdentifier();
@@ -587,9 +583,9 @@ public class BindingIndependentConnector implements //
 
         private final Set<QName> supportedRpcs;
         private final WeakReference<Class<? extends RpcService>> rpcServiceType;
-        private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
-        private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
-        private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+        private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+        private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+        private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
 
         public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
             this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
@@ -771,10 +767,10 @@ public class BindingIndependentConnector implements //
     private class DefaultInvocationStrategy extends RpcInvocationStrategy {
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> inputClass;
+        private final WeakReference<Class> inputClass;
 
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> outputClass;
+        private final WeakReference<Class> outputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
@@ -803,10 +799,10 @@ public class BindingIndependentConnector implements //
 
         @Override
         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
-            if(biRouter != null) {
+            if(biRpcRegistry != null) {
                 CompositeNode xml = mappingService.toDataDom(input);
                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.<Node<?>> of(xml));
-                RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
                 Object baResultValue = null;
                 if (result.getResult() != null) {
                     baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult());
@@ -825,6 +821,7 @@ public class BindingIndependentConnector implements //
             super(rpc, targetMethod);
         }
 
+        @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             @SuppressWarnings("unchecked")
             Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
@@ -837,21 +834,21 @@ public class BindingIndependentConnector implements //
             return Futures.immediateFuture(null);
         }
     }
-    
+
     private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
 
-        
+
         @SuppressWarnings("rawtypes")
-        private WeakReference<Class> inputClass;
+        private final WeakReference<Class> inputClass;
 
         @SuppressWarnings({ "rawtypes", "unchecked" })
-        public NoOutputInvocationStrategy(QName rpc, Method targetMethod, 
+        public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
                 Class<? extends DataContainer> inputClass) {
             super(rpc,targetMethod);
             this.inputClass = new WeakReference(inputClass);
         }
-        
-        
+
+
         @Override
         public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
             DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
@@ -865,10 +862,10 @@ public class BindingIndependentConnector implements //
 
         @Override
         public Future<RpcResult<?>> forwardToDomBroker(DataObject input) {
-            if(biRouter != null) {
+            if(biRpcRegistry != null) {
                 CompositeNode xml = mappingService.toDataDom(input);
                 CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.<Node<?>>of(xml));
-                RpcResult<CompositeNode> result = biRouter.invokeRpc(rpc, wrappedXml);
+                RpcResult<CompositeNode> result = biRpcRegistry.invokeRpc(rpc, wrappedXml);
                 Object baResultValue = null;
                 RpcResult<?> baResult = Rpcs.<Void>getRpcResult(result.isSuccessful(), null, result.getErrors());
                 return Futures.<RpcResult<?>>immediateFuture(baResult);
@@ -902,12 +899,12 @@ public class BindingIndependentConnector implements //
     public void setDomNotificationService(NotificationPublishService domService) {
         this.domNotificationService = domService;
     }
-    
+
     private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
 
-        private ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
-        private Set<QName> supportedNotifications = new HashSet<>();
-        
+        private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
+        private final Set<QName> supportedNotifications = new HashSet<>();
+
         @Override
         public Set<QName> getSupportedNotifications() {
             return Collections.unmodifiableSet(supportedNotifications);
@@ -922,7 +919,7 @@ public class BindingIndependentConnector implements //
                 if (potentialClass != null) {
                     final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
                             notification);
-                    
+
                     if (baNotification instanceof Notification) {
                         baNotifyService.publish((Notification) baNotification);
                     }
index 598743af90337cdf2baad9194bcdd0020f6f0d04..90fa2be21103a7bf89ec8191a2a4d9fad4b301f4 100644 (file)
@@ -6,9 +6,11 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 package org.opendaylight.controller.sal.binding.test.bugfix;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
 import java.util.Map;
@@ -52,38 +54,42 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
     private static final InstanceIdentifier<Nodes> NODES_INSTANCE_ID_BA = InstanceIdentifier.builder(Nodes.class) //
             .toInstance();
 
-
     private static final InstanceIdentifier<Node> NODE_INSTANCE_ID_BA = InstanceIdentifier//
             .builder(NODES_INSTANCE_ID_BA) //
             .child(Node.class, NODE_KEY).toInstance();
 
-
     private static final InstanceIdentifier<SupportedActions> SUPPORTED_ACTIONS_INSTANCE_ID_BA = InstanceIdentifier//
             .builder(NODES_INSTANCE_ID_BA) //
             .child(Node.class, NODE_KEY) //
             .augmentation(FlowCapableNode.class) //
-            .child(SupportedActions.class)
-            .toInstance();
+            .child(SupportedActions.class).toInstance();
 
+    private static final InstanceIdentifier<FlowCapableNode> ALL_FLOW_CAPABLE_NODES = InstanceIdentifier //
+            .builder(NODES_INSTANCE_ID_BA) //
+            .child(Node.class) //
+            .augmentation(FlowCapableNode.class) //
+            .build();
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier NODE_INSTANCE_ID_BI = //
     org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
             .node(Nodes.QNAME) //
             .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
             .toInstance();
-    private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME, SupportedActions.QNAME.getLocalName());
-
+    private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME,
+            SupportedActions.QNAME.getLocalName());
 
     private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier SUPPORTED_ACTIONS_INSTANCE_ID_BI = //
-            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
-                    .node(Nodes.QNAME) //
-                    .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
-                    .node(SUPPORTED_ACTIONS_QNAME) //
-                    .toInstance();
-
-    private DataChangeEvent<InstanceIdentifier<?>, DataObject> receivedChangeEvent;
-
+    org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
+            .node(Nodes.QNAME) //
+            .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
+            .node(SUPPORTED_ACTIONS_QNAME) //
+            .toInstance();
+    private static final InstanceIdentifier<FlowCapableNode> FLOW_AUGMENTATION_PATH = InstanceIdentifier //
+            .builder(NODE_INSTANCE_ID_BA) //
+            .augmentation(FlowCapableNode.class) //
+            .build();
 
+    private DataChangeEvent<InstanceIdentifier<?>, DataObject> lastReceivedChangeEvent;
 
     /**
      * Test for Bug 148
@@ -93,7 +99,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
     @Test
     public void putNodeAndAugmentation() throws Exception {
 
-        baDataService.registerDataChangeListener(NODES_INSTANCE_ID_BA, this);
+        baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
 
         NodeBuilder nodeBuilder = new NodeBuilder();
         nodeBuilder.setId(new NodeId(NODE_ID));
@@ -102,7 +109,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
-        assertNotNull(receivedChangeEvent);
+
         Node node = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
         assertNotNull(node);
         assertEquals(NODE_KEY, node.getKey());
@@ -114,13 +121,16 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         fnub.setDescription("Description Foo");
         fnub.setSoftware("JUnit emulated");
         FlowCapableNode fnu = fnub.build();
-        InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance();
+        InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
+                .augmentation(FlowCapableNode.class).toInstance();
         DataModificationTransaction augmentedTransaction = baDataService.beginTransaction();
         augmentedTransaction.putOperationalData(augmentIdentifier, fnu);
 
         result = augmentedTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
+        assertNotNull(lastReceivedChangeEvent);
+        assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
 
         Node augmentedNode = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
         assertNotNull(node);
@@ -131,11 +141,14 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         assertEquals(fnu.getDescription(), readedAugmentation.getDescription());
         assertBindingIndependentVersion(NODE_INSTANCE_ID_BI);
         testNodeRemove();
+        assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
     }
 
     @Test
     public void putNodeWithAugmentation() throws Exception {
 
+        baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
         NodeBuilder nodeBuilder = new NodeBuilder();
         nodeBuilder.setId(new NodeId(NODE_ID));
         nodeBuilder.setKey(NODE_KEY);
@@ -151,23 +164,31 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         DataModificationTransaction baseTransaction = baDataService.beginTransaction();
         baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
+
+        assertNotNull(lastReceivedChangeEvent);
+        assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
+        lastReceivedChangeEvent = null;
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
-        FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
+        FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier
+                .builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
         assertNotNull(readedAugmentation);
+
         assertEquals(fnu.getHardware(), readedAugmentation.getHardware());
 
         testPutNodeConnectorWithAugmentation();
+        lastReceivedChangeEvent = null;
         testNodeRemove();
-    }
 
+        assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
+    }
 
     private void testPutNodeConnectorWithAugmentation() throws Exception {
         NodeConnectorKey ncKey = new NodeConnectorKey(new NodeConnectorId("test:0:0"));
         InstanceIdentifier<NodeConnector> ncPath = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
-        .child(NodeConnector.class, ncKey).toInstance();
+                .child(NodeConnector.class, ncKey).toInstance();
         InstanceIdentifier<FlowCapableNodeConnector> ncAugmentPath = InstanceIdentifier.builder(ncPath)
-        .augmentation(FlowCapableNodeConnector.class).toInstance();
+                .augmentation(FlowCapableNodeConnector.class).toInstance();
 
         NodeConnectorBuilder nc = new NodeConnectorBuilder();
         nc.setKey(ncKey);
@@ -181,7 +202,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         RpcResult<TransactionStatus> result = baseTransaction.commit().get();
         assertEquals(TransactionStatus.COMMITED, result.getResult());
 
-        FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService.readOperationalData(ncAugmentPath);
+        FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService
+                .readOperationalData(ncAugmentPath);
         assertNotNull(readedAugmentation);
         assertEquals(fncb.getName(), readedAugmentation.getName());
     }
@@ -196,7 +218,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
         assertNull(node);
     }
 
-    private void verifyNodes(Nodes nodes,Node original) {
+    private void verifyNodes(Nodes nodes, Node original) {
         assertNotNull(nodes);
         assertNotNull(nodes.getNode());
         assertEquals(1, nodes.getNode().size());
@@ -212,8 +234,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
 
     }
 
-    private void assertBindingIndependentVersion(
-            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
+    private void assertBindingIndependentVersion(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
         CompositeNode node = biDataService.readOperationalData(nodeId);
         assertNotNull(node);
     }
@@ -224,7 +245,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data
 
     @Override
     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-        receivedChangeEvent = change;
+        lastReceivedChangeEvent = change;
     }
 
 }
index bfa4f36c18ca1d23f852a3da9c554942aa0b3e0e..274f084f01d420cf291723d96949382da5c0cd7d 100644 (file)
@@ -42,7 +42,7 @@ public class RoutingUtils {
         private final Map<C, Set<P>> removal;
         private final Map<C, Set<P>> announcement;
 
-        public RouteChangeImpl(ImmutableMap<C, Set<P>> removal, ImmutableMap<C, Set<P>> announcement) {
+        public RouteChangeImpl(ImmutableMap<C, Set<P>> announcement, ImmutableMap<C, Set<P>> removal) {
             super();
             this.removal = removal;
             this.announcement = announcement;
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java
new file mode 100644 (file)
index 0000000..bfffb59
--- /dev/null
@@ -0,0 +1,441 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.xtext.xbase.lib.Exceptions;
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
+        implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
+        DataProvisionService<P, D> {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
+
+    private ExecutorService executor;
+
+    public ExecutorService getExecutor() {
+        return this.executor;
+    }
+
+    public void setExecutor(final ExecutorService executor) {
+        this.executor = executor;
+    }
+
+    private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
+
+    public ExecutorService getNotificationExecutor() {
+        return this.notificationExecutor;
+    }
+
+    public void setNotificationExecutor(final ExecutorService notificationExecutor) {
+        this.notificationExecutor = notificationExecutor;
+    }
+
+    private AbstractDataReadRouter<P, D> dataReadRouter;
+
+    private final AtomicLong submittedTransactionsCount = new AtomicLong();
+
+    private final AtomicLong failedTransactionsCount = new AtomicLong();
+
+    private final AtomicLong finishedTransactionsCount = new AtomicLong();
+
+    public AbstractDataReadRouter<P, D> getDataReadRouter() {
+        return this.dataReadRouter;
+    }
+
+    public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
+        this.dataReadRouter = dataReadRouter;
+    }
+
+    public AtomicLong getSubmittedTransactionsCount() {
+        return this.submittedTransactionsCount;
+    }
+
+    public AtomicLong getFailedTransactionsCount() {
+        return this.failedTransactionsCount;
+    }
+
+    public AtomicLong getFinishedTransactionsCount() {
+        return this.finishedTransactionsCount;
+    }
+
+    private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
+            .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
+
+    private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
+            .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
+
+    private final Lock registrationLock = new ReentrantLock();
+
+    private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
+
+    public AbstractDataBroker() {
+    }
+
+    protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
+        final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+            @Override
+            public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+                Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+                Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+                        .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+                final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+                    @Override
+                    public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        P _key = it.getKey();
+                        boolean _isAffectedBy = isAffectedBy(_key, paths);
+                        return _isAffectedBy;
+                    }
+                };
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+                        .filter(_function);
+                final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+                    @Override
+                    public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+                            final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+                        return _value;
+                    }
+                };
+                FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+                        .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+                final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+                    @Override
+                    public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+                        DataCommitHandler<P, D> _instance = it.getInstance();
+                        return _instance;
+                    }
+                };
+                FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+                        .<DataCommitHandler<P, D>> transform(_function_2);
+                return _transform.toList();
+            }
+        };
+        return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+    }
+
+    protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
+        final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+            @Override
+            public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+                Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+                Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+                        .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+                final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+                    @Override
+                    public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        P _key = it.getKey();
+                        boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
+                        return _isProbablyAffectedBy;
+                    }
+                };
+                FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+                        .filter(_function);
+                final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+                    @Override
+                    public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+                            final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+                        Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+                        return _value;
+                    }
+                };
+                FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+                        .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+                final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+                    @Override
+                    public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+                        DataCommitHandler<P, D> _instance = it.getInstance();
+                        return _instance;
+                    }
+                };
+                FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+                        .<DataCommitHandler<P, D>> transform(_function_2);
+                return _transform.toList();
+            }
+        };
+        return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+    }
+
+    protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
+        return Collections.<P, D> emptyMap();
+    }
+
+    @Override
+    public final D readConfigurationData(final P path) {
+        AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+        return _dataReadRouter.readConfigurationData(path);
+    }
+
+    @Override
+    public final D readOperationalData(final P path) {
+        AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+        return _dataReadRouter.readOperationalData(path);
+    }
+
+    private static <T extends Object> T withLock(final Lock lock, final Callable<T> method) {
+        lock.lock();
+        try {
+            return method.call();
+        } catch (Exception e) {
+            throw Exceptions.sneakyThrow(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
+            final DataCommitHandler<P, D> commitHandler) {
+        synchronized (commitHandler) {
+            final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
+                    path, commitHandler, this);
+            commitHandlers.put(path, registration);
+            LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
+            for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+                try {
+                    listener.getInstance().onRegister(registration);
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
+                            e);
+                }
+            }
+            return registration;
+        }
+    }
+
+    @Override
+    public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
+        synchronized (listeners) {
+            final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
+                    listener, AbstractDataBroker.this);
+            listeners.put(path, reg);
+            final D initialConfig = getDataReadRouter().readConfigurationData(path);
+            final D initialOperational = getDataReadRouter().readOperationalData(path);
+            final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
+            listener.onDataChanged(event);
+            return reg;
+        }
+    }
+
+    public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
+            final DataReader<P, D> reader) {
+
+        final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
+        final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
+        return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
+    }
+
+    @Override
+    public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
+            final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+        final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
+                .register(commitHandlerListener);
+        return ret;
+    }
+
+    protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
+            final D initialOperational) {
+        InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
+                initialConfig, initialOperational);
+        return _initialDataChangeEventImpl;
+    }
+
+    protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
+        synchronized (listeners) {
+            listeners.remove(registration.getPath(), registration);
+        }
+    }
+
+    protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
+        synchronized (commitHandlers) {
+
+            commitHandlers.remove(registration.getPath(), registration);
+            LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
+            for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+                try {
+                    listener.getInstance().onUnregister(registration);
+                } catch (Exception e) {
+                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",
+                            listener.getInstance(), e);
+                }
+            }
+        }
+
+    }
+
+    protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
+        return commitHandlers.entries();
+    }
+
+    protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
+
+        synchronized (listeners) {
+            return FluentIterable //
+                    .from(listeners.asMap().entrySet()) //
+                    .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+                        @Override
+                        public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                            return isAffectedBy(it.getKey(), paths);
+                        }
+                    }) //
+                    .transform(
+                            new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+                                @Override
+                                public ListenerStateCapture<P, D, DCL> apply(
+                                        final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                                    return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+                                            createContainsPredicate(it.getKey()));
+                                }
+                            }) //
+                    .toList();
+        }
+    }
+
+    protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
+        synchronized (listeners) {
+            return FluentIterable //
+                    .from(listeners.asMap().entrySet()) //
+                    .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+                        @Override
+                        public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                            return isProbablyAffectedBy(it.getKey(), paths);
+                        }
+                    }) //
+                    .transform(
+                            new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+                                @Override
+                                public ListenerStateCapture<P, D, DCL> apply(
+                                        final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+                                    return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+                                            createIsContainedPredicate(it.getKey()));
+                                }
+                            }) //
+                    .toList();
+        }
+    }
+
+    protected Predicate<P> createContainsPredicate(final P key) {
+        return new Predicate<P>() {
+            @Override
+            public boolean apply(final P other) {
+                return key.contains(other);
+            }
+        };
+    }
+
+    protected Predicate<P> createIsContainedPredicate(final P key) {
+        return new Predicate<P>() {
+            @Override
+            public boolean apply(final P other) {
+                return other.contains(key);
+            }
+        };
+    }
+
+    protected boolean isAffectedBy(final P key, final Set<P> paths) {
+        final Predicate<P> contains = this.createContainsPredicate(key);
+        if (paths.contains(key)) {
+            return true;
+        }
+        for (final P path : paths) {
+            if (contains.apply(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
+        final Predicate<P> isContained = this.createIsContainedPredicate(key);
+        for (final P path : paths) {
+            if (isContained.apply(path)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
+        Preconditions.checkNotNull(transaction);
+        transaction.changeStatus(TransactionStatus.SUBMITED);
+        final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
+        ;
+        this.getSubmittedTransactionsCount().getAndIncrement();
+        return this.getExecutor().submit(task);
+    }
+
+    private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
+            extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+            implements DataCommitHandlerRegistration<P, D> {
+
+        private AbstractDataBroker<P, D, ? extends Object> dataBroker;
+        private final P path;
+
+        @Override
+        public P getPath() {
+            return this.path;
+        }
+
+        public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
+                final AbstractDataBroker<P, D, ? extends Object> broker) {
+            super(instance);
+            this.dataBroker = broker;
+            this.path = path;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            this.dataBroker.removeCommitHandler(this);
+            this.dataBroker = null;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend
deleted file mode 100644 (file)
index 7c6f52f..0000000
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service\r
-\r
-import com.google.common.collect.FluentIterable\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableList\r
-import com.google.common.collect.Multimap\r
-import java.util.ArrayList\r
-import java.util.Arrays\r
-import java.util.Collection\r
-import java.util.Collections\r
-import java.util.HashSet\r
-import java.util.List\r
-import java.util.Set\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.concurrent.atomic.AtomicLong\r
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
-import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
-import org.opendaylight.controller.sal.common.util.Rpcs\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Path\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.common.RpcResult\r
-import org.slf4j.LoggerFactory\r
-\r
-import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-import com.google.common.collect.Multimaps
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
-DataReader<P, D>, //\r
-DataChangePublisher<P, D, DCL>, //\r
-DataProvisionService<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
-\r
-    @Property\r
-    var ExecutorService executor;\r
-\r
-    @Property\r
-    var AbstractDataReadRouter<P, D> dataReadRouter;\r
-    \r
-    @Property\r
-    private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
-    \r
-    @Property\r
-    private val AtomicLong failedTransactionsCount = new AtomicLong\r
-    \r
-    @Property\r
-    private val AtomicLong finishedTransactionsCount = new AtomicLong\r
-\r
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-    
-    private val Lock registrationLock = new ReentrantLock;
-    \r
-    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
-    public new() {\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
-                .transformAndConcat[value] //\r
-                .transform[instance].toList()
-        ]\r
-    }\r
-\r
-    override final readConfigurationData(P path) {\r
-        return dataReadRouter.readConfigurationData(path);\r
-    }\r
-\r
-    override final readOperationalData(P path) {\r
-        return dataReadRouter.readOperationalData(path);\r
-    }
-    
-    private static def <T> withLock(Lock lock,Callable<T> method) {
-        lock.lock
-        try {
-            return method.call
-        } finally {
-            lock.unlock
-        }
-    } \r
-\r
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        return withLock(registrationLock) [|\r
-            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
-            commitHandlers.put(path, registration)\r
-            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onRegister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
-                }\r
-            }
-            return registration;
-        ]\r
-    }\r
-\r
-    override final def registerDataChangeListener(P path, DCL listener) {\r
-        return withLock(registrationLock) [|
-            val reg = new DataChangeListenerRegistration(path, listener, this);\r
-            listeners.put(path, reg);\r
-            val initialConfig = dataReadRouter.readConfigurationData(path);\r
-            val initialOperational = dataReadRouter.readOperationalData(path);\r
-            val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
-            listener.onDataChanged(event);\r
-            return reg;
-        ]\r
-    }\r
-\r
-    final def registerDataReader(P path, DataReader<P, D> reader) {\r
-        return withLock(registrationLock) [|\r
-            val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
-            val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
-    \r
-            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
-        ]\r
-    }\r
-    \r
-    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
-        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
-        return ret;\r
-    }\r
-    \r
-    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
-        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
-        \r
-    }\r
-\r
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
-        return withLock(registrationLock) [|\r
-            listeners.remove(registration.path, registration);
-        ]\r
-    }\r
-\r
-    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
-        return withLock(registrationLock) [|
-            commitHandlers.remove(registration.path, registration);\r
-             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
-            for(listener : commitHandlerRegistrationListeners) {\r
-                try {\r
-                    listener.instance.onUnregister(registration);\r
-                } catch (Exception e) {\r
-                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
-                }\r
-            }
-            return null;
-        ]\r
-    }\r
-\r
-    protected final def getActiveCommitHandlers() {\r
-        return commitHandlers.entries;\r
-    }\r
-\r
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
-        HashSet<P> paths) {
-        return withLock(registrationLock) [|\r
-            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
-                val operationalState = readOperationalData(key)\r
-                val configurationState = readConfigurationData(key)\r
-                return new ListenerStateCapture(key, value, operationalState, configurationState)\r
-            ].toList()
-        ]\r
-    }\r
-\r
-    protected def boolean isAffectedBy(P key, Set<P> paths) {\r
-        if (paths.contains(key)) {\r
-            return true;\r
-        }\r
-        for (path : paths) {\r
-            if (key.contains(path)) {\r
-                return true;\r
-            }\r
-        }\r
-\r
-        return false;\r
-    }\r
-\r
-    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
-        checkNotNull(transaction);\r
-        transaction.changeStatus(TransactionStatus.SUBMITED);\r
-        val task = new TwoPhaseCommit(transaction, this);\r
-        submittedTransactionsCount.andIncrement;\r
-        return executor.submit(task);\r
-    }\r
-\r
-}\r
-\r
-@Data\r
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
-\r
-    @Property\r
-    P path;\r
-\r
-    @Property\r
-    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
-\r
-    @Property\r
-    D initialOperationalState;\r
-\r
-    @Property\r
-    D initialConfigurationState;\r
-}\r
-\r
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
-\r
-    AbstractDataBroker<P, D, DCL> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeListener(this);\r
-        dataBroker = null;\r
-    }\r
-\r
-}\r
-\r
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
-implements DataCommitHandlerRegistration<P, D> {\r
-\r
-    AbstractDataBroker<P, D, ?> dataBroker;\r
-\r
-    @Property\r
-    val P path;\r
-\r
-    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
-        super(instance)\r
-        dataBroker = broker;\r
-        _path = path;\r
-    }\r
-\r
-    override protected removeRegistration() {\r
-        dataBroker.removeCommitHandler(this);\r
-        dataBroker = null;\r
-    }\r
-}\r
-\r
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
-\r
-    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
-\r
-    val AbstractDataTransaction<P, D> transaction;\r
-    val AbstractDataBroker<P, D, DCL> dataBroker;\r
-    \r
-    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
-        this.transaction = transaction;\r
-        this.dataBroker = broker;\r
-    }\r
-\r
-    override call() throws Exception {\r
-\r
-        // get affected paths\r
-        val affectedPaths = new HashSet<P>();\r
-\r
-        affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
-        affectedPaths.addAll(transaction.removedConfigurationData);\r
-\r
-        affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
-        affectedPaths.addAll(transaction.removedOperationalData);\r
-
-        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
-\r
-        val transactionId = transaction.identifier;\r
-\r
-        log.trace("Transaction: {} Started.",transactionId);\r
-        log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
-        // requesting commits\r
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
-        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
-        try {\r
-            for (handler : commitHandlers) {\r
-                handlerTransactions.add(handler.requestCommit(transaction));\r
-            }\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Request Commit failed", transactionId,e);\r
-            dataBroker.failedTransactionsCount.andIncrement\r
-            transaction.changeStatus(TransactionStatus.FAILED)
-            return rollback(handlerTransactions, e);\r
-        }\r
-        val List<RpcResult<Void>> results = new ArrayList();\r
-        try {\r
-            for (subtransaction : handlerTransactions) {\r
-                results.add(subtransaction.finish());\r
-            }\r
-            listeners.publishDataChangeEvent();\r
-        } catch (Exception e) {\r
-            log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
-            dataBroker.failedTransactionsCount.andIncrement
-            transaction.changeStatus(TransactionStatus.FAILED)\r
-            return rollback(handlerTransactions, e);\r
-        }\r
-        log.trace("Transaction: {} Finished successfully.",transactionId);\r
-        dataBroker.finishedTransactionsCount.andIncrement;
-        transaction.changeStatus(TransactionStatus.COMMITED)\r
-        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
-\r
-    }\r
-\r
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
-        dataBroker.executor.submit [|\r
-            for (listenerSet : listeners) {
-                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
-                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
-                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
-                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
-                for (listener : listenerSet.listeners) {
-                    try {
-                        listener.instance.onDataChanged(changeEvent);
-
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }        \r
-        ]\r
-    }\r
-\r
-    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
-        for (transaction : transactions) {\r
-            transaction.rollback()\r
-        }\r
-\r
-        // FIXME return encountered error.\r
-        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
-    }\r
-}\r
-\r
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
-\r
-    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
-
-    @Property\r
-    private val Object identifier;\r
-\r
-    var TransactionStatus status;\r
-\r
-    var AbstractDataBroker<P, D, ?> broker;\r
-\r
-    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
-        super(dataBroker);\r
-        _identifier = identifier;\r
-        broker = dataBroker;\r
-        status = TransactionStatus.NEW;\r
-        LOG.debug("Transaction {} Allocated.", identifier);
-\r
-    //listeners = new ListenerRegistry<>();\r
-    }\r
-\r
-    override commit() {\r
-        return broker.commit(this);\r
-    }\r
-\r
-    override readConfigurationData(P path) {\r
-        val local = this.updatedConfigurationData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        \r
-        return broker.readConfigurationData(path);\r
-    }\r
-\r
-    override readOperationalData(P path) {\r
-        val local = this.updatedOperationalData.get(path);\r
-        if(local != null) {\r
-            return local;\r
-        }\r
-        return broker.readOperationalData(path);\r
-    }\r
-\r
-    override hashCode() {\r
-        return identifier.hashCode;\r
-    }\r
-\r
-    override equals(Object obj) {\r
-        if (this === obj)\r
-            return true;\r
-        if (obj == null)\r
-            return false;\r
-        if (getClass() != obj.getClass())\r
-            return false;\r
-        val other = (obj as AbstractDataTransaction<P,D>);\r
-        if (broker == null) {\r
-            if (other.broker != null)\r
-                return false;\r
-        } else if (!broker.equals(other.broker))\r
-            return false;\r
-        if (identifier == null) {\r
-            if (other.identifier != null)\r
-                return false;\r
-        } else if (!identifier.equals(other.identifier))\r
-            return false;\r
-        return true;\r
-    }\r
-\r
-    override TransactionStatus getStatus() {\r
-        return status;\r
-    }\r
-\r
-    protected abstract def void onStatusChange(TransactionStatus status);\r
-\r
-    public def changeStatus(TransactionStatus status) {\r
-        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
-        this.status = status;\r
-        onStatusChange(status);\r
-    }\r
-\r
-}\r
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java
new file mode 100644 (file)
index 0000000..c73a627
--- /dev/null
@@ -0,0 +1,108 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all")
+public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
+        AbstractDataModification<P, D> {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractDataTransaction.class);
+
+    private final Object identifier;
+
+    @Override
+    public Object getIdentifier() {
+        return this.identifier;
+    }
+
+    private TransactionStatus status;
+
+    private final AbstractDataBroker<P, D, ? extends Object> broker;
+
+    protected AbstractDataTransaction(final Object identifier,
+            final AbstractDataBroker<P, D, ? extends Object> dataBroker) {
+        super(dataBroker);
+        this.identifier = identifier;
+        this.broker = dataBroker;
+        this.status = TransactionStatus.NEW;
+        AbstractDataTransaction.LOG.debug("Transaction {} Allocated.", identifier);
+    }
+
+    @Override
+    public Future<RpcResult<TransactionStatus>> commit() {
+        return this.broker.commit(this);
+    }
+
+    @Override
+    public D readConfigurationData(final P path) {
+        final D local = getUpdatedConfigurationData().get(path);
+        if (local != null) {
+            return local;
+        }
+        return this.broker.readConfigurationData(path);
+    }
+
+    @Override
+    public D readOperationalData(final P path) {
+        final D local = this.getUpdatedOperationalData().get(path);
+        if (local != null) {
+            return local;
+        }
+        return this.broker.readOperationalData(path);
+    }
+
+
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((identifier == null) ? 0 : identifier.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        AbstractDataTransaction other = (AbstractDataTransaction) obj;
+        if (identifier == null) {
+            if (other.identifier != null)
+                return false;
+        } else if (!identifier.equals(other.identifier))
+            return false;
+        return true;
+    }
+
+    @Override
+    public TransactionStatus getStatus() {
+        return this.status;
+    }
+
+    protected abstract void onStatusChange(final TransactionStatus status);
+
+    public void changeStatus(final TransactionStatus status) {
+        Object _identifier = this.getIdentifier();
+        AbstractDataTransaction.LOG
+                .debug("Transaction {} transitioned from {} to {}", _identifier, this.status, status);
+        this.status = status;
+        this.onStatusChange(status);
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..57d511e
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+
+@SuppressWarnings("all")
+class DataChangeListenerRegistration<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> extends
+        AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+    private AbstractDataBroker<P, D, DCL> dataBroker;
+
+    private final P path;
+
+    public P getPath() {
+        return this.path;
+    }
+
+    public DataChangeListenerRegistration(final P path, final DCL instance, final AbstractDataBroker<P, D, DCL> broker) {
+        super(instance);
+        this.dataBroker = broker;
+        this.path = path;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        this.dataBroker.removeListener(this);
+        this.dataBroker = null;
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java
new file mode 100644 (file)
index 0000000..776ff7b
--- /dev/null
@@ -0,0 +1,226 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+final class ImmutableDataChangeEvent<P extends Path<P>, D> implements DataChangeEvent<P,D> {
+
+    private final D updatedOperationalSubtree;
+    private final Map<P, D> updatedOperational;
+    private final D updatedConfigurationSubtree;
+    private final Map<P, D> updatedConfiguration;
+    private final Set<P> removedOperational;
+    private final Set<P> removedConfiguration;
+    private final D originalOperationalSubtree;
+    private final Map<P, D> originalOperational;
+    private final D originalConfigurationSubtree;
+    private final Map<P, D> originalConfiguration;
+    private final Map<P, D> createdOperational;
+    private final Map<P, D> createdConfiguration;
+
+
+    public ImmutableDataChangeEvent(Builder<P, D> builder) {
+
+        createdConfiguration = builder.getCreatedConfiguration().build();
+        createdOperational = builder.getCreatedOperational().build();
+        originalConfiguration = builder.getOriginalConfiguration().build();
+        originalConfigurationSubtree = builder.getOriginalConfigurationSubtree();
+        originalOperational = builder.getOriginalOperational().build();
+        originalOperationalSubtree = builder.getOriginalOperationalSubtree();
+        removedConfiguration = builder.getRemovedConfiguration().build();
+        removedOperational = builder.getRemovedOperational().build();
+        updatedConfiguration = builder.getUpdatedConfiguration().build();
+        updatedConfigurationSubtree = builder.getUpdatedConfigurationSubtree();
+        updatedOperational = builder.getUpdatedOperational().build();
+        updatedOperationalSubtree = builder.getUpdatedOperationalSubtree();
+    }
+
+    @Override
+    public Map<P, D> getCreatedConfigurationData() {
+        return createdConfiguration;
+    }
+
+    @Override
+    public Map<P, D> getCreatedOperationalData() {
+        return createdOperational;
+    }
+
+    @Override
+    public Map<P, D> getOriginalConfigurationData() {
+        return originalConfiguration;
+    }
+    @Override
+    public D getOriginalConfigurationSubtree() {
+        return originalConfigurationSubtree;
+    }
+    @Override
+    public Map<P, D> getOriginalOperationalData() {
+        return originalOperational;
+    }
+    @Override
+    public D getOriginalOperationalSubtree() {
+        return originalOperationalSubtree;
+    }
+    @Override
+    public Set<P> getRemovedConfigurationData() {
+        return removedConfiguration;
+    }
+    @Override
+    public Set<P> getRemovedOperationalData() {
+        return removedOperational;
+    }
+    @Override
+    public Map<P, D> getUpdatedConfigurationData() {
+        return updatedConfiguration;
+    }
+    @Override
+    public D getUpdatedConfigurationSubtree() {
+        return updatedConfigurationSubtree;
+    }
+    @Override
+    public Map<P, D> getUpdatedOperationalData() {
+        return updatedOperational;
+    }
+    @Override
+    public D getUpdatedOperationalSubtree() {
+        return updatedOperationalSubtree;
+    }
+
+    static final <P extends Path<P>,D> Builder<P, D> builder() {
+        return new Builder<>();
+    }
+
+    static final class Builder<P extends Path<P>,D> {
+
+        private  D updatedOperationalSubtree;
+        private  D originalOperationalSubtree;
+        private  D originalConfigurationSubtree;
+        private  D updatedConfigurationSubtree;
+
+        private final ImmutableMap.Builder<P, D> updatedOperational = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> updatedConfiguration = ImmutableMap.builder();
+        private final ImmutableSet.Builder<P> removedOperational = ImmutableSet.builder();
+        private final ImmutableSet.Builder<P> removedConfiguration = ImmutableSet.builder();
+        private final ImmutableMap.Builder<P, D> originalOperational = ImmutableMap.builder();
+
+        private final ImmutableMap.Builder<P, D> originalConfiguration = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> createdOperational = ImmutableMap.builder();
+        private final ImmutableMap.Builder<P, D> createdConfiguration = ImmutableMap.builder();
+
+
+        protected Builder<P,D> addTransaction(DataModification<P, D> data, Predicate<P> keyFilter) {
+            updatedOperational.putAll(Maps.filterKeys(data.getUpdatedOperationalData(), keyFilter));
+            updatedConfiguration.putAll(Maps.filterKeys(data.getUpdatedConfigurationData(), keyFilter));
+            originalConfiguration.putAll(Maps.filterKeys(data.getOriginalConfigurationData(), keyFilter));
+            originalOperational.putAll(Maps.filterKeys(data.getOriginalOperationalData(), keyFilter));
+            createdOperational.putAll(Maps.filterKeys(data.getCreatedOperationalData(), keyFilter));
+            createdConfiguration.putAll(Maps.filterKeys(data.getCreatedConfigurationData(), keyFilter));
+            return this;
+        }
+
+        protected Builder<P, D> addConfigurationChangeSet(RootedChangeSet<P, D> changeSet) {
+            if(changeSet == null) {
+                return this;
+            }
+
+            originalConfiguration.putAll(changeSet.getOriginal());
+            createdConfiguration.putAll(changeSet.getCreated());
+            updatedConfiguration.putAll(changeSet.getUpdated());
+            removedConfiguration.addAll(changeSet.getRemoved());
+            return this;
+        }
+
+        protected Builder<P, D> addOperationalChangeSet(RootedChangeSet<P, D> changeSet) {
+            if(changeSet == null) {
+                return this;
+            }
+            originalOperational.putAll(changeSet.getOriginal());
+            createdOperational.putAll(changeSet.getCreated());
+            updatedOperational.putAll(changeSet.getUpdated());
+            removedOperational.addAll(changeSet.getRemoved());
+            return this;
+        }
+
+        protected ImmutableDataChangeEvent<P, D> build() {
+            return new ImmutableDataChangeEvent<P,D>(this);
+        }
+
+        protected D getUpdatedOperationalSubtree() {
+            return updatedOperationalSubtree;
+        }
+
+        protected Builder<P, D> setUpdatedOperationalSubtree(D updatedOperationalSubtree) {
+            this.updatedOperationalSubtree = updatedOperationalSubtree;
+            return this;
+        }
+
+        protected D getOriginalOperationalSubtree() {
+            return originalOperationalSubtree;
+        }
+
+        protected Builder<P,D> setOriginalOperationalSubtree(D originalOperationalSubtree) {
+            this.originalOperationalSubtree = originalOperationalSubtree;
+            return this;
+        }
+
+        protected D getOriginalConfigurationSubtree() {
+            return originalConfigurationSubtree;
+        }
+
+        protected Builder<P, D> setOriginalConfigurationSubtree(D originalConfigurationSubtree) {
+            this.originalConfigurationSubtree = originalConfigurationSubtree;
+            return this;
+        }
+
+        protected D getUpdatedConfigurationSubtree() {
+            return updatedConfigurationSubtree;
+        }
+
+        protected Builder<P,D> setUpdatedConfigurationSubtree(D updatedConfigurationSubtree) {
+            this.updatedConfigurationSubtree = updatedConfigurationSubtree;
+            return this;
+        }
+
+        protected ImmutableMap.Builder<P, D> getUpdatedOperational() {
+            return updatedOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getUpdatedConfiguration() {
+            return updatedConfiguration;
+        }
+
+        protected ImmutableSet.Builder<P> getRemovedOperational() {
+            return removedOperational;
+        }
+
+        protected ImmutableSet.Builder<P> getRemovedConfiguration() {
+            return removedConfiguration;
+        }
+
+        protected ImmutableMap.Builder<P, D> getOriginalOperational() {
+            return originalOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getOriginalConfiguration() {
+            return originalConfiguration;
+        }
+
+        protected ImmutableMap.Builder<P, D> getCreatedOperational() {
+            return createdOperational;
+        }
+
+        protected ImmutableMap.Builder<P, D> getCreatedConfiguration() {
+            return createdConfiguration;
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java
new file mode 100644 (file)
index 0000000..502ca90
--- /dev/null
@@ -0,0 +1,118 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+
+public final class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
+
+    final P path;
+
+    final Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+    D initialOperationalState;
+
+    D initialConfigurationState;
+
+    D finalConfigurationState;
+
+    D finalOperationalState;
+
+    Map<P, D> additionalConfigOriginal;
+    Map<P, D> additionalConfigCreated;
+    Map<P, D> additionalConfigUpdated;
+    Map<P, D> additionalConfigDeleted;
+
+    Map<P, D> additionalOperOriginal;
+    Map<P, D> additionalOperCreated;
+    Map<P, D> additionalOperUpdated;
+    Map<P, D> additionalOperDeleted;
+
+    RootedChangeSet<P, D> normalizedConfigurationChanges;
+    RootedChangeSet<P, D> normalizedOperationalChanges;
+
+    private final Predicate<P> containsPredicate;
+
+    public ListenerStateCapture(P path, Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners,
+            Predicate<P> containsPredicate) {
+        super();
+        this.path = path;
+        this.listeners = listeners;
+        this.containsPredicate = containsPredicate;
+    }
+
+    protected D getInitialOperationalState() {
+        return initialOperationalState;
+    }
+
+    protected void setInitialOperationalState(D initialOperationalState) {
+        this.initialOperationalState = initialOperationalState;
+    }
+
+    protected D getInitialConfigurationState() {
+        return initialConfigurationState;
+    }
+
+    protected void setInitialConfigurationState(D initialConfigurationState) {
+        this.initialConfigurationState = initialConfigurationState;
+    }
+
+    protected P getPath() {
+        return path;
+    }
+
+    protected Iterable<DataChangeListenerRegistration<P, D, DCL>> getListeners() {
+        return listeners;
+    }
+
+    protected D getFinalConfigurationState() {
+        return finalConfigurationState;
+    }
+
+    protected void setFinalConfigurationState(D finalConfigurationState) {
+        this.finalConfigurationState = finalConfigurationState;
+    }
+
+    protected D getFinalOperationalState() {
+        return finalOperationalState;
+    }
+
+    protected void setFinalOperationalState(D finalOperationalState) {
+        this.finalOperationalState = finalOperationalState;
+    }
+
+    protected RootedChangeSet<P, D> getNormalizedConfigurationChanges() {
+        return normalizedConfigurationChanges;
+    }
+
+    protected void setNormalizedConfigurationChanges(RootedChangeSet<P, D> normalizedConfigurationChanges) {
+        this.normalizedConfigurationChanges = normalizedConfigurationChanges;
+    }
+
+    protected RootedChangeSet<P, D> getNormalizedOperationalChanges() {
+        return normalizedOperationalChanges;
+    }
+
+    protected void setNormalizedOperationalChanges(RootedChangeSet<P, D> normalizedOperationalChange) {
+        this.normalizedOperationalChanges = normalizedOperationalChange;
+    }
+
+    protected DataChangeEvent<P, D> createEvent(DataModification<P, D> modification) {
+        return ImmutableDataChangeEvent.<P, D> builder()//
+                .addTransaction(modification, containsPredicate) //
+                .addConfigurationChangeSet(normalizedConfigurationChanges) //
+                .addOperationalChangeSet(normalizedOperationalChanges) //
+                .setOriginalConfigurationSubtree(initialConfigurationState) //
+                .setOriginalOperationalSubtree(initialOperationalState) //
+                .setUpdatedConfigurationSubtree(finalConfigurationState) //
+                .setUpdatedOperationalSubtree(finalOperationalState) //
+                .build();
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java
new file mode 100644 (file)
index 0000000..e052565
--- /dev/null
@@ -0,0 +1,66 @@
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.opendaylight.yangtools.concepts.Path;
+
+public class RootedChangeSet<P extends Path<P>,D> {
+
+    private final P root;
+    private final Map<P,D> original;
+    private final Map<P,D> created = new HashMap<>();
+    private final Map<P,D> updated = new HashMap<>();
+    private final Set<P> removed = new HashSet<>();
+
+
+
+    public RootedChangeSet(P root,Map<P, D> original) {
+        super();
+        this.root = root;
+        this.original = original;
+    }
+
+    protected P getRoot() {
+        return root;
+    }
+
+    protected Map<P, D> getOriginal() {
+        return original;
+    }
+
+    protected Map<P, D> getCreated() {
+        return created;
+    }
+
+    protected Map<P, D> getUpdated() {
+        return updated;
+    }
+
+    protected Set<P> getRemoved() {
+        return removed;
+    }
+
+    public void addCreated(Map<P,D> created) {
+        this.created.putAll(created);
+    }
+
+    public void addCreated(Entry<P,D> entry) {
+        created.put(entry.getKey(), entry.getValue());
+    }
+
+    public void addUpdated(Entry<P,D> entry) {
+        updated.put(entry.getKey(), entry.getValue());
+    }
+
+    public void addRemoval(P path) {
+        removed.add(path);
+    }
+
+    public boolean isChange() {
+        return !created.isEmpty() || !updated.isEmpty() || !removed.isEmpty();
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java
new file mode 100644 (file)
index 0000000..e99fc0f
--- /dev/null
@@ -0,0 +1,237 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
+        Callable<RpcResult<TransactionStatus>> {
+    private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
+
+    private final AbstractDataTransaction<P, D> transaction;
+
+    private final AbstractDataBroker<P, D, DCL> dataBroker;
+
+    public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
+        this.transaction = transaction;
+        this.dataBroker = broker;
+    }
+
+    @Override
+    public RpcResult<TransactionStatus> call() throws Exception {
+        final Object transactionId = this.transaction.getIdentifier();
+
+        Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
+                .addAll(transaction.getCreatedConfigurationData().keySet())
+                .addAll(transaction.getRemovedConfigurationData())
+                .addAll(transaction.getUpdatedOperationalData().keySet())
+                .addAll(transaction.getCreatedOperationalData().keySet())
+                .addAll(transaction.getRemovedOperationalData()).build();
+
+        log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
+
+        final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
+        listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
+        filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
+
+
+
+        final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
+        final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
+        captureInitialState(listeners);
+
+
+        log.trace("Transaction: {} Starting Request Commit.",transactionId);
+        final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
+        try {
+            for (final DataCommitHandler<P, D> handler : commitHandlers) {
+                DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
+                if (requestCommit != null) {
+                    handlerTransactions.add(requestCommit);
+                } else {
+                    log.debug("Transaction: {}, Handler {}  is not participating in transaction.", transactionId,
+                            handler);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Transaction: {} Request Commit failed", transactionId, e);
+            dataBroker.getFailedTransactionsCount().getAndIncrement();
+            this.transaction.changeStatus(TransactionStatus.FAILED);
+            return this.rollback(handlerTransactions, e);
+
+        }
+
+        log.trace("Transaction: {} Starting Finish.",transactionId);
+        final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
+        try {
+            for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
+                results.add(subtransaction.finish());
+            }
+        } catch (Exception e) {
+            log.error("Transaction: {} Finish Commit failed", transactionId, e);
+            dataBroker.getFailedTransactionsCount().getAndIncrement();
+            transaction.changeStatus(TransactionStatus.FAILED);
+            return this.rollback(handlerTransactions, e);
+        }
+
+
+        dataBroker.getFinishedTransactionsCount().getAndIncrement();
+        transaction.changeStatus(TransactionStatus.COMMITED);
+
+        log.trace("Transaction: {} Finished successfully.", transactionId);
+
+        captureFinalState(listeners);
+
+        log.trace("Transaction: {} Notifying listeners.");
+
+        publishDataChangeEvent(listeners);
+        return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
+                Collections.<RpcError> emptySet());
+    }
+
+    private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (ListenerStateCapture<P, D, DCL> state : listeners) {
+            state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+            state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
+        }
+    }
+
+
+    private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        for (ListenerStateCapture<P, D, DCL> state : listeners) {
+            state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+            state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
+        }
+    }
+
+    private void filterProbablyAffectedListeners(
+            ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
+
+        for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
+            P affectedPath = listenerSet.getPath();
+            Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
+            Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
+
+            if(configChange.isPresent() || operChange.isPresent()) {
+                reallyAffected.add(listenerSet);
+                if(configChange.isPresent()) {
+                    listenerSet.setNormalizedConfigurationChanges(configChange.get());
+                }
+
+                if(operChange.isPresent()) {
+                    listenerSet.setNormalizedOperationalChanges(operChange.get());
+                }
+            }
+        }
+    }
+
+    private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
+        Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
+        Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
+        Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
+        Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
+        return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
+    }
+
+    private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
+        Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
+        Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
+        Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
+        Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
+        return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
+    }
+
+    private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
+        Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
+
+        if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
+            return Optional.absent();
+        }
+        RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
+        changeSet.addCreated(createdConfig);
+
+        for(Entry<P, D> entry : updatedConfig.entrySet()) {
+            if(originalConfig.containsKey(entry.getKey())) {
+                changeSet.addUpdated(entry);
+            } else {
+                changeSet.addCreated(entry);
+            }
+        }
+
+        for(Entry<P,D> entry : originalConfig.entrySet()) {
+            for(P deletion : potentialDeletions) {
+                if(isContained.apply(deletion)) {
+                    changeSet.addRemoval(entry.getKey());
+                }
+            }
+        }
+
+        if(changeSet.isChange()) {
+            return Optional.of(changeSet);
+        } else {
+            return Optional.absent();
+        }
+
+    }
+
+    public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+        ExecutorService executor = this.dataBroker.getExecutor();
+        final Runnable notifyTask = new Runnable() {
+            @Override
+            public void run() {
+                for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
+                    {
+                        DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
+                        for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
+                            try {
+                                listener.getInstance().onDataChanged(changeEvent);
+                            } catch (Exception e) {
+                                log.error("Unhandled exception when invoking listener {}", listener);
+                            }
+                        }
+                    }
+                }
+            }
+        };
+        executor.submit(notifyTask);
+    }
+
+    public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
+        for (final DataCommitTransaction<P, D> transaction : transactions) {
+            transaction.rollback();
+        }
+        Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
+        return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);
+    }
+}
index 24cb99f8c367cd566c576f877cb6bcb20b6519e9..8a9d1678657c6b3555ba23d4a3c536cae5d3d30d 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
-public interface RpcProvisionRegistry extends BrokerService, RouteChangePublisher<RpcRoutingContext, InstanceIdentifier> {
+public interface RpcProvisionRegistry extends RpcImplementation, BrokerService, RouteChangePublisher<RpcRoutingContext, InstanceIdentifier> {
 
     /**
      * Registers an implementation of the rpc.
index aa5138a04db226d777c70dbd8ece6d3df5b77736..8f734d7d4c3934a22c1ca91af146534a0a5fac5b 100644 (file)
@@ -73,7 +73,7 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
         return session;
     }
 
-    protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+    protected def Future<RpcResult<CompositeNode>> invokeRpcAsync(QName rpc, CompositeNode input) {
         val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
         return result;
     }
@@ -135,5 +135,13 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     override <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> registerRouteChangeListener(L listener) {
         return router.registerRouteChangeListener(listener);
     }
+
+    override invokeRpc(QName rpc,CompositeNode input){
+        return router.invokeRpc(rpc,input)
+    }
+
+    override getSupportedRpcs() {
+        return router.getSupportedRpcs();
+    }
     
 }
index e4808e9bd6ddfdc86dace504e67f8173eb1e8ca9..813f52b67d4bab0632fdefb6a9fe7d23d8054792 100644 (file)
@@ -37,7 +37,7 @@ class ConsumerContextImpl implements ConsumerSession {
     }
 
     override rpc(QName rpc, CompositeNode input) {
-        return broker.invokeRpc(rpc, input);
+        return broker.invokeRpcAsync(rpc, input);
     }
 
     override <T extends BrokerService> T getService(Class<T> service) {
index 5a3e060a3c9a52a2c55b4876b283ec5559de63e5..a8bdddb5108d3ab0242024b7c3db4df3a1d1694a 100644 (file)
@@ -219,4 +219,6 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
             L listener) {
         return rpcs.registerRouteChangeListener(listener);
     }
+
+
 }
index b32d906d1e70cc8dc5997fea3c57c3379943b6b5..b02a37c3003e594b728554f6c5a0124961872ce3 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +42,7 @@ import com.google.common.collect.ImmutableSet;
 public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataStore> implements //
         DataStore, //
         SchemaServiceListener, //
+        SchemaContextListener, //
         AutoCloseable {
 
     private final static Logger LOG = LoggerFactory.getLogger(SchemaAwareDataStoreAdapter.class);
index e375e14cf2e96bf9e4ca29998a156df402ec3420..e218a957826f110bd36d3acb3fe059fc69b5092a 100644 (file)
@@ -16,9 +16,13 @@ import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.osgi.framework.ServiceReference;
 
+import java.util.Set;
+
 public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcProvisionRegistry>
                                        implements RpcProvisionRegistry {
 
@@ -45,4 +49,15 @@ public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcPro
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
         return getDelegate().registerRouteChangeListener(listener);
     }
+
+
+  @Override
+  public Set<QName> getSupportedRpcs() {
+    return getDelegate().getSupportedRpcs();
+  }
+
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    return getDelegate().invokeRpc(rpc,input);
+  }
 }
index 6d0b5ea0e31bdb75046ff157071a4ded54bfcaa7..395bacb5e374f9595f1d0f6a63e8778b6f83d1f6 100644 (file)
@@ -122,7 +122,7 @@ public class NodeStatisticsHandler {
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
     }
 
-    public class FlowEntry {
+    private static class FlowEntry {
         private final Short tableId;
         private final Flow flow;
 
@@ -143,7 +143,6 @@ public class NodeStatisticsHandler {
         public int hashCode() {
             final int prime = 31;
             int result = 1;
-            result = prime * result + getOuterType().hashCode();
             result = prime * result + ((flow == null) ? 0 : flow.hashCode());
             result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
             return result;
@@ -158,8 +157,6 @@ public class NodeStatisticsHandler {
             if (getClass() != obj.getClass())
                 return false;
             FlowEntry other = (FlowEntry) obj;
-            if (!getOuterType().equals(other.getOuterType()))
-                return false;
             if (flow == null) {
                 if (other.flow != null)
                     return false;
@@ -172,10 +169,6 @@ public class NodeStatisticsHandler {
                 return false;
             return true;
         }
-
-        private NodeStatisticsHandler getOuterType() {
-            return NodeStatisticsHandler.this;
-        }
     }
 
     private static final class QueueEntry{
index fec6bbe6b466519f9c5e8ed3cc5d89aa2558b66e..d2016b1f6337b6923662268ae7d8029d34ae85b0 100644 (file)
@@ -48,7 +48,7 @@ public class SimpleBroadcastHandlerImpl implements IBroadcastHandler, IListenDat
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    BroadcastMode mode = BroadcastMode.BROADCAST_TO_NONINTERNAL;
+    BroadcastMode mode = BroadcastMode.DISABLED;
 
     @Override
     public PacketResult receiveDataPacket(RawPacket inPkt) {
index 19f45e63c591dbde396e0929141c5c042a55adf5..615bb4df8379284df696363b9efd8461ec353f2b 100644 (file)
@@ -1004,7 +1004,8 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
             }
         }
 
-        boolean proactiveForwarding = false;
+        boolean forwardingModeChanged = false;
+
         // copy node properties from config
         if (nodeConfigList != null) {
             String nodeId = node.toString();
@@ -1014,7 +1015,7 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
                 propMap.putAll(nodeProperties);
                 if (nodeProperties.get(ForwardingMode.name) != null) {
                     ForwardingMode mode = (ForwardingMode) nodeProperties.get(ForwardingMode.name);
-                    proactiveForwarding = mode.isProactive();
+                    forwardingModeChanged = mode.isProactive();
                 }
             }
         }
@@ -1023,28 +1024,35 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
             Property defaultMode = new ForwardingMode(ForwardingMode.REACTIVE_FORWARDING);
             propMap.put(ForwardingMode.name, defaultMode);
         }
-        boolean result = false;
-        if (propMapCurr == null) {
-            if (nodeProps.putIfAbsent(node, propMap) == null) {
-                result = true;
-            }
+
+        boolean propsAdded = false;
+        // Attempt initial add
+        if (nodeProps.putIfAbsent(node, propMap) == null) {
+                propsAdded = true;
+
+                /* Notify listeners only for initial node addition
+                 * to avoid expensive tasks triggered by redundant notifications
+                 */
+                notifyNode(node, UpdateType.ADDED, propMap);
         } else {
-            result = nodeProps.replace(node, propMapCurr, propMap);
+
+            propsAdded = nodeProps.replace(node, propMapCurr, propMap);
+
+            // check whether forwarding mode changed
+            if (propMapCurr.get(ForwardingMode.name) != null) {
+                ForwardingMode mode = (ForwardingMode) propMapCurr.get(ForwardingMode.name);
+                forwardingModeChanged ^= mode.isProactive();
+            }
         }
-        if (!result) {
-            log.debug("Cluster conflict: Conflict while adding the node properties. Node: {}  Properties: {}",
-                    node.getID(), props);
+        if (!propsAdded) {
+            log.debug("Cluster conflict while adding node {}. Overwriting with latest props: {}", node.getID(), props);
             addNodeProps(node, propMap);
         }
 
-        // check if span ports are configed
+        // check if span ports are configured
         addSpanPorts(node);
-
-        // notify node listeners
-        notifyNode(node, UpdateType.ADDED, propMap);
-
         // notify proactive mode forwarding
-        if (proactiveForwarding) {
+        if (forwardingModeChanged) {
             notifyModeChange(node, true);
         }
     }
@@ -1054,7 +1062,12 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
         if (nodeProps == null) {
             return;
         }
-        nodeProps.remove(node);
+
+        if (nodeProps.remove(node) == null) {
+            log.debug("Received redundant node REMOVED udate for {}. Skipping..", node);
+            return;
+        }
+
         nodeConnectorNames.remove(node);
         Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
         for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProps.entrySet()) {
@@ -1149,6 +1162,13 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
 
         switch (type) {
         case ADDED:
+            // Skip redundant ADDED update (e.g. cluster switch-over)
+            if (nodeConnectorProps.containsKey(nodeConnector)) {
+                log.debug("Redundant nodeconnector ADDED for {}, props {} for container {}",
+                        nodeConnector, props, containerName);
+                update = false;
+            }
+
             if (props != null) {
                 for (Property prop : props) {
                     addNodeConnectorProp(nodeConnector, prop);
@@ -1158,6 +1178,7 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
                 addNodeConnectorProp(nodeConnector, null);
             }
 
+
             addSpanPort(nodeConnector);
             break;
         case CHANGED:
@@ -2026,9 +2047,9 @@ public class SwitchManager implements ISwitchManager, IConfigurationContainerAwa
         // only add if span is configured on this nodeConnector
         for (SpanConfig conf : getSpanConfigList(nodeConnector.getNode())) {
             if (conf.getPortArrayList().contains(nodeConnector)) {
-                List<NodeConnector> ncLists = new ArrayList<NodeConnector>();
-                ncLists.add(nodeConnector);
-                addSpanPorts(nodeConnector.getNode(), ncLists);
+                List<NodeConnector> ncList = new ArrayList<NodeConnector>();
+                ncList.add(nodeConnector);
+                addSpanPorts(nodeConnector.getNode(), ncList);
                 return;
             }
         }