Merge "Create transaction on the backend datastore only when neccessary"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 24 Apr 2015 10:09:48 +0000 (10:09 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 24 Apr 2015 10:09:48 +0000 (10:09 +0000)
59 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingNotificationAdapterModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/NotificationBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AbstractNotificationListenerRegistration.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java with 96% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/AggregatedNotificationListenerRegistration.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java with 97% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/HydrogenNotificationBrokerImpl.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java with 95% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/ListenerMapGeneration.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/ListenerMapGeneration.java with 98% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotificationListenerRegistration.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java with 95% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/compat/NotifyTask.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java with 98% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMNotificationServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-binding-it/src/test/resources/controller.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java with 80% similarity]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadWriteTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java with 78% similarity]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedWriteTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java with 81% similarity]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java

index 8479548..9a91662 100644 (file)
@@ -201,14 +201,16 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
+            SnapshotManager.this.currentState = CREATING;
+
             try {
                 createSnapshotProcedure.apply(null);
             } catch (Exception e) {
+                SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
                 return false;
             }
 
-            SnapshotManager.this.currentState = CREATING;
             return true;
         }
 
index 903cb27..971153b 100644 (file)
@@ -9,18 +9,17 @@ package org.opendaylight.controller.config.yang.md.sal.binding.impl;
 
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.core.api.Broker;
 
 public class BindingNotificationAdapterModule extends AbstractBindingNotificationAdapterModule  {
-    public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+    public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -34,7 +33,7 @@ public class BindingNotificationAdapterModule extends AbstractBindingNotificatio
         final BindingToNormalizedNodeCodec codec = getBindingMappingServiceDependency();
         final Broker.ProviderSession session = getDomAsyncBrokerDependency().registerProvider(new DummyDOMProvider());
         final DOMNotificationService notifService = session.getService(DOMNotificationService.class);
-        return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService, SingletonHolder.INVOKER_FACTORY);
+        return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService);
     }
 
 }
index 415c978..58d5a85 100644 (file)
@@ -7,9 +7,12 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.binding.impl;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl;
+
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
 
 /**
 *
@@ -17,14 +20,14 @@ import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
 public final class NotificationBrokerImplModule extends
         org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractNotificationBrokerImplModule {
 
-    public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
-            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+            final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
-            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
-            NotificationBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+            final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+            final NotificationBrokerImplModule oldModule, final java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -37,14 +40,20 @@ public final class NotificationBrokerImplModule extends
     @Override
     public java.lang.AutoCloseable createInstance() {
 
+        final NotificationPublishService notificationPublishService = getNotificationPublishAdapterDependency();
+        final NotificationService notificationService = getNotificationAdapterDependency();
+
+        if(notificationPublishService != null & notificationService != null) {
+            return new HeliumNotificationProviderServiceAdapter(notificationPublishService, notificationService);
+        }
+
         /*
          *  FIXME: Switch to new broker (which has different threading model)
          *  once this change is communicated with downstream users or
          *  we will have adapter implementation which will honor Helium
          *  threading model for notifications.
          */
-        ListeningExecutorService listeningExecutor = SingletonHolder.getDefaultNotificationExecutor();
-        NotificationBrokerImpl broker = new NotificationBrokerImpl(listeningExecutor);
-        return broker;
+
+        return new HydrogenNotificationBrokerImpl(SingletonHolder.getDefaultNotificationExecutor());
     }
 }
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
 
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -28,15 +28,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
-public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
+public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class);
 
     private final ListenerRegistry<NotificationInterestListener> interestListeners =
             ListenerRegistry.create();
     private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
     private final ExecutorService executor;
 
-    public NotificationBrokerImpl(final ExecutorService executor) {
+    public HydrogenNotificationBrokerImpl(final ExecutorService executor) {
         this.executor = Preconditions.checkNotNull(executor);
     }
 
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
 
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
index cdf03fa..2a31d34 100644 (file)
@@ -14,8 +14,6 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -34,7 +32,7 @@ public class BindingDOMNotificationServiceAdapter implements NotificationService
     private final BindingNormalizedNodeSerializer codec;
     private final DOMNotificationService domNotifService;
 
-    public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService, final NotificationInvokerFactory notificationInvokerFactory) {
+    public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService) {
         this.codec = codec;
         this.domNotifService = domNotifService;
     }
@@ -72,8 +70,7 @@ public class BindingDOMNotificationServiceAdapter implements NotificationService
         protected NotificationService createInstance(final BindingToNormalizedNodeCodec codec,
                 final ClassToInstanceMap<DOMService> delegates) {
             final DOMNotificationService domNotification = delegates.getInstance(DOMNotificationService.class);
-            final NotificationInvokerFactory invokerFactory = SingletonHolder.INVOKER_FACTORY;
-            return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification, invokerFactory);
+            return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification);
         }
 
         @Override
index ee130fd..866cb84 100644 (file)
@@ -217,6 +217,29 @@ module opendaylight-sal-binding-broker-impl {
         }
     }
 
