Merge "Bug 1764 - added service for pushing default node configuration"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 17 Oct 2014 10:56:13 +0000 (10:56 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 17 Oct 2014 10:56:13 +0000 (10:56 +0000)
19 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight-karaf/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/NormalizedNodeXmlBodyWriter.java

index 77fad0a9f75ba76ab57863e7dcb3f7fa43b3e0bb..ffb9ef746da4118982b6ed61a715024491a00a72 100644 (file)
         <type>xml</type>
         <scope>runtime</scope>
       </dependency>
+    <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>features-netconf-connector</artifactId>
+        <version>${mdsal.version}</version>
+        <classifier>features</classifier>
+        <type>xml</type>
+        <scope>runtime</scope>
+    </dependency>
       <!-- JMH Benchmark dependencies -->
       <dependency>
         <groupId>org.openjdk.jmh</groupId>
index 1612f3434382655aa67bf0c732e9ca4eaacc4258..3a2f4b058020f86c3f20faaa5f5e61c4a558dfbb 100644 (file)
       <type>xml</type>
       <scope>runtime</scope>
     </dependency>
+    <!-- Netconf connector features. When this is included, users can test the netconf connector using netconf-testtool -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-netconf-connector</artifactId>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
+
   </dependencies>
 
   <build>
index f1c0df4c3ad2a336a6aa8edc7282aa399f160c13..a498826e98977d8b3fba89348e3284f0a4781ef3 100644 (file)
@@ -10,10 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
 import akka.japi.Creator;
-
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
@@ -24,7 +22,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private volatile boolean notificationsEnabled = false;
+    private boolean notificationsEnabled = false;
 
     public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier,
                                                       NormalizedNode<?, ?>> listener) {
@@ -55,7 +53,9 @@ public class DataChangeListener extends AbstractUntypedActor {
             change = reply.getChange();
         this.listener.onDataChanged(change);
 
-        if(getSender() != null){
+        // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+        // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
             getSender().tell(new DataChangedReply(), getSelf());
         }
     }
index a1b6b9252eb452082f101fe1bb748fb2d6b28536..1a0ee8c2fa6866dfdab97cfeab7087ce37d52ca6 100644 (file)
@@ -8,10 +8,9 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-
 import com.google.common.base.Preconditions;
-
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -33,6 +32,6 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInst
 
     @Override public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), null);
+        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
     }
 }
index f5ca6e3c5aa2334eb26c52408dd7279f08e33d3a..b16ec0ac9f919b52855aaa7c608801b6a7ce2d01 100644 (file)
@@ -13,12 +13,10 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
-
+import akka.serialization.Serialization;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
-
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -207,8 +205,8 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
             ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
-        getSender()
-        .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
+        getSender().tell(new ReadyTransactionReply(
+            Serialization.serializedActorPath(cohortActor)).toSerializable(), getSelf());
 
     }
 
index 515be372e8c76b7008b7356a666bbf94e19a1972..6e7669695eb2e6317d00b6c69cc33861bb24fa48 100644 (file)
@@ -8,16 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -30,7 +27,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.Future;
 import scala.runtime.AbstractFunction1;
 
@@ -45,29 +41,29 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
     private final ActorContext actorContext;
-    private final List<Future<ActorPath>> cohortPathFutures;
-    private volatile List<ActorPath> cohortPaths;
+    private final List<Future<ActorSelection>> cohortFutures;
+    private volatile List<ActorSelection> cohorts;
     private final String transactionId;
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
-            List<Future<ActorPath>> cohortPathFutures, String transactionId) {
+            List<Future<ActorSelection>> cohortFutures, String transactionId) {
         this.actorContext = actorContext;
-        this.cohortPathFutures = cohortPathFutures;
+        this.cohortFutures = cohortFutures;
         this.transactionId = transactionId;
     }
 