+    augment "/config:modules/config:module/config:configuration" {
+        case binding-notification-broker {
+            when "/config:modules/config:module/config:type = 'binding-notification-broker'";
+            container notification-adapter {
+                uses config:service-ref {
+                    refine type {
+                        mandatory false;
+                        config:required-identity binding-new-notification-service;
+                    }
+                }
+            }
+
+            container notification-publish-adapter {
+                uses config:service-ref {
+                    refine type {
+                        mandatory false;
+                        config:required-identity binding-new-notification-publish-service;
+                    }
+                }
+            }
+        }
+    }
+
     augment "/config:modules/config:module/config:state" {
         case binding-notification-broker {
             when "/config:modules/config:module/config:type = 'binding-notification-broker'";
index 547d349..2647477 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
 import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.binding.test.util.MockSchemaService;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -77,8 +76,7 @@ public class DataBrokerTestCustomizer {
     }
 
     public NotificationService createNotificationService() {
-        return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter,
-                SingletonHolder.INVOKER_FACTORY);
+        return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter);
     }
 
     public NotificationPublishService createNotificationPublishService() {
index a439e9e..1203a72 100644 (file)
@@ -20,19 +20,27 @@ import com.google.common.util.concurrent.MoreExecutors;
 import javassist.ClassPool;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
 import org.opendaylight.controller.md.sal.binding.compat.HeliumRpcProviderRegistry;
 import org.opendaylight.controller.md.sal.binding.compat.HydrogenDataBrokerAdapter;
 import org.opendaylight.controller.md.sal.binding.compat.HydrogenMountProvisionServiceAdapter;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMMountPointServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcProviderServiceAdapter;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcServiceAdapter;
 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
 import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.broker.impl.mount.DOMMountPointServiceImpl;
@@ -41,7 +49,6 @@ import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
 import org.opendaylight.controller.sal.binding.impl.RootBindingAwareBroker;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.BrokerService;
@@ -65,7 +72,7 @@ public class BindingTestContext implements AutoCloseable {
 
     private RootBindingAwareBroker baBrokerImpl;
 
-    private NotificationBrokerImpl baNotifyImpl;
+    private HeliumNotificationProviderServiceAdapter baNotifyImpl;
 
 
     private BrokerImpl biBrokerImpl;
@@ -93,6 +100,14 @@ public class BindingTestContext implements AutoCloseable {
     private BindingDOMRpcProviderServiceAdapter baProviderRpc;
     private DOMRpcRouter domRouter;
 
+    private NotificationPublishService publishService;
+
+    private NotificationService listenService;
+
+    private DOMNotificationPublishService domPublishService;
+
+    private DOMNotificationService domListenService;
+
 
 
     public DOMDataBroker getDomAsyncDataBroker() {
@@ -249,7 +264,12 @@ public class BindingTestContext implements AutoCloseable {
 
     public void startBindingNotificationBroker() {
         checkState(executor != null);
-        baNotifyImpl = new NotificationBrokerImpl(executor);
+        final DOMNotificationRouter router = DOMNotificationRouter.create(16);
+        domPublishService = router;
+        domListenService = router;
+        publishService = new BindingDOMNotificationPublishServiceAdapter(codec, domPublishService);
+        listenService = new BindingDOMNotificationServiceAdapter(codec, domListenService);
+        baNotifyImpl = new HeliumNotificationProviderServiceAdapter(publishService,listenService);
 
     }
 
index f9c4a04..7bfe254 100644 (file)
                 <module>
                             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-notification-broker</type>
                             <name>binding-notification-broker</name>
+                    <notification-adapter xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+                         <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-new-notification-service</type>
+                         <name>binding-notification-adapter</name>
+                    </notification-adapter>
+                    <notification-publish-adapter xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+                         <type  xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-new-notification-publish-service</type>
+                         <name>binding-notification-publish-adapter</name>
+                    </notification-publish-adapter>
                         </module>
                         <module>
                             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-broker-impl</type>
                         <module>
                             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-inmemory-data-broker</type>
                             <name>inmemory-data-broker</name>
+
                             <schema-service>
                                 <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
                                 <name>yang-schema-service</name>
                             </schema-service>
+
+                   <config-data-store>
+                        <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
+                        <name>config-store-service</name>
+                    </config-data-store>
+
+                    <operational-data-store>
+                        <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
+                        <name>operational-store-service</name>
+                    </operational-data-store>
                         </module>
                         <module>
                             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-broker-impl</type>
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..c3940e5
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * Abstract base class for our internal implementation of {@link DataTreeCandidateNode},
+ * which we instantiate from a serialized stream. We do not retain the before-image and
+ * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only
+ * useful for end users. Instances based on this class should never be leaked outside of
+ * this component.
+ */
+abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode {
+    private final ModificationType type;
+
+    protected AbstractDataTreeCandidateNode(final ModificationType type) {
+        this.type = Preconditions.checkNotNull(type);
+    }
+
+    @Override
+    public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public final ModificationType getModificationType() {
+        return type;
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataBefore() {
+        throw new UnsupportedOperationException("Before-image not available after serialization");
+    }
+
+    static DataTreeCandidateNode createUnmodified() {
+        return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+
+            @Override
+            public Optional<NormalizedNode<?, ?>> getDataAfter() {
+                throw new UnsupportedOperationException("After-image not available after serialization");
+            }
+
+            @Override
+            public Collection<DataTreeCandidateNode> getChildNodes() {
+                throw new UnsupportedOperationException("Children not available after serialization");
+            }
+        };
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
new file mode 100644 (file)
index 0000000..4b471cf
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ChainedCommitCohort extends ShardDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class);
+    private final ReadWriteShardDataTreeTransaction transaction;
+    private final ShardDataTreeTransactionChain chain;
+    private final ShardDataTreeCohort delegate;
+
+    ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) {
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.chain = Preconditions.checkNotNull(chain);
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        final ListenableFuture<Void> ret = delegate.commit();
+
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                chain.clearTransaction(transaction);
+                LOG.debug("Committed transaction {}", transaction);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+            }
+        });
+
+        return ret;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate.canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate.abort();
+    }
+
+    @Override
+    DataTreeCandidateTip getCandidate() {
+        return delegate.getCandidate();
+    }
+}
\ No newline at end of file
index f1f33bf..9a800c1 100644 (file)
@@ -7,9 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import java.util.List;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,7 +59,7 @@ final class ChainedTransactionProxy extends TransactionProxy {
      * previous Tx's ready operations haven't completed yet.
      */
     @Override
-    protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(final String shardName) {
         // Check if there are any previous ready Futures, otherwise let the super class handle it.
         if(previousReadyFutures.isEmpty()) {
             return super.sendFindPrimaryShardAsync(shardName);
@@ -75,7 +75,7 @@ final class ChainedTransactionProxy extends TransactionProxy {
                 previousReadyFutures, getActorContext().getClientDispatcher());
 
         // Add a callback for completion of the combined Futures.
-        final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+        final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
         OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> notUsed) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java
new file mode 100644 (file)
index 0000000..54167b2
--- /dev/null
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeCandidatePayload extends Payload implements Externalizable {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class);
+    private static final long serialVersionUID = 1L;
+    private static final byte DELETE = 0;
+    private static final byte SUBTREE_MODIFIED = 1;
+    private static final byte UNMODIFIED = 2;
+    private static final byte WRITE = 3;
+
+    private transient byte[] serialized;
+
+    public DataTreeCandidatePayload() {
+        // Required by Externalizable
+    }
+
+    private DataTreeCandidatePayload(final byte[] serialized) {
+        this.serialized = Preconditions.checkNotNull(serialized);
+    }
+
+    private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+            final Collection<DataTreeCandidateNode> children) throws IOException {
+        out.writeInt(children.size());
+        for (DataTreeCandidateNode child : children) {
+            writeNode(writer, out, child);
+        }
+    }
+
+    private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+            final DataTreeCandidateNode node) throws IOException {
+        switch (node.getModificationType()) {
+        case DELETE:
+            out.writeByte(DELETE);
+            writer.writePathArgument(node.getIdentifier());
+            break;
+        case SUBTREE_MODIFIED:
+            out.writeByte(SUBTREE_MODIFIED);
+            writer.writePathArgument(node.getIdentifier());
+            writeChildren(writer, out, node.getChildNodes());
+            break;
+        case WRITE:
+            out.writeByte(WRITE);
+            writer.writeNormalizedNode(node.getDataAfter().get());
+            break;
+        case UNMODIFIED:
+            throw new IllegalArgumentException("Unmodified candidate should never be in the payload");
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+        }
+    }
+
+    static DataTreeCandidatePayload create(DataTreeCandidate candidate) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) {
+            writer.writeYangInstanceIdentifier(candidate.getRootPath());
+
+            final DataTreeCandidateNode node = candidate.getRootNode();
+            switch (node.getModificationType()) {
+            case DELETE:
+                out.writeByte(DELETE);
+                break;
+            case SUBTREE_MODIFIED:
+                out.writeByte(SUBTREE_MODIFIED);
+                writeChildren(writer, out, node.getChildNodes());
+                break;
+            case UNMODIFIED:
+                out.writeByte(UNMODIFIED);
+                break;
+            case WRITE:
+                out.writeByte(WRITE);
+                writer.writeNormalizedNode(node.getDataAfter().get());
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+            }
+
+            writer.close();
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e);
+        }
+
+        return new DataTreeCandidatePayload(out.toByteArray());
+    }
+
+    private static Collection<DataTreeCandidateNode> readChildren(final NormalizedNodeInputStreamReader reader,
+        final DataInput in) throws IOException {
+        final int size = in.readInt();
+        if (size != 0) {
+            final Collection<DataTreeCandidateNode> ret = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                final DataTreeCandidateNode child = readNode(reader, in);
+                if (child != null) {
+                    ret.add(child);
+                }
+            }
+            return ret;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader,
+            final DataInput in) throws IOException {
+        final byte type = in.readByte();
+        switch (type) {
+        case DELETE:
+            return DeletedDataTreeCandidateNode.create(reader.readPathArgument());
+        case SUBTREE_MODIFIED:
+            final PathArgument identifier = reader.readPathArgument();
+            final Collection<DataTreeCandidateNode> children = readChildren(reader, in);
+            if (children.isEmpty()) {
+                LOG.debug("Modified node {} does not have any children, not instantiating it", identifier);
+                return null;
+            } else {
+                return ModifiedDataTreeCandidateNode.create(identifier, children);
+            }
+        case UNMODIFIED:
+            return null;
+        case WRITE:
+            return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + type);
+        }
+    }
+
+    private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException {
+        final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in);
+        final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier();
+        final byte type = in.readByte();
+
+        final DataTreeCandidateNode rootNode;
+        switch (type) {
+        case DELETE:
+            rootNode = DeletedDataTreeCandidateNode.create();
+            break;
+        case SUBTREE_MODIFIED:
+            rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in));
+            break;
+        case WRITE:
+            rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + type);
+        }
+
+        return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
+    }
+
+    DataTreeCandidate getCandidate() throws IOException {
+        return parseCandidate(ByteStreams.newDataInput(serialized));
+    }
+
+    @Override
+    @Deprecated
+    @SuppressWarnings("rawtypes")
+    public <T> Map<GeneratedExtension, T> encode() {
+        return null;
+    }
+
+    @Override
+    @Deprecated
+    public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) {
+        return null;
+    }
+
+    @Override
+    public int size() {
+        return serialized.length;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeByte((byte)serialVersionUID);
+        out.writeInt(serialized.length);
+        out.write(serialized);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        final long version = in.readByte();
+        Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version);
+
+        final int length = in.readInt();
+        serialized = new byte[length];
+        in.readFully(serialized);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..2df380b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a deletion.
+ */
+abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+    private DeletedDataTreeCandidateNode() {
+        super(ModificationType.DELETE);
+    }
+
+    static DataTreeCandidateNode create() {
+        return new DeletedDataTreeCandidateNode() {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+        };
+    }
+
+    static DataTreeCandidateNode create(final PathArgument identifier) {
+        return new DeletedDataTreeCandidateNode() {
+            @Override
+            public final PathArgument getIdentifier() {
+                return identifier;
+            }
+        };
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+        return Optional.absent();
+    }
+
+    @Override
+    public final Collection<DataTreeCandidateNode> getChildNodes() {
+        // We would require the before-image to reconstruct the list of nodes which
+        // were deleted.
+        throw new UnsupportedOperationException("Children not available after serialization");
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..208ec33
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a modification in
+ * one of its children.
+ */
+abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+    private final Collection<DataTreeCandidateNode> children;
+
+    private ModifiedDataTreeCandidateNode(final Collection<DataTreeCandidateNode> children) {
+        super(ModificationType.SUBTREE_MODIFIED);
+        this.children = Preconditions.checkNotNull(children);
+    }
+
+    static DataTreeCandidateNode create(final Collection<DataTreeCandidateNode> children) {
+        return new ModifiedDataTreeCandidateNode(children) {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+        };
+    }
+
+    static DataTreeCandidateNode create(final PathArgument identifier, final Collection<DataTreeCandidateNode> children) {
+        return new ModifiedDataTreeCandidateNode(children) {
+            @Override
+            public final PathArgument getIdentifier() {
+                return identifier;
+            }
+        };
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+        throw new UnsupportedOperationException("After-image not available after serialization");
+    }
+
+    @Override
+    public final Collection<DataTreeCandidateNode> getChildNodes() {
+        return children;
+    }
+}
index 0f3ab61..cb17335 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
@@ -26,7 +25,7 @@ final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTrans
         parent.abortTransaction(this);
     }
 
-    DOMStoreThreePhaseCommitCohort ready() {
+    ShardDataTreeCohort ready() {
         Preconditions.checkState(close(), "Transaction is already closed");
 
         return parent.finishTransaction(this);
index b53d12c..62d3259 100644 (file)
@@ -63,6 +63,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -291,15 +294,21 @@ public class Shard extends RaftActor {
         }
     }
 
+    private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
+        return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+    }
+
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
+        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
-        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+        if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
+            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
-                    new ModificationPayload(cohortEntry.getModification()));
+                DataTreeCandidatePayload.create(candidate));
         }
     }
 
@@ -309,12 +318,37 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().commit().get();
+
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+            shardMBean.incrementCommittedTransactionCount();
+            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        } catch (Exception e) {
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+                    transactionID, e);
+            shardMBean.incrementFailedTransactionsCount();
+        } finally {
+            commitCoordinator.currentTransactionComplete(transactionID, true);
+        }
+    }
+
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
+        if (cohortEntry == null) {
             // The transaction is no longer the current commit. This can happen if the transaction
             // was aborted prior, most likely due to timeout in the front-end. We need to finish
             // committing the transaction though since it was successfully persisted and replicated
@@ -323,7 +357,13 @@ public class Shard extends RaftActor {
             // transaction.
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
-                commitWithNewTransaction(cohortEntry.getModification());
+                try {
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                } catch (DataValidationFailedException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
+                }
+
                 sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
@@ -334,31 +374,8 @@ public class Shard extends RaftActor {
                 LOG.error(ex.getMessage());
                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
             }
-
-            return;
-        }
-
-        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
-        try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
-
-            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
-            shardMBean.incrementCommittedTransactionCount();
-            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
-            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                    transactionID, e);
-            shardMBean.incrementFailedTransactionsCount();
-        } finally {
-            commitCoordinator.currentTransactionComplete(transactionID, true);
+        } else {
+            finishCommit(sender, transactionID, cohortEntry);
         }
     }
 