-    private Future<Void> buildCohortPathsList() {
+    private Future<Void> buildCohortList() {
 
-        Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+        Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
                 actorContext.getActorSystem().dispatcher());
 
-        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
             @Override
-            public Void apply(Iterable<ActorPath> paths) {
-                cohortPaths = Lists.newArrayList(paths);
+            public Void apply(Iterable<ActorSelection> actorSelections) {
+                cohorts = Lists.newArrayList(actorSelections);
                 if(LOG.isDebugEnabled()) {
                     LOG.debug("Tx {} successfully built cohort path list: {}",
-                        transactionId, cohortPaths);
+                        transactionId, cohorts);
                 }
                 return null;
             }
@@ -87,12 +83,12 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
         // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
 
-        buildCohortPathsList().onComplete(new OnComplete<Void>() {
+        buildCohortList().onComplete(new OnComplete<Void>() {
             @Override
             public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                 if(failure != null) {
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+                        LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
                     }
                     returnFuture.setException(failure);
                 } else {
@@ -150,12 +146,11 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
-        for(ActorPath actorPath : cohortPaths) {
+        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
+        for(ActorSelection cohort : cohorts) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+                LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
             }
-            ActorSelection cohort = actorContext.actorSelection(actorPath);
 
             futureList.add(actorContext.executeOperationAsync(cohort, message));
         }
@@ -198,11 +193,11 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         // The cohort actor list should already be built at this point by the canCommit phase but,
         // if not for some reason, we'll try to build it here.
 
-        if(cohortPaths != null) {
+        if(cohorts != null) {
             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
                     returnFuture);
         } else {
-            buildCohortPathsList().onComplete(new OnComplete<Void>() {
+            buildCohortList().onComplete(new OnComplete<Void>() {
                 @Override
                 public void onComplete(Throwable failure, Void notUsed) throws Throwable {
                     if(failure != null) {
@@ -280,7 +275,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     @VisibleForTesting
-    List<Future<ActorPath>> getCohortPathFutures() {
-        return Collections.unmodifiableList(cohortPathFutures);
+    List<Future<ActorSelection>> getCohortFutures() {
+        return Collections.unmodifiableList(cohortFutures);
     }
 }
index b74c89d727c2a1761a7c993272abcab1e4ad9999..b467ee4ddbf56c456c2e9f0f62381eefa173e31d 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -28,7 +28,7 @@ import java.util.List;
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
     private final String transactionChainId;
-    private volatile List<Future<ActorPath>> cohortPathFutures = Collections.emptyList();
+    private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
 
     public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
@@ -63,14 +63,14 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
         return transactionChainId;
     }
 
-    public void onTransactionReady(List<Future<ActorPath>> cohortPathFutures){
-        this.cohortPathFutures = cohortPathFutures;
+    public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
+        this.cohortFutures = cohortFutures;
     }
 
     public void waitTillCurrentTransactionReady(){
         try {
             Await.result(Futures
-                .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()),
+                .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
                 actorContext.getOperationDuration());
         } catch (Exception e) {
             throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
index 19d9a66a528eb417d5bff41948641b68e9c8e481..ec198510d3586f88dee40d04f9294a3fb370a9ae 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 
@@ -315,7 +314,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
         }
-        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
 
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
@@ -323,14 +322,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
             }
-            cohortPathFutures.add(transactionContext.readyTransaction());
+            cohortFutures.add(transactionContext.readyTransaction());
         }
 
         if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortPathFutures);
+            transactionChainProxy.onTransactionReady(cohortFutures);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
@@ -439,7 +438,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         void closeTransaction();
 
-        Future<ActorPath> readyTransaction();
+        Future<ActorSelection> readyTransaction();
 
         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
@@ -499,10 +498,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return actor;
         }
 
-        private String getResolvedCohortPath(String cohortPath) {
-            return actorContext.resolvePath(actorPath, cohortPath);
-        }
-
         @Override
         public void closeTransaction() {
             if(LOG.isDebugEnabled()) {
@@ -512,7 +507,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         @Override
-        public Future<ActorPath> readyTransaction() {
+        public Future<ActorSelection> readyTransaction() {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                     identifier, recordedOperationFutures.size());
@@ -538,9 +533,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Transform the combined Future into a Future that returns the cohort actor path from
             // the ReadyTransactionReply. That's the end result of the ready operation.
 
-            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
                 @Override
-                public ActorPath apply(Iterable<Object> notUsed) {
+                public ActorSelection apply(Iterable<Object> notUsed) {
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
                             identifier);
@@ -557,16 +552,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     if(serializedReadyReply.getClass().equals(
                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                                actorContext.getActorSystem(), serializedReadyReply);
-
-                        String resolvedCohortPath = getResolvedCohortPath(
-                                reply.getCohortPath().toString());
+                               serializedReadyReply);
 
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
-                                identifier, resolvedCohortPath);
-                        }
-                        return actorContext.actorFor(resolvedCohortPath);
+                        return actorContext.actorSelection(reply.getCohortPath());
                     } else {
                         // Throwing an exception here will fail the Future.
 
@@ -805,7 +793,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         @Override
-        public Future<ActorPath> readyTransaction() {
+        public Future<ActorSelection> readyTransaction() {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} readyTransaction called", identifier);
             }
index 5273dc247925608326f9756e5dcbb061b106e5e2..59dd6db06e81c619f81edafe73a664d0e14189ed 100644 (file)
@@ -8,32 +8,32 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import akka.actor.ActorPath;
-import akka.actor.ActorSystem;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class ReadyTransactionReply implements SerializableMessage {
   public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class;
-  private final ActorPath cohortPath;
+  private final String cohortPath;
 
-  public ReadyTransactionReply(ActorPath cohortPath) {
+  public ReadyTransactionReply(String cohortPath) {
 
     this.cohortPath = cohortPath;
   }
 
-  public ActorPath getCohortPath() {
+  public String getCohortPath() {
     return cohortPath;
   }
 
   @Override
   public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
     return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
-        .setActorPath(cohortPath.toString()).build();
+        .setActorPath(cohortPath).build();
   }
 
-  public static ReadyTransactionReply fromSerializable(ActorSystem actorSystem,Object serializable){
-    ShardTransactionMessages.ReadyTransactionReply o = (ShardTransactionMessages.ReadyTransactionReply) serializable;
-    return new ReadyTransactionReply(
-        actorSystem.actorFor(o.getActorPath()).path());
+  public static ReadyTransactionReply fromSerializable(Object serializable) {
+      ShardTransactionMessages.ReadyTransactionReply o =
+          (ShardTransactionMessages.ReadyTransactionReply) serializable;
+
+      return new ReadyTransactionReply(o.getActorPath());
+
   }
 }
index 44f4ef77d7ff057dbea32ebfd9d3404f5b30f862..d8af09c86b6ec01479f038104fdad350170f4991 100644 (file)
@@ -237,34 +237,6 @@ public class ActorContext {
         actorSystem.shutdown();
     }
 
-    /**
-     * @deprecated Need to stop using this method. There are ways to send a
-     * remote ActorRef as a string which should be used instead of this hack
-     *
-     * @param primaryPath
-     * @param localPathOfRemoteActor
-     * @return
-     */
-    @Deprecated
-    public String resolvePath(final String primaryPath,
-        final String localPathOfRemoteActor) {
-        StringBuilder builder = new StringBuilder();
-        String[] primaryPathElements = primaryPath.split("/");
-        builder.append(primaryPathElements[0]).append("//")
-            .append(primaryPathElements[1]).append(primaryPathElements[2]);
-        String[] remotePathElements = localPathOfRemoteActor.split("/");
-        for (int i = 3; i < remotePathElements.length; i++) {
-            builder.append("/").append(remotePathElements[i]);
-        }
-
-        return builder.toString();
-
-    }
-
-    public ActorPath actorFor(String path){
-        return actorSystem.actorFor(path).path();
-    }
-
     public String getCurrentMemberName(){
         return clusterWrapper.getCurrentMemberName();
     }
index 7b826302f588ad76be85f54c9e17bcc1c8dff56a..a718ca7e4c31ef16d6be1fb26e839f3f551450e2 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
@@ -134,11 +133,10 @@ public class BasicIntegrationTest extends AbstractActorTest {
                             @Override
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                                    ActorPath cohortPath =
-                                        ReadyTransactionReply.fromSerializable(getSystem(),in)
+                                    String cohortPath =
+                                        ReadyTransactionReply.fromSerializable(in)
                                             .getCohortPath();
-                                    return getSystem()
-                                        .actorSelection(cohortPath);
+                                    return getSystem().actorSelection(cohortPath);
                                 } else {
                                     throw noMatch();
                                 }
index b39cc84ceff6943d045c7a609afcbb996e212b31..101a73782b498943acb14b0cb03be2dde5df1f87 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.DeadLetter;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class DataChangeListenerTest extends AbstractActorTest {
 
-    private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
-
-
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            createdData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
-            return createdData;
-        }
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            updatedData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
-            return updatedData;
-
-        }
-
-        @Override
-        public Set<YangInstanceIdentifier> getRemovedPaths() {
-               Set<YangInstanceIdentifier>ids = new HashSet();
-               ids.add( CompositeModel.TEST_PATH);
-              return ids;
-        }
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-          originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
-          return originalData;
-        }
-
-        @Override public NormalizedNode<?, ?> getOriginalSubtree() {
-
-
-          return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
-        }
-
-        @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testDataChangedWhenNotificationsAreEnabled(){
+        new JavaTestKit(getSystem()) {{
+            final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+            final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+            final Props props = DataChangeListener.props(mockListener);
+            final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
-          //fixme: need to have some valid data here
-          return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
-        }
-    }
+            // Let the DataChangeListener know that notifications should be enabled
+            subject.tell(new EnableNotification(true), getRef());
 
-    private class MockDataChangeListener implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-        private boolean gotIt = false;
-        private   AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+                    getRef());
 
-        @Override public void onDataChanged(
-            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-            gotIt = true;this.change=change;
-        }
+            expectMsgClass(DataChangedReply.class);
 
-        public boolean gotIt() {
-            return gotIt;
-        }
-        public  AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange(){
-          return change;
-        }
+            Mockito.verify(mockListener).onDataChanged(mockChangeEvent);
+        }};
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testDataChangedWhenNotificationsAreEnabled(){
+    public void testDataChangedWhenNotificationsAreDisabled(){
         new JavaTestKit(getSystem()) {{
-            final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(listener);
+            final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+            final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+            final Props props = DataChangeListener.props(mockListener);
             final ActorRef subject =
-                getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
+                getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+
+            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+                    getRef());
 
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
-                    // Let the DataChangeListener know that notifications should
-                    // be enabled
-                    subject.tell(new EnableNotification(true), getRef());
-
-                    subject.tell(
-                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
-                        getRef());
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if (in != null && in.getClass().equals(DataChangedReply.class)) {
-
-                                return true;
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertTrue(out);
-                    assertTrue(listener.gotIt());
-                    assertNotNull(listener.getChange().getCreatedData());
-
                     expectNoMsg();
+
+                    Mockito.verify(mockListener, Mockito.never()).onDataChanged(
+                            Mockito.any(AsyncDataChangeEvent.class));
                 }
             };
         }};
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testDataChangedWhenNotificationsAreDisabled(){
+    public void testDataChangedWithNoSender(){
         new JavaTestKit(getSystem()) {{
-            final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(listener);
-            final ActorRef subject =
-                getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+            final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+            final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+            final Props props = DataChangeListener.props(mockListener);
+            final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
+            // Let the DataChangeListener know that notifications should be enabled
+            subject.tell(new EnableNotification(true), ActorRef.noSender());
+
+            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+                    ActorRef.noSender());
+
+            getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
-                    subject.tell(
-                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
-                        getRef());
-
                     expectNoMsg();
                 }
             };
index 3c9d857fe81db981d341233e2c7249f06273bee6..b7ac371812df897c5c86d7970cb309049aa2810a 100644 (file)
@@ -4,19 +4,8 @@ import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
-
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -33,13 +22,20 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-
 import scala.concurrent.Future;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @SuppressWarnings("serial")
@@ -56,28 +52,28 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(getSystem()).when(actorContext).getActorSystem();
     }
 
-    private Future<ActorPath> newCohortPath() {
+    private Future<ActorSelection> newCohort() {
         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
-        doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
-        return Futures.successful(path);
+        ActorSelection actorSelection = getSystem().actorSelection(path);
+        return Futures.successful(actorSelection);
     }
 
     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
-        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
         for(int i = 1; i <= nCohorts; i++) {
-            cohortPathFutures.add(newCohortPath());
+            cohortFutures.add(newCohort());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
     }
 
     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
             throws Exception {
-        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
-        cohortPathFutures.add(newCohortPath());
-        cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+        cohortFutures.add(newCohort());
+        cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
     }
 
     private void setupMockActorContext(Class<?> requestType, Object... responses) {
index bdcca42d15c45553d6feb8ff6cc9bd7a792795dc..592337f93f2db9cce227fb9db1f76c1dad06d508 100644 (file)
@@ -1,12 +1,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
+import com.google.common.util.concurrent.CheckedFuture;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
@@ -182,7 +182,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         return argThat(matcher);
     }
 
-    private Future<Object> readyTxReply(ActorPath path) {
+    private Future<Object> readyTxReply(String path) {
         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
     }
 
@@ -227,12 +227,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeOperation(eq(getSystem().actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
-
-        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
-                anyString(), eq(actorRef.path().toString()));
-
-        doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
-
         return actorRef;
     }
 
@@ -631,19 +625,19 @@ public class TransactionProxyTest extends AbstractActorTest {
                 DeleteDataReply.SERIALIZABLE_CLASS);
     }
 
-    private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
-            Object... expReplies) throws Exception {
+    private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+        Object... expReplies) throws Exception {
         assertEquals("getReadyOperationFutures size", expReplies.length,
-                proxy.getCohortPathFutures().size());
+                proxy.getCohortFutures().size());
 
         int i = 0;
-        for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+        for( Future<ActorSelection> future: proxy.getCohortFutures()) {
             assertNotNull("Ready operation Future is null", future);
 
             Object expReply = expReplies[i++];
-            if(expReply instanceof ActorPath) {
-                ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                assertEquals("Cohort actor path", expReply, actual);
+            if(expReply instanceof ActorSelection) {
+                ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
             } else {
                 // Expecting exception.
                 try {
@@ -669,7 +663,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -688,7 +682,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 WriteDataReply.SERIALIZABLE_CLASS);
 
-        verifyCohortPathFutures(proxy, actorRef.path());
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
     @SuppressWarnings("unchecked")
@@ -704,7 +698,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -723,7 +717,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
 
-        verifyCohortPathFutures(proxy, TestException.class);
+        verifyCohortFutures(proxy, TestException.class);
     }
 
     @SuppressWarnings("unchecked")
@@ -754,7 +748,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 MergeDataReply.SERIALIZABLE_CLASS);
 
-        verifyCohortPathFutures(proxy, TestException.class);
+        verifyCohortFutures(proxy, TestException.class);
     }
 
     @Test
@@ -781,7 +775,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+        verifyCohortFutures(proxy, PrimaryNotFoundException.class);
     }
 
     @SuppressWarnings("unchecked")
@@ -809,7 +803,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortPathFutures(proxy, IllegalArgumentException.class);
+        verifyCohortFutures(proxy, IllegalArgumentException.class);
     }
 
     @Test
index fa6d0b060f222f28ec723f1b4bc52b2f1ade3aa4..8426b03a37de93242b506df7fd9d4368d24a169f 100644 (file)
@@ -2,12 +2,12 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -26,40 +26,6 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
-    @Test
-    public void testResolvePathForRemoteActor(){
-        ActorContext actorContext =
-            new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
-                ClusterWrapper.class),
-                mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-            "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
-            "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testResolvePathForLocalActor(){
-        ActorContext actorContext =
-            new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                mock(Configuration.class));
-
-        String actual = actorContext.resolvePath(
-            "akka://system/user/shardmanager/shard",
-            "akka://system/user/shardmanager/shard/transaction");
-
-        String expected = "akka://system/user/shardmanager/shard/transaction";
-
-        assertEquals(expected, actual);
-
-        System.out.println(actorContext
-            .actorFor("akka://system/user/shardmanager/shard/transaction"));
-    }
-
 
     private static class MockShardManager extends UntypedActor {
 
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java
new file mode 100644 (file)
index 0000000..5b0f739
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+    private final SnapshotBackedWriteTransaction transaction;
+    private final DOMStoreThreePhaseCommitCohort delegate;
+    private final DOMStoreTransactionChainImpl txChain;
+
+    protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+            final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.txChain = Preconditions.checkNotNull(txChain);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate.canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate.abort();
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        ListenableFuture<Void> commitFuture = delegate.commit();
+        Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onFailure(final Throwable t) {
+                txChain.onTransactionFailed(transaction, t);
+            }
+
+            @Override
+            public void onSuccess(final Void result) {
+                txChain.onTransactionCommited(transaction);
+            }
+        });
+        return commitFuture;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java
new file mode 100644 (file)
index 0000000..3f731cf
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+    private static abstract class State {
+        /**
+         * Allocate a new snapshot.
+         *
+         * @return A new snapshot
+         */
+        protected abstract DataTreeSnapshot getSnapshot();
+    }
+
+    private static final class Idle extends State {
+        private final InMemoryDOMDataStore store;
+
+        Idle(final InMemoryDOMDataStore store) {
+            this.store = Preconditions.checkNotNull(store);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            return store.takeSnapshot();
+        }
+    }
+
+    /**
+     * We have a transaction out there.
+     */
+    private static final class Allocated extends State {
+        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+        private final DOMStoreWriteTransaction transaction;
+        private volatile DataTreeSnapshot snapshot;
+
+        Allocated(final DOMStoreWriteTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        public DOMStoreWriteTransaction getTransaction() {
+            return transaction;
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            final DataTreeSnapshot ret = snapshot;
+            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+            return ret;
+        }
+
+        void setSnapshot(final DataTreeSnapshot snapshot) {
+            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+        }
+    }
+
+    /**
+     * Chain is logically shut down, no further allocation allowed.
+     */
+    private static final class Shutdown extends State {
+        private final String message;
+
+        Shutdown(final String message) {
+            this.message = Preconditions.checkNotNull(message);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
+    private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
+    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+    private final InMemoryDOMDataStore store;
+    private final Idle idleState;
+    private volatile State state;
+
+    DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
+        this.store = Preconditions.checkNotNull(store);
+        idleState = new Idle(store);
+        state = idleState;
+    }
+
+    private Entry<State, DataTreeSnapshot> getSnapshot() {
+        final State localState = state;
+        return new SimpleEntry<>(localState, localState.getSnapshot());
+    }
+
+    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+        final State state = new Allocated(transaction);
+        return STATE_UPDATER.compareAndSet(this, expected, state);
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+        return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreReadWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
+                store.getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
+                store.getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+        final State localState = state;
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            if (allocated.getTransaction().equals(tx)) {
+                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+                if (!success) {
+                    LOG.info("State already transitioned from {} to {}", localState, state);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+        final State localState = state;
+
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+            allocated.setSnapshot(tree);
+        } else {
+            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+        }
+
+        return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+    }
+
+    @Override
+    public void close() {
+        final State localState = state;
+
+        do {
+            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+            if (FAILED.equals(localState)) {
+                LOG.debug("Ignoring user close in failed state");
+                return;
+            }
+        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+    }
+
+    void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
+        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
+        state = FAILED;
+    }
+
+    void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+        // If the committed transaction was the one we allocated last,
+        // we clear it and the ready snapshot, so the next transaction
+        // allocated refers to the data tree directly.
+        final State localState = state;
+
+        if (!(localState instanceof Allocated)) {
+            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+            return;
+        }
+
+        final Allocated allocated = (Allocated)localState;
+        final DOMStoreWriteTransaction tx = allocated.getTransaction();
+        if (!tx.equals(transaction)) {
+            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+            return;
+        }
+
+        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+        }
+    }
+}
\ No newline at end of file
index 74fa73afb92f869f7cb2e945a625d489b71e71c2..213f60e951cc41794864fcb1fd813a1958469728 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -18,7 +17,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
@@ -142,7 +140,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new DOMStoreTransactionChainImpl();
+        return new DOMStoreTransactionChainImpl(this);
     }
 
     @Override