@@ -556,15 +573,25 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-
-        if(data instanceof ModificationPayload) {
+        if (data instanceof DataTreeCandidatePayload) {
+            if (clientActor == null) {
+                // No clientActor indicates a replica coming from the leader
+                try {
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                } catch (DataValidationFailedException | IOException e) {
+                    LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+                }
+            } else {
+                // Replication consensus reached, proceed to commit
+                finishCommit(clientActor, identifier);
+            }
+        } else if (data instanceof ModificationPayload) {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
                 LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
             }
-        }
-        else if (data instanceof CompositeModificationPayload) {
+        } else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
index 0eb48fd..30947fa 100644 (file)
@@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 
 /**
@@ -42,7 +41,7 @@ public class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
     }
 
     private final Cache<String, CohortEntry> cohortCache;
@@ -413,8 +412,7 @@ public class ShardCommitCoordinator {
 
     static class CohortEntry {
         private final String transactionID;
-        private DOMStoreThreePhaseCommitCohort cohort;
-        private final MutableCompositeModification compositeModification;
+        private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
         private ActorRef replySender;
         private Shard shard;
@@ -422,16 +420,14 @@ public class ShardCommitCoordinator {
         private boolean doImmediateCommit;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
-            this.compositeModification = new MutableCompositeModification();
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
         }
 
-        CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort,
                 MutableCompositeModification compositeModification) {
             this.transactionID = transactionID;
             this.cohort = cohort;
-            this.compositeModification = compositeModification;
             this.transaction = null;
         }
 
@@ -447,17 +443,12 @@ public class ShardCommitCoordinator {
             return transactionID;
         }
 
-        DOMStoreThreePhaseCommitCohort getCohort() {
+        ShardDataTreeCohort getCohort() {
             return cohort;
         }
 
-        MutableCompositeModification getModification() {
-            return compositeModification;
-        }
-
         void applyModifications(Iterable<Modification> modifications) {
-            for(Modification modification: modifications) {
-                compositeModification.addModification(modification);
+            for (Modification modification : modifications) {
                 modification.apply(transaction.getSnapshot());
             }
         }
@@ -498,9 +489,5 @@ public class ShardCommitCoordinator {
         void setShard(Shard shard) {
             this.shard = shard;
         }
-
-        boolean hasModifications(){
-            return compositeModification.getModifications().size() > 0;
-        }
     }
 }
index 373bf49..56c5eb6 100644 (file)
@@ -21,14 +21,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -93,7 +92,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent {
         return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
     }
 
-    void notifyListeners(final DataTreeCandidateTip candidate) {
+    void notifyListeners(final DataTreeCandidate candidate) {
         LOG.debug("Notifying listeners on candidate {}", candidate);
 
         // DataTreeChanges first, as they are more light-weight
@@ -116,7 +115,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain != null) {
             chain.close();
         } else {
-            LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+            LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
         }
     }
 
@@ -152,15 +151,30 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent {
         return new SimpleEntry<>(reg, event);
     }
 
+    void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+        LOG.debug("Applying foreign transaction {}", identifier);
+
+        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+        DataTreeCandidates.applyToModification(mod, foreign);
+        mod.ready();
+
+        LOG.trace("Applying foreign modification {}", mod);
+        dataTree.validate(mod);
+        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        dataTree.commit(candidate);
+        notifyListeners(candidate);
+    }
+
     @Override
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
         // Intentional no-op
     }
 
     @Override
-    DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
-        return new ShardDataTreeCohort(this, snapshot);
+        return new SimpleShardDataTreeCohort(this, snapshot);
     }
+
 }
index 11b3ca8..213e36a 100644 (file)
@@ -7,72 +7,23 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class);
-    private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
-    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
-    private final DataTreeModification transaction;
-    private final ShardDataTree dataTree;
-    private DataTreeCandidateTip candidate;
-
-    ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
-        this.dataTree = Preconditions.checkNotNull(dataTree);
-        this.transaction = Preconditions.checkNotNull(transaction);
-    }
-
-    @Override
-    public ListenableFuture<Boolean> canCommit() {
-        try {
-            dataTree.getDataTree().validate(transaction);
-            LOG.debug("Transaction {} validated", transaction);
-            return TRUE_FUTURE;
-        } catch (Exception e) {
-            return Futures.immediateFailedFuture(e);
-        }
-    }
-
-    @Override
-    public ListenableFuture<Void> preCommit() {
-        try {
-            candidate = dataTree.getDataTree().prepare(transaction);
-            /*
-             * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
-             *        persist on the candidate (which gives us a Future).
-             */
-            LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
-            return VOID_FUTURE;
-        } catch (Exception e) {
-            LOG.debug("Transaction {} failed to prepare", transaction, e);
-            return Futures.immediateFailedFuture(e);
-        }
-    }
-
-    @Override
-    public ListenableFuture<Void> abort() {
-        // No-op, really
-        return VOID_FUTURE;
+public abstract class ShardDataTreeCohort {
+    ShardDataTreeCohort() {
+        // Prevent foreign instantiation
     }
 
-    @Override
-    public ListenableFuture<Void> commit() {
-        try {
-            dataTree.getDataTree().commit(candidate);
-        } catch (Exception e) {
-            LOG.error("Transaction {} failed to commit", transaction, e);
-            return Futures.immediateFailedFuture(e);
-        }
+    abstract DataTreeCandidateTip getCandidate();
 
-        LOG.debug("Transaction {} committed, proceeding to notify", transaction);
-        dataTree.notifyListeners(candidate);
-        return VOID_FUTURE;
-    }
+    @VisibleForTesting
+    public abstract ListenableFuture<Boolean> canCommit();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> preCommit();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> abort();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> commit();
 }
index 780d940..183c219 100644 (file)
@@ -9,12 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,17 +72,17 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
 
         // dataTree is finalizing ready the transaction, we just record it for the next
         // transaction in chain
-        final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction);
+        final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction);
         openTransaction = null;
         previousTx = transaction;
         LOG.debug("Committing transaction {}", transaction);
 
-        return new CommitCohort(transaction, delegate);
+        return new ChainedCommitCohort(this, transaction, delegate);
     }
 
     @Override
@@ -95,40 +90,9 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         return MoreObjects.toStringHelper(this).add("id", chainId).toString();
     }
 
-    private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort {
-        private final ReadWriteShardDataTreeTransaction transaction;
-        private final DOMStoreThreePhaseCommitCohort delegate;
-
-        CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) {
-            this.transaction = Preconditions.checkNotNull(transaction);
-            this.delegate = Preconditions.checkNotNull(delegate);
-        }
-
-        @Override
-        protected DOMStoreThreePhaseCommitCohort delegate() {
-            return delegate;
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            final ListenableFuture<Void> ret = super.commit();
-
-            Futures.addCallback(ret, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(Void result) {
-                    if (transaction.equals(previousTx)) {
-                        previousTx = null;
-                    }
-                    LOG.debug("Committed transaction {}", transaction);
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                    LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
-                }
-            });
-
-            return ret;
+    void clearTransaction(ReadWriteShardDataTreeTransaction transaction) {
+        if (transaction.equals(previousTx)) {
+            previousTx = null;
         }
     }
 }
index 6cc1408..ee04aff 100644 (file)
@@ -7,9 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-
 abstract class ShardDataTreeTransactionParent {
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
-    abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+    abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 }
index f9d3050..7976419 100644 (file)
@@ -7,9 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -17,10 +15,10 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
@@ -36,52 +34,55 @@ import org.slf4j.Logger;
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
-    private final ShardDataTree store;
-    private List<ModificationPayload> currentLogRecoveryBatch;
+    private final DataTree store;
     private final String shardName;
     private final Logger log;
+    private DataTreeModification transaction;
+    private int size;
 
     ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
-        this.store = store;
+        this.store = store.getDataTree();
         this.shardName = shardName;
         this.log = log;
     }
 
     @Override
     public void startLogRecoveryBatch(int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        transaction = store.takeSnapshot().newModification();
+        size = 0;
     }
 
     @Override
     public void appendRecoveredLogEntry(Payload payload) {
         try {
-            if(payload instanceof ModificationPayload) {
-                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            if (payload instanceof DataTreeCandidatePayload) {
+                DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+                size++;
+            } else if (payload instanceof ModificationPayload) {
+                MutableCompositeModification.fromSerializable(
+                    ((ModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                    ((CompositeModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationByteStringPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationByteStringPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+                size++;
             } else {
                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (IOException e) {
+        } catch (IOException | ClassNotFoundException e) {
             log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
-
     }
 
-    private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
-        try {
-            commitCohort.preCommit().get();
-            commitCohort.commit().get();
-        } catch (Exception e) {
-            log.error("{}: Failed to commit Tx on recovery", shardName, e);
-        }
+    private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException {
+        tx.ready();
+        store.validate(tx);
+        store.commit(store.prepare(tx));
     }
 
     /**
@@ -89,21 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyCurrentLogRecoveryBatch() {
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
-
-        ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
-        DataTreeModification snapshot = writeTx.getSnapshot();
-        for (ModificationPayload payload : currentLogRecoveryBatch) {
-            try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
-            } catch (Exception e) {
-                log.error("{}: Error extracting ModificationPayload", shardName, e);
-            }
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
+        try {
+            commitTransaction(transaction);
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to apply recovery batch", shardName, e);
         }
-
-        commitTransaction(writeTx);
-
-        currentLogRecoveryBatch = null;
+        transaction = null;
     }
 
     /**
@@ -115,19 +108,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applying recovered snapshot", shardName);
 
-        // Intentionally bypass normal transaction to side-step persistence/replication
-        final DataTree tree = store.getDataTree();
-        DataTreeModification writeTx = tree.takeSnapshot().newModification();
-
-        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
-        writeTx.write(ROOT, node);
-        writeTx.ready();
+        final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        final DataTreeModification tx = store.takeSnapshot().newModification();
+        tx.write(ROOT, node);
         try {
-            tree.validate(writeTx);
-            tree.commit(tree.prepare(writeTx));
+            commitTransaction(tx);
         } catch (DataValidationFailedException e) {
-            log.error("{}: Failed to validate recovery snapshot", shardName, e);
+            log.error("{}: Failed to apply recovery snapshot", shardName, e);
         }
     }
 }
index 35d8e92..600509a 100644 (file)
@@ -14,7 +14,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -86,7 +85,7 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
             throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
+        ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
index 69a696f..365f97d 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
 /**
  * @author: syedbahm
@@ -197,7 +196,7 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         LOG.debug("readyTransaction : {}", transactionID);
 
-        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
+        ShardDataTreeCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
                 cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
new file mode 100644 (file)
index 0000000..9f22ce8
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
+    private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+    private final DataTreeModification transaction;
+    private final ShardDataTree dataTree;
+    private DataTreeCandidateTip candidate;
+
+    SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.transaction = Preconditions.checkNotNull(transaction);
+    }
+
+    @Override
+    DataTreeCandidateTip getCandidate() {
+        return candidate;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        try {
+            dataTree.getDataTree().validate(transaction);
+            LOG.debug("Transaction {} validated", transaction);
+            return TRUE_FUTURE;
+        } catch (Exception e) {
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        try {
+            candidate = dataTree.getDataTree().prepare(transaction);
+            /*
+             * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
+             *        persist on the candidate (which gives us a Future).
+             */
+            LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
+            return VOID_FUTURE;
+        } catch (Exception e) {
+            LOG.debug("Transaction {} failed to prepare", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        // No-op, really
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        try {
+            dataTree.getDataTree().commit(candidate);
+        } catch (Exception e) {
+            LOG.error("Transaction {} failed to commit", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+
+        LOG.debug("Transaction {} committed, proceeding to notify", transaction);
+        dataTree.notifyListeners(candidate);
+        return VOID_FUTURE;
+    }
+}
index f12fdd9..e397ab5 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
@@ -159,7 +160,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
-    private boolean isRootPath(YangInstanceIdentifier path){
+    private static boolean isRootPath(YangInstanceIdentifier path) {
         return !path.getPathArguments().iterator().hasNext();
     }
 
@@ -477,7 +478,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
         return actorContext.findPrimaryShardAsync(shardName);
     }
 
@@ -497,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+            Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
 
-            findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
-                        newTxFutureCallback.setPrimaryShard(primaryShard);
+                        newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
                     }
                 }
             }, actorContext.getClientDispatcher());
index cdd7859..2f48ab9 100644 (file)
@@ -7,8 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
 /**
  * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction.
@@ -17,14 +17,14 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  */
 public class ForwardedReadyTransaction {
     private final String transactionID;
-    private final DOMStoreThreePhaseCommitCohort cohort;
+    private final ShardDataTreeCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
     private final boolean doImmediateCommit;
     private final short txnClientVersion;
 
     public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
-            DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+            ShardDataTreeCohort cohort, Modification modification,
             boolean returnSerialized, boolean doImmediateCommit) {
         this.transactionID = transactionID;
         this.cohort = cohort;
@@ -38,7 +38,7 @@ public class ForwardedReadyTransaction {
         return transactionID;
     }
 
-    public DOMStoreThreePhaseCommitCohort getCohort() {
+    public ShardDataTreeCohort getCohort() {
         return cohort;
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java
new file mode 100644 (file)
index 0000000..bbeb1aa
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message DTO that contains information about the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfo {
+    private final ActorSelection primaryShardActor;
+    private final Optional<DataTree> localShardDataTree;
+
+    public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional<DataTree> localShardDataTree) {
+        this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
+        this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+    }
+
+    /**
+     * Returns an ActorSelection representing the primary shard actor.
+     */
+    public @Nonnull ActorSelection getPrimaryShardActor() {
+        return primaryShardActor;
+    }
+
+    /**
+     * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
+     * to the caller. Otherwise the Optional value is absent.
+     */
+    public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+        return localShardDataTree;
+    }
+}
index 17d9880..afa773b 100644 (file)
@@ -45,8 +45,10 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,7 +98,7 @@ public class ActorContext {
     private Timeout transactionCommitOperationTimeout;
     private Timeout shardInitializationTimeout;
     private final Dispatchers dispatchers;
-    private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+    private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
 
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
@@ -141,7 +143,7 @@ public class ActorContext {
 
         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
 
-        primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+        primaryShardInfoCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
                 .build();
     }
@@ -196,24 +198,25 @@ public class ActorContext {
         return schemaContext;
     }
 
-    public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
-        Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+    public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+        Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
         if(ret != null){
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
                 new FindPrimary(shardName, true), shardInitializationTimeout);
 
-        return future.transform(new Mapper<Object, ActorSelection>() {
+        return future.transform(new Mapper<Object, PrimaryShardInfo>() {
             @Override
-            public ActorSelection checkedApply(Object response) throws Exception {
+            public PrimaryShardInfo checkedApply(Object response) throws Exception {
                 if(response instanceof PrimaryFound) {
                     PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
-                    primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
-                    return actorSelection;
+                    PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
+                    primaryShardInfoCache.put(shardName, Futures.successful(info));
+                    return info;
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -387,15 +390,15 @@ public class ActorContext {
     public void broadcast(final Object message){
         for(final String shardName : configuration.getAllShardNames()){
 
-            Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
-            primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+            primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
                                 message.getClass().getSimpleName(), shardName, failure);
                     } else {
-                        primaryShard.tell(message, ActorRef.noSender());
+                        primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
                     }
                 }
             }, getClientDispatcher());
@@ -553,7 +556,7 @@ public class ActorContext {
     }
 
     @VisibleForTesting
-    Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
-        return primaryShardActorSelectionCache;
+    Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+        return primaryShardInfoCache;
     }
 }
index 03f2bb7..1100f3a 100644 (file)
@@ -21,7 +21,6 @@ import akka.japi.Creator;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
@@ -42,10 +41,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -53,6 +48,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
@@ -172,49 +168,35 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
     }
 
-    protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-            transaction.read(YangInstanceIdentifier.builder().build());
-
-        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
-
-        NormalizedNode<?, ?> normalizedNode = optional.get();
-
-        transaction.close();
-
-        return normalizedNode;
-    }
-
-    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
             final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {
         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
     }
 
-    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
             final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
 
         ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
         tx.getSnapshot().write(path, data);
-        DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
+        ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
 
         modification.addModification(new WriteModification(path, data));
 
         return cohort;
     }
 
-    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
-            final DOMStoreThreePhaseCommitCohort actual) {
+    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+            final ShardDataTreeCohort actual) {
         return createDelegatingMockCohort(cohortName, actual, null);
     }
 
-    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
-            final DOMStoreThreePhaseCommitCohort actual,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
-        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
+    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+            final ShardDataTreeCohort actual,
+            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
+        ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
 
         doAnswer(new Answer<ListenableFuture<Boolean>>() {
             @Override
@@ -248,6 +230,13 @@ public abstract class AbstractShardTest extends AbstractActorTest{
             }
         }).when(cohort).abort();
 
+        doAnswer(new Answer<DataTreeCandidateTip>() {
+            @Override
+            public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
+                return actual.getCandidate();
+            }
+        }).when(cohort).getCandidate();
+
         return cohort;
     }
 
@@ -275,7 +264,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
 
         transaction.getSnapshot().write(id, node);
-        DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+        ShardDataTreeCohort cohort = transaction.ready();
         cohort.canCommit().get();
         cohort.preCommit().get();
         cohort.commit();
index abe7f76..a64a580 100644 (file)
@@ -54,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
@@ -71,6 +72,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,6 +311,11 @@ public abstract class AbstractTransactionProxyTest {
         return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
     }
 
+    protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+        return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
+                Optional.<DataTree>absent()));
+    }
+
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         log.info("Created mock shard actor {}", actorRef);
@@ -316,7 +323,7 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
 
-        doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+        doReturn(primaryShardInfoReply(actorSystem, actorRef)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java
new file mode 100644 (file)
index 0000000..781c3db
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+public class DataTreeCandidatePayloadTest {
+    private DataTreeCandidate candidate;
+
+    private static DataTreeCandidateNode findNode(final Collection<DataTreeCandidateNode> nodes, final PathArgument arg) {
+        for (DataTreeCandidateNode node : nodes) {
+            if (arg.equals(node.getIdentifier())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    private static void assertChildrenEquals(final Collection<DataTreeCandidateNode> expected,
+            final Collection<DataTreeCandidateNode> actual) {
+        // Make sure all expected nodes are there
+        for (DataTreeCandidateNode exp : expected) {
+            final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier());
+            assertNotNull("missing expected child", act);
+            assertCandidateNodeEquals(exp, act);
+        }
+        // Make sure no nodes are present which are not in the expected set
+        for (DataTreeCandidateNode act : actual) {
+            final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier());
+            assertNull("unexpected child", exp);
+        }
+    }
+
+    private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) {
+        assertEquals("root path", expected.getRootPath(), actual.getRootPath());
+
+        final DataTreeCandidateNode expRoot = expected.getRootNode();
+        final DataTreeCandidateNode actRoot = expected.getRootNode();
+        assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType());
+
+        switch (actRoot.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actRoot.getModificationType());
+            break;
+        }
+
+        assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode());
+    }
+
+    private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) {
+        assertEquals("child type", expected.getModificationType(), actual.getModificationType());
+        assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier());
+
+        switch (actual.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("child data", expected.getDataAfter(), actual.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actual.getModificationType());
+            break;
+        }
+    }
+
+    @Before
+    public void setUp() {
+        final YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        final NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+        candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData);
+    }
+
+    @Test
+    public void testCandidateSerialization() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertEquals("payload size", 141, payload.size());
+    }
+
+    @Test
+    public void testCandidateSerDes() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertCandidateEquals(candidate, payload.getCandidate());
+    }
+
+    @Test
+    public void testPayloadSerDes() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
+    }
+}
index 22ce50b..3d28672 100644 (file)
@@ -25,7 +25,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
@@ -34,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -88,11 +86,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -100,6 +96,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -108,6 +111,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
+    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
 
     @Test
     public void testRegisterChangeListener() throws Exception {
@@ -379,10 +383,25 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testRecovery() throws Exception {
+    public void testApplyStateWithCandidatePayload() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                DataTreeCandidatePayload.create(candidate)));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
 
+    DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
         testStore.setSchemaContext(SCHEMA_CONTEXT);
 
@@ -393,6 +412,55 @@ public class ShardTest extends AbstractShardTest {
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
                 SerializationUtils.serializeNormalizedNode(root),
                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+        return testStore;
+    }
+
+    private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
+        source.validate(mod);
+        final DataTreeCandidate candidate = source.prepare(mod);
+        source.commit(candidate);
+        return DataTreeCandidatePayload.create(candidate);
+    }
+
+    @Test
+    public void testDataTreeCandidateRecovery() throws Exception {
+        // Set up the InMemorySnapshotStore.
+        final DataTree source = setupInMemorySnapshotStore();
+
+        final DataTreeModification writeMod = source.takeSnapshot().newModification();
+        writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+        // Set up the InMemoryJournal.
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
+        for (int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+            final DataTreeModification mod = source.takeSnapshot().newModification();
+            mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                payloadForModification(source, mod)));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyJournalEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @Test
+    public void testModicationRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+        setupInMemorySnapshotStore();
 
         // Set up the InMemoryJournal.
 
@@ -420,7 +488,7 @@ public class ShardTest extends AbstractShardTest {
         testRecovery(listEntryKeys);
     }
 
-    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+    private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -429,7 +497,6 @@ public class ShardTest extends AbstractShardTest {
         return new ModificationPayload(compMod);
     }
 
-    @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -445,19 +512,19 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
@@ -605,12 +672,12 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+    private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+    private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
@@ -631,10 +698,10 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID = "tx";
             FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
                 @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
                     if(mockCohort.get() == null) {
                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
                     }
@@ -699,10 +766,10 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID = "tx";
             FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
                 @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
                     if(mockCohort.get() == null) {
                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
                     }