@@ -164,10 +162,14 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         }
     }
 
-    boolean getDebugTransactions() {
+    public final boolean getDebugTransactions() {
         return debugTransactions;
     }
 
+    final DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
+    }
+
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
@@ -219,156 +221,11 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         return new ThreePhaseCommitImpl(tx, tree);
     }
 
-    private Object nextIdentifier() {
+    Object nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
-        @GuardedBy("this")
-        private SnapshotBackedWriteTransaction allocatedTransaction;
-        @GuardedBy("this")
-        private DataTreeSnapshot readySnapshot;
-        @GuardedBy("this")
-        private boolean chainFailed = false;
-
-        @GuardedBy("this")
-        private void checkFailed() {
-            Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
-        }
-
-        @GuardedBy("this")
-        private DataTreeSnapshot getSnapshot() {
-            checkFailed();
-
-            if (allocatedTransaction != null) {
-                Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
-                return readySnapshot;
-            } else {
-                return dataTree.takeSnapshot();
-            }
-        }
-
-        @GuardedBy("this")
-        private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
-            allocatedTransaction = transaction;
-            readySnapshot = null;
-            return transaction;
-        }
-
-        @Override
-        public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
-        }
-
-        @Override
-        public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this));
-        }
-
-        @Override
-        public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this));
-        }
-
-        @Override
-        protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
-            if (tx.equals(allocatedTransaction)) {
-                Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
-                allocatedTransaction = null;
-            }
-        }
-
-        @Override
-        protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-            Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
-            if (readySnapshot != null) {
-                // The snapshot should have been cleared
-                LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
-            }
-
-            final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
-            readySnapshot = tree;
-            return new ChainedTransactionCommitImpl(tx, cohort, this);
-        }
-
-        @Override
-        public void close() {
-            // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
-            // by the outer class.
-            //listeningExecutor.shutdownNow();
-        }
-
-        protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
-                final Throwable t) {
-            chainFailed = true;
-        }
-
-        public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
-            // If the committed transaction was the one we allocated last,
-            // we clear it and the ready snapshot, so the next transaction
-            // allocated refers to the data tree directly.
-            if (transaction.equals(allocatedTransaction)) {
-                if (readySnapshot == null) {
-                    LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
-                }
-
-                allocatedTransaction = null;
-                readySnapshot = null;
-            }
-        }
-    }
-
-    private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
-        private final SnapshotBackedWriteTransaction transaction;
-        private final DOMStoreThreePhaseCommitCohort delegate;
-        private final DOMStoreTransactionChainImpl txChain;
-
-        protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
-                final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
-            this.transaction = transaction;
-            this.delegate = delegate;
-            this.txChain = txChain;
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            return delegate.canCommit();
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            return delegate.preCommit();
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            return delegate.abort();
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            ListenableFuture<Void> commitFuture = delegate.commit();
-            Futures.addCallback(commitFuture, new FutureCallback<Void>() {
-                @Override
-                public void onFailure(final Throwable t) {
-                    txChain.onTransactionFailed(transaction, t);
-                }
-
-                @Override
-                public void onSuccess(final Void result) {
-                    txChain.onTransactionCommited(transaction);
-                }
-            });
-            return commitFuture;
-        }
-    }
-
-    private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
+    private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
         private final SnapshotBackedWriteTransaction transaction;
         private final DataTreeModification modification;
 
index 3a6de300a0463da6cec1b0a2466375211b5adabd..bab26dfc012b1a620f2e0a619ef3d93d10b0bb2e 100644 (file)
@@ -16,7 +16,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.MessageBodyWriter;
 import javax.ws.rs.ext.Provider;
 import javax.xml.stream.FactoryConfigurationError;
@@ -28,6 +27,8 @@ import org.opendaylight.controller.sal.rest.api.RestconfService;
 import org.opendaylight.controller.sal.restconf.impl.InstanceIdentifierContext;
 import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -72,7 +73,9 @@ public class NormalizedNodeXmlBodyWriter implements MessageBodyWriter<Normalized
             WebApplicationException {
         InstanceIdentifierContext pathContext = t.getInstanceIdentifierContext();
         if (t.getData() == null) {
-            throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
+            throw new RestconfDocumentedException(
+                    "Request could not be completed because the relevant data model content does not exist.",
+                    ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
         }
 
         XMLStreamWriter xmlWriter;