@@ -745,7 +812,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @SuppressWarnings("unchecked")
-    private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+    private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
@@ -818,6 +885,8 @@ public class ShardTest extends AbstractShardTest {
         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
         new ShardTestKit(getSystem()) {{
             Creator<Shard> creator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
+
                 @Override
                 public Shard create() throws Exception {
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
@@ -867,7 +936,7 @@ public class ShardTest extends AbstractShardTest {
             String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
                     TestModel.TEST_PATH, containerNode, modification);
 
             FiniteDuration duration = duration("5 seconds");
@@ -909,7 +978,7 @@ public class ShardTest extends AbstractShardTest {
             String transactionID = "tx";
             MutableCompositeModification modification = new MutableCompositeModification();
             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
                     TestModel.TEST_PATH, containerNode, modification);
 
             FiniteDuration duration = duration("5 seconds");
@@ -945,6 +1014,25 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    private static DataTreeCandidateTip mockCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
+        doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
+    private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
     @Test
     public void testCommitWhenTransactionHasNoModifications(){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
@@ -959,10 +1047,11 @@ public class ShardTest extends AbstractShardTest {
 
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -1014,10 +1103,11 @@ public class ShardTest extends AbstractShardTest {
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -1070,14 +1160,15 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+            doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1146,13 +1237,13 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1222,7 +1313,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1270,7 +1361,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1320,7 +1411,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1339,6 +1430,11 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort, modification, true, true), getRef());
@@ -1362,7 +1458,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1381,6 +1477,11 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort, modification, true, true), getRef());
@@ -1404,10 +1505,10 @@ public class ShardTest extends AbstractShardTest {
             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final String transactionID = "tx1";
-            Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
-                          new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
+            Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
+                          new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
                 @Override
-                public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
+                public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
 
                     // Simulate an AbortTransaction message occurring during replication, after
@@ -1425,7 +1526,7 @@ public class ShardTest extends AbstractShardTest {
             };
 
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
@@ -1475,7 +1576,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
@@ -1487,7 +1588,7 @@ public class ShardTest extends AbstractShardTest {
             MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
                     listNodePath,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
                     modification2);
@@ -1546,19 +1647,19 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
             // Ready the Tx's
@@ -1620,13 +1721,13 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1782,29 +1883,29 @@ public class ShardTest extends AbstractShardTest {
     /**
      * This test simply verifies that the applySnapShot logic will work
      * @throws ReadFailedException
+     * @throws DataValidationFailedException
      */
     @Test
-    public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
-
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+    public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
-        DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification putTransaction = store.takeSnapshot().newModification();
         putTransaction.write(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        commitTransaction(putTransaction);
+        commitTransaction(store, putTransaction);
 
 
-        NormalizedNode<?, ?> expected = readStore(store);
+        NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
 
-        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification writeTransaction = store.takeSnapshot().newModification();
 
         writeTransaction.delete(YangInstanceIdentifier.builder().build());
         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
 
-        commitTransaction(writeTransaction);
+        commitTransaction(store, writeTransaction);
 
-        NormalizedNode<?, ?> actual = readStore(store);
+        NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
 
         assertEquals(expected, actual);
     }
@@ -1913,15 +2014,9 @@ public class ShardTest extends AbstractShardTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        ListenableFuture<Void> future =
-            commitCohort.preCommit();
-        try {
-            future.get();
-            future = commitCohort.commit();
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-        }
+    private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
+        modification.ready();
+        store.validate(modification);
+        store.commit(store.prepare(modification));
     }
 }
index 93c6ddb..844feb2 100644 (file)
@@ -130,7 +130,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         if (exToThrow instanceof PrimaryNotFoundException) {
             doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
         } else {
-            doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+            doReturn(primaryShardInfoReply(getSystem(), actorRef)).
                     when(mockActorContext).findPrimaryShardAsync(anyString());
         }
 
@@ -209,7 +209,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
             actorSelection(actorRef.path().toString());
 
-        doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+        doReturn(primaryShardInfoReply(getSystem(), actorRef)).
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
@@ -834,7 +834,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
         if(shardFound) {
-            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+            doReturn(primaryShardInfoReply(actorSystem, shardActorRef)).
                     when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
         } else {
             doReturn(Futures.failed(new PrimaryNotFoundException("test")))
@@ -1399,7 +1399,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(getSystem().actorSelection(shardActorRef.path())).
                 when(mockActorContext).actorSelection(shardActorRef.path().toString());
 
-        doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+        doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
index 96cd3e4..a2309be 100644 (file)
@@ -32,6 +32,7 @@ import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -57,7 +58,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -219,19 +219,19 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
index bbfff70..7016ada 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
 import static org.junit.Assert.assertEquals;
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -43,8 +43,7 @@ public class ModificationPayloadTest {
         assertEquals("getPath", writePath, write.getPath());
         assertEquals("getData", writeData, write.getData());
 
-        ModificationPayload cloned =
-                (ModificationPayload) SerializationUtils.clone(payload);
+        ModificationPayload cloned = SerializationUtils.clone(payload);
 
         deserialized = (MutableCompositeModification) payload.getModification();
 
index 6b4f633..bc80937 100644 (file)
@@ -42,6 +42,7 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -411,32 +412,36 @@ public class ActorContextTest extends AbstractActorTest{
             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+                            return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
                         }
                     };
 
 
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
-            ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
             assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
 
-            assertEquals(cachedSelection, actual);
+            assertEquals(cachedInfo, actual);
 
             // Wait for 200 Milliseconds. The cached entry should have been removed.
 
             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
 
-            cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
 
@@ -461,7 +466,7 @@ public class ActorContextTest extends AbstractActorTest{
                     };
 
 
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
             try {
                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
@@ -470,7 +475,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             }
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
     }
@@ -494,7 +499,7 @@ public class ActorContextTest extends AbstractActorTest{
                     };
 
 
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
             try {
                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
@@ -503,7 +508,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             }
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
     }
index 4240608..60420dc 100644 (file)
@@ -7,54 +7,54 @@
  */
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import com.google.common.io.Resources;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
-import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 public class TestModel {
 
-  public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
-          "test");
-
-  public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
-          "junk");
-
-
-  public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
-  public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
-  public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
-  public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
-  public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
-  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
-  public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
-  private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
-
-  public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
-  public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
-  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
-          node(OUTER_LIST_QNAME).build();
-  public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
-          node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
-  public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
-  public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
-
-
-  public static final InputStream getDatastoreTestInputStream() {
-    return getInputStream(DATASTORE_TEST_YANG);
-  }
-
-  private static InputStream getInputStream(final String resourceName) {
-    return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
-  }
-
-  public static SchemaContext createTestContext() {
-    YangParserImpl parser = new YangParserImpl();
-    Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
-    return parser.resolveSchemaContext(modules);
-  }
+    public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+            "test");
+
+    public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+            "junk");
+
+
+    public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
+    public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
+    public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
+    public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
+    public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+    public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
+    public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+    private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
+
+    public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+    public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
+    public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+            node(OUTER_LIST_QNAME).build();
+    public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+            node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
+    public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
+    public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
+
+
+    public static final InputStream getDatastoreTestInputStream() {
+        return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
+    }
+
+    public static SchemaContext createTestContext() {
+        YangParserImpl parser = new YangParserImpl();
+        try {
+            return parser.parseSources(Collections.singleton(Resources.asByteSource(TestModel.class.getResource(DATASTORE_TEST_YANG))));
+        } catch (IOException | YangSyntaxErrorException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataTreeListenerTest.java
new file mode 100644 (file)
index 0000000..c3038ca
--- /dev/null
@@ -0,0 +1,454 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DOMDataTreeListenerTest {
+
+    private SchemaContext schemaContext;
+    private AbstractDOMDataBroker domBroker;
+    private ListeningExecutorService executor;
+    private ExecutorService futureExecutor;
+    private CommitExecutorService commitExecutor;
+
+    private static final DataContainerChild<?, ?> OUTER_LIST = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+            .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1))
+            .build();
+
+    private static final DataContainerChild<?, ?> OUTER_LIST_2 = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+            .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2))
+            .build();
+
+    private static final NormalizedNode<?, ?> TEST_CONTAINER = Builders.containerBuilder()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+            .withChild(OUTER_LIST)
+            .build();
+
+    private static final NormalizedNode<?, ?> TEST_CONTAINER_2 = Builders.containerBuilder()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+            .withChild(OUTER_LIST_2)
+            .build();
+
+    private static DOMDataTreeIdentifier ROOT_DATA_TREE_ID = new DOMDataTreeIdentifier(
+            LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private static DOMDataTreeIdentifier OUTER_LIST_DATA_TREE_ID = new DOMDataTreeIdentifier(
+            LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH);
+
+    @Before
+    public void setupStore() {
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+                MoreExecutors.newDirectExecutorService());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+                MoreExecutors.newDirectExecutorService());
+        schemaContext = TestModel.createTestContext();
+
+        operStore.onGlobalContextUpdated(schemaContext);
+        configStore.onGlobalContextUpdated(schemaContext);
+
+        ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
+                .put(CONFIGURATION, configStore) //
+                .put(OPERATIONAL, operStore) //
+                .build();
+
+        commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+        futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+        executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+                TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
+        domBroker = new SerializedDOMDataBroker(stores, executor);
+    }
+
+    @After
+    public void tearDown() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+
+        if (futureExecutor != null) {
+            futureExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void writeContainerEmptyTreeTest() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit();
+
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(1, listener.getReceivedChanges().size());
+        final Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+        listenerReg.close();
+    }
+
+    @Test
+    public void replaceContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+        CountDownLatch latch = new CountDownLatch(2);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit().checkedGet();
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+        writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2);
+        writeTx.submit();
+
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(2, listener.getReceivedChanges().size());
+        Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+        changes = listener.getReceivedChanges().get(1);
+        assertEquals(1, changes.size());
+
+        candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        candidateRoot = candidate.getRootNode();
+        checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.WRITE, candidateRoot);
+        listenerReg.close();
+    }
+
+    @Test
+    public void deleteContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+        CountDownLatch latch = new CountDownLatch(2);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit().checkedGet();
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+        writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+        writeTx.submit();
+
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(2, listener.getReceivedChanges().size());
+        Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+        changes = listener.getReceivedChanges().get(1);
+        assertEquals(1, changes.size());
+
+        candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        candidateRoot = candidate.getRootNode();
+        checkChange(TEST_CONTAINER, null, ModificationType.DELETE, candidateRoot);
+        listenerReg.close();
+    }
+
+    @Test
+    public void replaceChildListContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+        CountDownLatch latch = new CountDownLatch(2);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit().checkedGet();
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+        writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH, OUTER_LIST_2);
+        writeTx.submit();
+
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(2, listener.getReceivedChanges().size());
+        Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+        changes = listener.getReceivedChanges().get(1);
+        assertEquals(1, changes.size());
+
+        candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        candidateRoot = candidate.getRootNode();
+        checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.SUBTREE_MODIFIED, candidateRoot);
+        final DataTreeCandidateNode modifiedChild = candidateRoot.getModifiedChild(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME));
+        assertNotNull(modifiedChild);
+        checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, modifiedChild);
+        listenerReg.close();
+    }
+
+    @Test
+    public void rootModificationChildListenerTest() throws InterruptedException, TransactionCommitFailedException {
+        CountDownLatch latch = new CountDownLatch(2);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit().checkedGet();
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener);
+
+        writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2);
+        writeTx.submit().checkedGet();
+
+        latch.await(1, TimeUnit.SECONDS);
+
+        assertEquals(2, listener.getReceivedChanges().size());
+        Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot);
+
+        changes = listener.getReceivedChanges().get(1);
+        assertEquals(1, changes.size());
+
+        candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        candidateRoot = candidate.getRootNode();
+        checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, candidateRoot);
+        listenerReg.close();
+    }
+
+    @Test
+    public void listEntryChangeNonRootRegistrationTest() throws InterruptedException, TransactionCommitFailedException {
+        CountDownLatch latch = new CountDownLatch(2);
+
+        DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+        assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+                dataTreeChangeService);
+
+        DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+        writeTx.submit().checkedGet();
+
+        final TestDataTreeListener listener = new TestDataTreeListener(latch);
+        final ListenerRegistration<TestDataTreeListener> listenerReg =
+                dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener);
+
+        final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId1 =
+                new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1);
+        final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId2 =
+                new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2);
+        final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId3 =
+                new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3);
+
+        final MapEntryNode outerListEntry1 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1);
+        final MapEntryNode outerListEntry2 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2);
+        final MapEntryNode outerListEntry3 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3);
+
+        final MapNode listAfter = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+                .withChild(outerListEntry2)
+                .withChild(outerListEntry3)
+                .build();
+
+        writeTx = domBroker.newWriteOnlyTransaction();
+        writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId1));
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId2),
+                outerListEntry2);
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId3),
+                outerListEntry3);
+        writeTx.submit();
+
+        latch.await(5, TimeUnit.SECONDS);
+
+        assertEquals(2, listener.getReceivedChanges().size());
+        Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+        assertEquals(1, changes.size());
+
+        DataTreeCandidate candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+        checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot);
+
+        changes = listener.getReceivedChanges().get(1);
+        assertEquals(1, changes.size());
+
+        candidate = changes.iterator().next();
+        assertNotNull(candidate);
+        candidateRoot = candidate.getRootNode();
+        checkChange(OUTER_LIST, listAfter, ModificationType.SUBTREE_MODIFIED, candidateRoot);
+        final DataTreeCandidateNode entry1Canditate = candidateRoot.getModifiedChild(outerListEntryId1);
+        checkChange(outerListEntry1, null, ModificationType.DELETE, entry1Canditate);
+        final DataTreeCandidateNode entry2Canditate = candidateRoot.getModifiedChild(outerListEntryId2);
+        checkChange(null, outerListEntry2, ModificationType.WRITE, entry2Canditate);
+        final DataTreeCandidateNode entry3Canditate = candidateRoot.getModifiedChild(outerListEntryId3);
+        checkChange(null, outerListEntry3, ModificationType.WRITE, entry3Canditate);
+        listenerReg.close();
+    }
+
+    private static void checkChange(NormalizedNode<?, ?> expectedBefore,
+                                    NormalizedNode<?, ?> expectedAfter,
+                                    ModificationType expectedMod,
+                                    DataTreeCandidateNode candidateNode) {
+        if (expectedBefore != null) {
+            assertTrue(candidateNode.getDataBefore().isPresent());
+            assertEquals(expectedBefore, candidateNode.getDataBefore().get());
+        } else {
+            assertFalse(candidateNode.getDataBefore().isPresent());
+        }
+
+        if (expectedAfter != null) {
+            assertTrue(candidateNode.getDataAfter().isPresent());
+            assertEquals(expectedAfter, candidateNode.getDataAfter().get());
+        } else {
+            assertFalse(candidateNode.getDataAfter().isPresent());
+        }
+
+        assertEquals(expectedMod, candidateNode.getModificationType());
+    }
+
+    private DOMDataTreeChangeService getDOMDataTreeChangeService() {
+        final DOMDataBrokerExtension extension = domBroker.getSupportedExtensions()
+                .get(DOMDataTreeChangeService.class);
+        if (extension == null) {
+            return null;
+        }
+        DOMDataTreeChangeService dataTreeChangeService = null;
+        if (extension instanceof DOMDataTreeChangeService) {
+            dataTreeChangeService = (DOMDataTreeChangeService) extension;
+        }
+        return dataTreeChangeService;
+    }
+
+
+    static class CommitExecutorService extends ForwardingExecutorService {
+
+        ExecutorService delegate;
+
+        public CommitExecutorService(final ExecutorService delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected ExecutorService delegate() {
+            return delegate;
+        }
+    }
+
+    static class TestDataTreeListener implements DOMDataTreeChangeListener {
+
+        private List<Collection<DataTreeCandidate>> receivedChanges = new ArrayList<>();
+        private CountDownLatch latch;
+
+        public TestDataTreeListener(final CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+            receivedChanges.add(changes);
+            latch.countDown();
+        }
+
+        public List<Collection<DataTreeCandidate>> getReceivedChanges() {
+            return receivedChanges;
+        }
+    }
+}
index f043d2c..106abca 100644 (file)
@@ -49,7 +49,8 @@ public abstract class AbstractDOMStoreTransaction<T> implements DOMStoreTransact
      * @return The context in which this transaction was allocated, or null
      *         if the context was not recorded.
      */
-    @Nullable public final Throwable getDebugContext() {
+    @Nullable
+    public final Throwable getDebugContext() {
         return debugContext;
     }
 
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java
new file mode 100644 (file)
index 0000000..b7776b2
--- /dev/null
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of the {@link DOMStoreTransactionChain} interface relying on {@link DataTreeSnapshot} supplier
+ * and backend commit coordinator.
+ *
+ * @param <T> transaction identifier type
+ */
+@Beta
+public abstract class AbstractSnapshotBackedTransactionChain<T> extends TransactionReadyPrototype<T> implements DOMStoreTransactionChain {
+    private static abstract class State {
+        /**
+         * Allocate a new snapshot.
+         *
+         * @return A new snapshot
+         */
+        protected abstract DataTreeSnapshot getSnapshot();
+    }
+
+    private static final class Idle extends State {
+        private final AbstractSnapshotBackedTransactionChain<?> chain;
+
+        Idle(final AbstractSnapshotBackedTransactionChain<?> chain) {
+            this.chain = Preconditions.checkNotNull(chain);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            return chain.takeSnapshot();
+        }
+    }
+
+    /**
+     * We have a transaction out there.
+     */
+    private static final class Allocated extends State {
+        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+        private final DOMStoreWriteTransaction transaction;
+        private volatile DataTreeSnapshot snapshot;
+
+        Allocated(final DOMStoreWriteTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        public DOMStoreWriteTransaction getTransaction() {
+            return transaction;
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            final DataTreeSnapshot ret = snapshot;
+            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+            return ret;
+        }
+
+        void setSnapshot(final DataTreeSnapshot snapshot) {
+            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+        }
+    }
+
+    /**
+     * Chain is logically shut down, no further allocation allowed.
+     */
+    private static final class Shutdown extends State {
+        private final String message;
+
+        Shutdown(final String message) {
+            this.message = Preconditions.checkNotNull(message);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<AbstractSnapshotBackedTransactionChain, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractSnapshotBackedTransactionChain.class, State.class, "state");
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotBackedTransactionChain.class);
+    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+    private final Idle idleState;
+    private volatile State state;
+
+    protected AbstractSnapshotBackedTransactionChain() {
+        idleState = new Idle(this);
+        state = idleState;
+    }
+
+    private Entry<State, DataTreeSnapshot> getSnapshot() {
+        final State localState = state;
+        return new SimpleEntry<>(localState, localState.getSnapshot());
+    }
+
+    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+        final State state = new Allocated(transaction);
+        return STATE_UPDATER.compareAndSet(this, expected, state);
+    }
+
+    @Override
+    public final DOMStoreReadTransaction newReadOnlyTransaction() {
+        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+        return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue());
+    }
+
+    @Override
+    public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreReadWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedReadWriteTransaction<T>(nextTransactionIdentifier(),
+                getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedWriteTransaction<T>(nextTransactionIdentifier(),
+                getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    protected final void transactionAborted(final SnapshotBackedWriteTransaction<T> tx) {
+        final State localState = state;
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            if (allocated.getTransaction().equals(tx)) {
+                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+                if (!success) {
+                    LOG.warn("Transaction {} aborted, but chain {} state already transitioned from {} to {}, very strange",
+                        tx, this, localState, state);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected final DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<T> tx, final DataTreeModification tree) {
+        final State localState = state;
+
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+            allocated.setSnapshot(tree);
+        } else {
+            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+        }
+
+        return createCohort(tx, tree);
+    }
+
+    @Override
+    public final void close() {
+        final State localState = state;
+
+        do {
+            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+            if (FAILED.equals(localState)) {
+                LOG.debug("Ignoring user close in failed state");
+                return;
+            }
+        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+    }
+
+    /**
+     * Notify the base logic that a previously-submitted transaction has been committed successfully.
+     *
+     * @param transaction Transaction which completed successfully.
+     */
+    protected final void onTransactionCommited(final SnapshotBackedWriteTransaction<T> transaction) {
+        // If the committed transaction was the one we allocated last,
+        // we clear it and the ready snapshot, so the next transaction
+        // allocated refers to the data tree directly.
+        final State localState = state;
+
+        if (!(localState instanceof Allocated)) {
+            // This can legally happen if the chain is shut down before the transaction was committed
+            // by the backend.
+            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+            return;
+        }
+
+        final Allocated allocated = (Allocated)localState;
+        final DOMStoreWriteTransaction tx = allocated.getTransaction();
+        if (!tx.equals(transaction)) {
+            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+            return;
+        }
+
+        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+        }
+    }
+
+    /**
+     * Notify the base logic that a previously-submitted transaction has failed.
+     *
+     * @param transaction Transaction which failed.
+     * @param cause Failure cause
+     */
+    protected final void onTransactionFailed(final SnapshotBackedWriteTransaction<T> transaction, final Throwable cause) {
+        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, cause);
+        state = FAILED;
+    }
+
+    /**
+     * Return the next transaction identifier.
+     *
+     * @return transaction identifier.
+     */
+    protected abstract T nextTransactionIdentifier();
+
+    /**
+     * Inquire as to whether transactions should record their allocation context.
+     *
+     * @return True if allocation context should be recorded.
+     */
+    protected abstract boolean getDebugTransactions();
+
+    /**
+     * Take a fresh {@link DataTreeSnapshot} from the backend.
+     *
+     * @return A new snapshot.
+     */
+    protected abstract DataTreeSnapshot takeSnapshot();
+
+    /**
+     * Create a cohort for driving the transaction through the commit process.
+     *
+     * @param transaction Transaction handle
+     * @param modification {@link DataTreeModification} which needs to be applied to the backend
+     * @return A {@link DOMStoreThreePhaseCommitCohort} cohort.
+     */
+    protected abstract DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<T> transaction, final DataTreeModification modification);
+}
@@ -5,16 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -28,14 +27,21 @@ import org.slf4j.LoggerFactory;
  * Implementation of read-only transaction backed by {@link DataTreeSnapshot}
  * which delegates most of its calls to similar methods provided by underlying snapshot.
  *
+ * <T> identifier type
  */
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Object>
-                                          implements DOMStoreReadTransaction {
-
+@Beta
+public final class SnapshotBackedReadTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreReadTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
     private volatile DataTreeSnapshot stableSnapshot;
 
-    public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+    /**
+     * Creates a new read-only transaction.
+     *
+     * @param identifier Transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     */
+    SnapshotBackedReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
         super(identifier, debug);
         this.stableSnapshot = Preconditions.checkNotNull(snapshot);
         LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
@@ -71,8 +77,7 @@ final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Ob
         checkNotNull(path, "Path must not be null.");
 
         try {
-            return Futures.immediateCheckedFuture(
-                read(path).checkedGet().isPresent());
+            return Futures.immediateCheckedFuture(read(path).checkedGet().isPresent());
         } catch (ReadFailedException e) {
             return Futures.immediateFailedCheckedFuture(e);
         }
@@ -5,14 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -23,20 +24,15 @@ import org.slf4j.LoggerFactory;
  * Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
  * and executed according to {@link TransactionReadyPrototype}.
  *
+ * @param <T> identifier type
  */
-final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction {
+@Beta
+public final class SnapshotBackedReadWriteTransaction<T> extends SnapshotBackedWriteTransaction<T> implements DOMStoreReadWriteTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
 
-    /**
-     * Creates new read-write transaction.
-     *
-     * @param identifier transaction Identifier
-     * @param snapshot Snapshot which will be modified.
-     * @param readyImpl Implementation of ready method.
-     */
-    protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
-            final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
-        super(identifier, debug, snapshot, store);
+    SnapshotBackedReadWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        super(identifier, debug, snapshot, readyImpl);
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java
new file mode 100644 (file)
index 0000000..3368c8a
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Public utility class for instantiating snapshot-backed transactions.
+ */
+@Beta
+public final class SnapshotBackedTransactions {
+    private SnapshotBackedTransactions() {
+        throw new UnsupportedOperationException("Utility class");
+    }
+
+    /**
+     * Creates a new read-only transaction.
+     *
+     * @param identifier Transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     */
+    public static <T> SnapshotBackedReadTransaction<T> newReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+        return new SnapshotBackedReadTransaction<T>(identifier, debug, snapshot);
+    }
+
+    /**
+     * Creates a new read-write transaction.
+     *
+     * @param identifier transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     * @param readyImpl Implementation of ready method.
+     */
+    public static <T> SnapshotBackedReadWriteTransaction<T> newReadWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        return new SnapshotBackedReadWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+    }
+
+    /**
+     * Creates a new write-only transaction.
+     *
+     * @param identifier transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     * @param readyImpl Implementation of ready method.
+     */
+    public static <T> SnapshotBackedWriteTransaction<T> newWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        return new SnapshotBackedWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+    }
+}
@@ -5,17 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -26,33 +24,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of Write transaction which is backed by
  * {@link DataTreeSnapshot} and executed according to
- * {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
  *
+ * @param <T> Identifier type
  */
-class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object> implements DOMStoreWriteTransaction {
+@Beta
+public class SnapshotBackedWriteTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreWriteTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class);
+    @SuppressWarnings("rawtypes")
     private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, TransactionReadyPrototype> READY_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl");
+    @SuppressWarnings("rawtypes")
     private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
 
     // non-null when not ready
-    private volatile TransactionReadyPrototype readyImpl;
+    private volatile TransactionReadyPrototype<T> readyImpl;
     // non-null when not committed/closed
     private volatile DataTreeModification mutableTree;
 
-    /**
-     * Creates new write-only transaction.
-     *
-     * @param identifier
-     *            transaction Identifier
-     * @param snapshot
-     *            Snapshot which will be modified.
-     * @param readyImpl
-     *            Implementation of ready method.
-     */
-    public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
-            final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+    SnapshotBackedWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
         super(identifier, debug);
         this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
         mutableTree = snapshot.newModification();
@@ -126,7 +118,7 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
      * @param path Path to read
      * @return null if the the transaction has been closed;
      */
-    protected final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
+    final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
         return readyImpl == null ? null : mutableTree.readNode(path);
     }
 
@@ -136,7 +128,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     @Override
     public DOMStoreThreePhaseCommitCohort ready() {
-        final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+        @SuppressWarnings("unchecked")
+        final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
         checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
 
         LOG.debug("Store transaction: {} : Ready", getIdentifier());
@@ -149,7 +142,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     @Override
     public void close() {
-        final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+        @SuppressWarnings("unchecked")
+        final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
         if (wasReady != null) {
             LOG.debug("Store transaction: {} : Closed", getIdentifier());
             TREE_UPDATER.lazySet(this, null);
@@ -166,21 +160,22 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     /**
      * Prototype implementation of
-     * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
+     * {@link #ready(org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction)}
      *
      * This class is intended to be implemented by Transaction factories
-     * responsible for allocation of {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction} and
+     * responsible for allocation of {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction} and
      * providing underlying logic for applying implementation.
      *
+     * @param <T> identifier type
      */
-    abstract static class TransactionReadyPrototype {
+    public abstract static class TransactionReadyPrototype<T> {
         /**
          * Called when a transaction is closed without being readied. This is not invoked for
          * transactions which are ready.
          *
          * @param tx Transaction which got aborted.
          */
-        protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+        protected abstract void transactionAborted(final SnapshotBackedWriteTransaction<T> tx);
 
         /**
          * Returns a commit coordinator associated with supplied transactions.
@@ -193,6 +188,6 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
          *            Modified data tree which has been constructed.
          * @return DOMStoreThreePhaseCommitCohort associated with transaction
          */
-        protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
+        protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction<T> tx, DataTreeModification tree);
     }
 }
\ No newline at end of file
index 05e3d5c..35d891d 100644 (file)
@@ -8,44 +8,24 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
-final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
-    private final SnapshotBackedWriteTransaction transaction;
-    private final DOMStoreThreePhaseCommitCohort delegate;
+final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort {
     private final DOMStoreTransactionChainImpl txChain;
 
-    ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
-            final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
-        this.transaction = Preconditions.checkNotNull(transaction);
-        this.delegate = Preconditions.checkNotNull(delegate);
+    ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> transaction,
+        final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) {
+        super(store, transaction, modification);
         this.txChain = Preconditions.checkNotNull(txChain);
     }
 
-    @Override
-    protected DOMStoreThreePhaseCommitCohort delegate() {
-        return delegate;
-    }
-
     @Override
     public ListenableFuture<Void> commit() {
-        ListenableFuture<Void> commitFuture = super.commit();
-        Futures.addCallback(commitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onFailure(final Throwable t) {
-                txChain.onTransactionFailed(transaction, t);
-            }
-
-            @Override
-            public void onSuccess(final Void result) {
-                txChain.onTransactionCommited(transaction);
-            }
-        });
-        return commitFuture;
+        ListenableFuture<Void> ret = super.commit();
+        txChain.transactionCommited(getTransaction());
+        return ret;
     }
 
 }
\ No newline at end of file
index 3f731cf..2cf79d8 100644 (file)
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import com.google.common.base.Preconditions;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
-    private static abstract class State {
-        /**
-         * Allocate a new snapshot.
-         *
-         * @return A new snapshot
-         */
-        protected abstract DataTreeSnapshot getSnapshot();
-    }
-
-    private static final class Idle extends State {
-        private final InMemoryDOMDataStore store;
-
-        Idle(final InMemoryDOMDataStore store) {
-            this.store = Preconditions.checkNotNull(store);
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            return store.takeSnapshot();
-        }
-    }
-
-    /**
-     * We have a transaction out there.
-     */
-    private static final class Allocated extends State {
-        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
-                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
-        private final DOMStoreWriteTransaction transaction;
-        private volatile DataTreeSnapshot snapshot;
-
-        Allocated(final DOMStoreWriteTransaction transaction) {
-            this.transaction = Preconditions.checkNotNull(transaction);
-        }
-
-        public DOMStoreWriteTransaction getTransaction() {
-            return transaction;
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            final DataTreeSnapshot ret = snapshot;
-            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
-            return ret;
-        }
-
-        void setSnapshot(final DataTreeSnapshot snapshot) {
-            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
-            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
-        }
-    }
-
-    /**
-     * Chain is logically shut down, no further allocation allowed.
-     */
-    private static final class Shutdown extends State {
-        private final String message;
-
-        Shutdown(final String message) {
-            this.message = Preconditions.checkNotNull(message);
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            throw new IllegalStateException(message);
-        }
-    }
-
-    private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
-    private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
-    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
-    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+final class DOMStoreTransactionChainImpl extends AbstractSnapshotBackedTransactionChain<String> {
     private final InMemoryDOMDataStore store;
-    private final Idle idleState;
-    private volatile State state;
 
     DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
         this.store = Preconditions.checkNotNull(store);
-        idleState = new Idle(store);
-        state = idleState;
-    }
-
-    private Entry<State, DataTreeSnapshot> getSnapshot() {
-        final State localState = state;
-        return new SimpleEntry<>(localState, localState.getSnapshot());
-    }
-
-    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
-        final State state = new Allocated(transaction);
-        return STATE_UPDATER.compareAndSet(this, expected, state);
     }
 
     @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
-        return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+    protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+        return new ChainedTransactionCommitImpl(store, tx, modification, this);
     }
 
     @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        Entry<State, DataTreeSnapshot> entry;
-        DOMStoreReadWriteTransaction ret;
-
-        do {
-            entry = getSnapshot();
-            ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
-                store.getDebugTransactions(), entry.getValue(), this);
-        } while (!recordTransaction(entry.getKey(), ret));
-
-        return ret;
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        Entry<State, DataTreeSnapshot> entry;
-        DOMStoreWriteTransaction ret;
-
-        do {
-            entry = getSnapshot();
-            ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
-                store.getDebugTransactions(), entry.getValue(), this);
-        } while (!recordTransaction(entry.getKey(), ret));
-
-        return ret;
+    protected DataTreeSnapshot takeSnapshot() {
+        return store.takeSnapshot();
     }
 
     @Override
-    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
-        final State localState = state;
-        if (localState instanceof Allocated) {
-            final Allocated allocated = (Allocated)localState;
-            if (allocated.getTransaction().equals(tx)) {
-                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
-                if (!success) {
-                    LOG.info("State already transitioned from {} to {}", localState, state);
-                }
-            }
-        }
+    protected String nextTransactionIdentifier() {
+        return store.nextIdentifier();
     }
 
     @Override
-    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-        final State localState = state;
-
-        if (localState instanceof Allocated) {
-            final Allocated allocated = (Allocated)localState;
-            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
-            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
-            allocated.setSnapshot(tree);
-        } else {
-            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
-        }
-
-        return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+    protected boolean getDebugTransactions() {
+        return store.getDebugTransactions();
     }
 
-    @Override
-    public void close() {
-        final State localState = state;
-
-        do {
-            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
-
-            if (FAILED.equals(localState)) {
-                LOG.debug("Ignoring user close in failed state");
-                return;
-            }
-        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
-    }
-
-    void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
-        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
-        state = FAILED;
-    }
-
-    void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
-        // If the committed transaction was the one we allocated last,
-        // we clear it and the ready snapshot, so the next transaction
-        // allocated refers to the data tree directly.
-        final State localState = state;
-
-        if (!(localState instanceof Allocated)) {
-            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
-            return;
-        }
-
-        final Allocated allocated = (Allocated)localState;
-        final DOMStoreWriteTransaction tx = allocated.getTransaction();
-        if (!tx.equals(transaction)) {
-            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
-            return;
-        }
-
-        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
-            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
-        }
+    void transactionCommited(final SnapshotBackedWriteTransaction<String> transaction) {
+        super.onTransactionCommited(transaction);
     }
-}
\ No newline at end of file
+}
index 354abcf..a85d8ac 100644 (file)
@@ -7,22 +7,15 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -30,6 +23,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -38,7 +34,6 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -55,14 +50,12 @@ import org.slf4j.LoggerFactory;
  *
  * Implementation of {@link DOMStore} which uses {@link DataTree} and other
  * classes such as {@link SnapshotBackedWriteTransaction}.
- * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
  * to implement {@link DOMStore} contract.
  *
  */
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype<String> implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
-    private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
-    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
 
     private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
             new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@@ -120,17 +113,17 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
+        return SnapshotBackedTransactions.newReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+        return SnapshotBackedTransactions.newReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+        return SnapshotBackedTransactions.newWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
@@ -214,99 +207,31 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     }
 
     @Override
-    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+    protected void transactionAborted(final SnapshotBackedWriteTransaction<String> tx) {
         LOG.debug("Tx: {} is closed.", tx.getIdentifier());
     }
 
     @Override
-    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-        LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
-        return new ThreePhaseCommitImpl(tx, tree);
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+        LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification);
+        return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification);
     }
 
-    Object nextIdentifier() {
+    String nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
-        final Throwable ctx = transaction.getDebugContext();
-        if (ctx != null) {
-            LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
-        }
+    void validate(final DataTreeModification modification) throws DataValidationFailedException {
+        dataTree.validate(modification);
     }
 
-    private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
-        private final SnapshotBackedWriteTransaction transaction;
-        private final DataTreeModification modification;
-
-        private ResolveDataChangeEventsTask listenerResolver;
-        private DataTreeCandidate candidate;
-
-        public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
-            this.transaction = writeTransaction;
-            this.modification = modification;
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            try {
-                dataTree.validate(modification);
-                LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
-                return CAN_COMMIT_FUTURE;
-            } catch (ConflictingModificationAppliedException e) {
-                LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
-                        e.getPath());
-                warnDebugContext(transaction);
-                return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
-            } catch (DataValidationFailedException e) {
-                LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
-                        e.getPath(), e);
-                warnDebugContext(transaction);
-
-                // For debugging purposes, allow dumping of the modification. Coupled with the above
-                // precondition log, it should allow us to understand what went on.
-                LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree);
-
-                return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
-            } catch (Exception e) {
-                LOG.warn("Unexpected failure in validation phase", e);
-                return Futures.immediateFailedFuture(e);
-            }
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            try {
-                candidate = dataTree.prepare(modification);
-                listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
-                return SUCCESSFUL_FUTURE;
-            } catch (Exception e) {
-                LOG.warn("Unexpected failure in pre-commit phase", e);
-                return Futures.immediateFailedFuture(e);
-            }
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            candidate = null;
-            return SUCCESSFUL_FUTURE;
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            checkState(candidate != null, "Proposed subtree must be computed");
-
-            /*
-             * The commit has to occur atomically with regard to listener
-             * registrations.
-             */
-            synchronized (InMemoryDOMDataStore.this) {
-                dataTree.commit(candidate);
-                changePublisher.publishChange(candidate);
-                listenerResolver.resolve(dataChangeListenerNotificationManager);
-            }
+    DataTreeCandidate prepare(final DataTreeModification modification) {
+        return dataTree.prepare(modification);
+    }
 
-            return SUCCESSFUL_FUTURE;
-        }
+    synchronized void commit(final DataTreeCandidate candidate) {
+        dataTree.commit(candidate);
+        changePublisher.publishChange(candidate);
+        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
     }
 }
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..dba71bf
--- /dev/null
@@ -0,0 +1,100 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class);
+    private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    private final SnapshotBackedWriteTransaction<String> transaction;
+    private final DataTreeModification modification;
+    private final InMemoryDOMDataStore store;
+    private DataTreeCandidate candidate;
+
+    public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
+        this.transaction = Preconditions.checkNotNull(writeTransaction);
+        this.modification = Preconditions.checkNotNull(modification);
+        this.store = Preconditions.checkNotNull(store);
+    }
+
+    private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+        final Throwable ctx = transaction.getDebugContext();
+        if (ctx != null) {
+            LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Boolean> canCommit() {
+        try {
+            store.validate(modification);
+            LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier());
+            return CAN_COMMIT_FUTURE;
+        } catch (ConflictingModificationAppliedException e) {
+            LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(),
+                    e.getPath());
+            warnDebugContext(getTransaction());
+            return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+        } catch (DataValidationFailedException e) {
+            LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(),
+                    e.getPath(), e);
+            warnDebugContext(getTransaction());
+
+            // For debugging purposes, allow dumping of the modification. Coupled with the above
+            // precondition log, it should allow us to understand what went on.
+            LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store);
+
+            return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in validation phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Void> preCommit() {
+        try {
+            candidate = store.prepare(modification);
+            return SUCCESSFUL_FUTURE;
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in pre-commit phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Void> abort() {
+        candidate = null;
+        return SUCCESSFUL_FUTURE;
+    }
+
+    protected final SnapshotBackedWriteTransaction<String> getTransaction() {
+        return transaction;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        checkState(candidate != null, "Proposed subtree must be computed");
+
+        /*
+         * The commit has to occur atomically with regard to listener
+         * registrations.
+         */
+        store.commit(candidate);
+        return SUCCESSFUL_FUTURE;
+    }
+}
\ No newline at end of file
index 9de4892..568f883 100644 (file)
@@ -21,12 +21,13 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -37,7 +38,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-
 public class InMemoryDataStoreTest {
 
     private SchemaContext schemaContext;
@@ -268,7 +268,7 @@ public class InMemoryDataStoreTest {
         Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
 
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
+        DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadTransaction("1", true, mockSnapshot);
 
         doReadAndThrowEx( readTx );
     }
@@ -292,14 +292,14 @@ public class InMemoryDataStoreTest {
         Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
         Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
-        TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
+        @SuppressWarnings("unchecked")
+        TransactionReadyPrototype<String> mockReady = Mockito.mock( TransactionReadyPrototype.class );
+        DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadWriteTransaction("1", false, mockSnapshot, mockReady);
 
         doReadAndThrowEx( readTx );
     }
 
-    private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
-
+    private static void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
         try {
             readTx.read(TestModel.TEST_PATH).get();
         } catch( ExecutionException e ) {
index e11cac2..10399ff 100644 (file)
@@ -27,6 +27,9 @@ import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
@@ -84,9 +87,18 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
             final JsonReader reader = new JsonReader(new InputStreamReader(entityStream));
             jsonParser.parse(reader);
 
-            final NormalizedNode<?, ?> partialResult = resultHolder.getResult();
+            NormalizedNode<?, ?> partialResult = resultHolder.getResult();
             final NormalizedNode<?, ?> result;
-            if(partialResult instanceof MapNode) {
+
+            // unwrap result from augmentation and choice nodes on PUT
+            if (!isPost()) {
+                while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) {
+                    final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next();
+                    partialResult = (NormalizedNode<?, ?>) childNode;
+                }
+            }
+
+            if (partialResult instanceof MapNode) {
                 result = Iterables.getOnlyElement(((MapNode) partialResult).getValue());
             } else {
                 result = partialResult;
index 4257e17..74a9bd2 100644 (file)
@@ -34,6 +34,8 @@ import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils;
 import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.AugmentationTarget;
 import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
 import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
@@ -126,18 +128,25 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         final String docRootElm = doc.getDocumentElement().getLocalName();
         final String schemaNodeName = pathContext.getSchemaNode().getQName().getLocalName();
 
+        // FIXME the factory instance should be cached if the schema context is the same
+        final DomToNormalizedNodeParserFactory parserFactory =
+                DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
+
         if (!schemaNodeName.equalsIgnoreCase(docRootElm)) {
             final DataSchemaNode foundSchemaNode = findSchemaNodeOrParentChoiceByName(schemaNode, docRootElm);
             if (foundSchemaNode != null) {
+                if (schemaNode instanceof AugmentationTarget) {
+                    final AugmentationSchema augmentSchemaNode = findCorrespondingAugment(schemaNode, foundSchemaNode);
+                    if (augmentSchemaNode != null) {
+                        return parserFactory.getAugmentationNodeParser().parse(elements, augmentSchemaNode);
+                    }
+                }
                 schemaNode = foundSchemaNode;
             }
         }
 
-        // FIXME the factory instance should be cached if the schema context is the same
-        final DomToNormalizedNodeParserFactory parserFactory =
-                DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
-
         NormalizedNode<?, ?> parsed = null;
+
         if(schemaNode instanceof ContainerSchemaNode) {
             return parserFactory.getContainerNodeParser().parse(Collections.singletonList(doc.getDocumentElement()), (ContainerSchemaNode) schemaNode);
         } else if(schemaNode instanceof ListSchemaNode) {
@@ -147,7 +156,6 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
             final ChoiceSchemaNode casted = (ChoiceSchemaNode) schemaNode;
             return parserFactory.getChoiceNodeParser().parse(elements, casted);
         }
-
         // FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode
 
         return parsed;
@@ -175,5 +183,17 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         }
         return null;
     }
+
+    private static AugmentationSchema findCorrespondingAugment(final DataSchemaNode parent, final DataSchemaNode child) {
+        if (parent instanceof AugmentationTarget && !((parent instanceof ChoiceCaseNode) || (parent instanceof ChoiceSchemaNode))) {
+            for (AugmentationSchema augmentation : ((AugmentationTarget) parent).getAvailableAugmentations()) {
+                DataSchemaNode childInAugmentation = augmentation.getDataChildByName(child.getQName());
+                if (childInAugmentation != null) {
+                    return augmentation;
+                }
+            }
+        }
+        return null;
+    }
 }
 
index 3379588..8e88be6 100644 (file)
@@ -828,12 +828,13 @@ public class RestconfImpl implements RestconfService {
             throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
         }
 
-        final URI payloadNS = payload.getData().getNodeType().getNamespace();
-        if (payloadNS == null) {
-            throw new RestconfDocumentedException(
-                    "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
-                    ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
-        }
+        // FIXME: move this to parsing stage (we can have augmentation nodes here which do not have namespace)
+//        final URI payloadNS = payload.getData().getNodeType().getNamespace();
+//        if (payloadNS == null) {
+//            throw new RestconfDocumentedException(
+//                    "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
+//                    ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
+//        }
 
         final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
         final InstanceIdentifierContext<?> iiWithData = payload.getInstanceIdentifierContext();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.