Implement commiting of data 47/8347/1
authorMoiz Raja <moraja@cisco.com>
Mon, 23 Jun 2014 23:25:41 +0000 (16:25 -0700)
committerMoiz Raja <moraja@cisco.com>
Wed, 25 Jun 2014 18:53:41 +0000 (11:53 -0700)
- Implement ThreePhaseCommitCohort Actor
- Implement a BasicIntegrationTest to test out using a Shard upto committing
- Make modifications in Shard, ShardTransaction, ShardTransactionChain to make the flow work

Change-Id: I4dd9a8947e456da0a37193d9daf56fcc3783c0cd
Signed-off-by: Moiz Raja <moraja@cisco.com>
22 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf [new file with mode: 0644]

index f96cb14510a9d3b95ab4acd3820f8704d689af96..d75edc7922f54ea46d72e4fd62fc4bc0e0aa3143 100644 (file)
@@ -13,20 +13,29 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import akka.persistence.Persistent;
 import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
 /**
@@ -43,6 +52,8 @@ public class Shard extends UntypedProcessor {
 
   private final InMemoryDOMDataStore store;
 
+  private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+
   private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
   private Shard(String name){
@@ -68,7 +79,40 @@ public class Shard extends UntypedProcessor {
       registerChangeListener((RegisterChangeListener) message);
     } else if (message instanceof UpdateSchemaContext) {
       updateSchemaContext((UpdateSchemaContext) message);
+    } else if (message instanceof ForwardedCommitTransaction ) {
+      handleForwardedCommit((ForwardedCommitTransaction) message);
+    } else if (message instanceof Persistent){
+      commit((Persistent) message);
+    }
+  }
+
+  private void commit(Persistent message) {
+    Modification modification = (Modification) message.payload();
+    DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
+    if(cohort == null){
+      log.error("Could not find cohort for modification : " + modification);
+      return;
     }
+    final ListenableFuture<Void> future = cohort.commit();
+    final ActorRef sender = getSender();
+    final ActorRef self = getSelf();
+    future.addListener(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          future.get();
+          sender.tell(new CommitTransactionReply(), self);
+        } catch (InterruptedException | ExecutionException e) {
+          log.error(e, "An exception happened when committing");
+        }
+      }
+    }, getContext().dispatcher());
+  }
+
+  private void handleForwardedCommit(ForwardedCommitTransaction message) {
+    log.info("received forwarded transaction");
+    modificationToCohort.put(message.getModification(), message.getCohort());
+    getSelf().forward(Persistent.create(message.getModification()), getContext());
   }
 
   private void updateSchemaContext(UpdateSchemaContext message) {
index f43dd7b52a62b745a17929d545107a9b57fbc1ff..75744cad5b920b942efda865d0fa6ed29c41f3d7 100644 (file)
@@ -65,23 +65,26 @@ import java.util.concurrent.ExecutionException;
  */
 public class ShardTransaction extends UntypedActor {
 
+  private final ActorRef shardActor;
+
   private final DOMStoreReadWriteTransaction transaction;
 
   private final MutableCompositeModification modification = new MutableCompositeModification();
 
   private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-  public ShardTransaction(DOMStoreReadWriteTransaction transaction) {
+  public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) {
     this.transaction = transaction;
+    this.shardActor = shardActor;
   }
 
 
-  public static Props props(final DOMStoreReadWriteTransaction transaction){
+  public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor){
     return Props.create(new Creator<ShardTransaction>(){
 
       @Override
       public ShardTransaction create() throws Exception {
-        return new ShardTransaction(transaction);
+        return new ShardTransaction(transaction, shardActor);
       }
     });
   }
@@ -151,7 +154,7 @@ public class ShardTransaction extends UntypedActor {
 
   private void readyTransaction(ReadyTransaction message){
     DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
-    ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort));
+    ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort, shardActor, modification));
     getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
 
   }
index 83913fe416fb766698ce9bd4294819d48276905d..79aaa86b28baaa71f161dceca0d56f59528d94a1 100644 (file)
@@ -34,7 +34,7 @@ public class ShardTransactionChain extends UntypedActor{
   public void onReceive(Object message) throws Exception {
     if(message instanceof CreateTransaction){
       DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
-      ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction));
+      ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent()));
       getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
     } else if (message instanceof CloseTransactionChain){
       chain.close();
index 8e21cb2d86fc846d4923526024b62e0aff332f12..61baf1ab64421e04f76d52ec684709ca33f38d25 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
+import java.util.concurrent.ExecutionException;
+
 public class ThreePhaseCommitCohort extends UntypedActor{
   private final DOMStoreThreePhaseCommitCohort cohort;
+  private final ActorRef shardActor;
+  private final CompositeModification modification;
 
-  public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort) {
-
+  public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) {
     this.cohort = cohort;
+    this.shardActor = shardActor;
+    this.modification = modification;
   }
 
-  @Override
-  public void onReceive(Object message) throws Exception {
-    throw new UnsupportedOperationException("onReceive");
-  }
+  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-  public static Props props(final DOMStoreThreePhaseCommitCohort cohort) {
+  public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) {
     return Props.create(new Creator<ThreePhaseCommitCohort>(){
       @Override
       public ThreePhaseCommitCohort create() throws Exception {
-        return new ThreePhaseCommitCohort(cohort);
+        return new ThreePhaseCommitCohort(cohort, shardActor, modification);
       }
     });
   }
+
+  @Override
+  public void onReceive(Object message) throws Exception {
+    if(message instanceof CanCommitTransaction){
+      canCommit((CanCommitTransaction) message);
+    } else if(message instanceof PreCommitTransaction) {
+      preCommit((PreCommitTransaction) message);
+    } else if(message instanceof CommitTransaction){
+      commit((CommitTransaction) message);
+    } else if (message instanceof AbortTransaction){
+      abort((AbortTransaction) message);
+    }
+  }
+
+  private void abort(AbortTransaction message) {
+    final ListenableFuture<Void> future = cohort.abort();
+    final ActorRef sender = getSender();
+    final ActorRef self = getSelf();
+
+    future.addListener(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          future.get();
+          sender.tell(new AbortTransactionReply(), self);
+        } catch (InterruptedException | ExecutionException e) {
+          log.error(e, "An exception happened when aborting");
+        }
+      }
+    }, getContext().dispatcher());
+  }
+
+  private void commit(CommitTransaction message) {
+    // Forward the commit to the shard
+    log.info("Commit transaction now + " + shardActor);
+    shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
+
+  }
+
+  private void preCommit(PreCommitTransaction message) {
+    final ListenableFuture<Void> future = cohort.preCommit();
+    final ActorRef sender = getSender();
+    final ActorRef self = getSelf();
+
+    future.addListener(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          future.get();
+          sender.tell(new PreCommitTransactionReply(), self);
+        } catch (InterruptedException | ExecutionException e) {
+          log.error(e, "An exception happened when preCommitting");
+        }
+      }
+    }, getContext().dispatcher());
+
+  }
+
+  private void canCommit(CanCommitTransaction message) {
+    final ListenableFuture<Boolean> future = cohort.canCommit();
+    final ActorRef sender = getSender();
+    final ActorRef self = getSelf();
+
+    future.addListener(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Boolean canCommit = future.get();
+          sender.tell(new CanCommitTransactionReply(canCommit), self);
+        } catch (InterruptedException | ExecutionException e) {
+          log.error(e, "An exception happened when aborting");
+        }
+      }
+    }, getContext().dispatcher());
+
+  }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java
new file mode 100644 (file)
index 0000000..4cf713a
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
new file mode 100644 (file)
index 0000000..84234e5
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransactionReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java
new file mode 100644 (file)
index 0000000..526d60f
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
new file mode 100644 (file)
index 0000000..d143c14
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransactionReply {
+  private final Boolean canCommit;
+
+  public CanCommitTransactionReply(Boolean canCommit) {
+    this.canCommit = canCommit;
+  }
+
+  public Boolean getCanCommit() {
+    return canCommit;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java
new file mode 100644 (file)
index 0000000..d7b210f
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
new file mode 100644 (file)
index 0000000..a0e5e89
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransactionReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java
new file mode 100644 (file)
index 0000000..0104993
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+public class ForwardedCommitTransaction {
+  private final DOMStoreThreePhaseCommitCohort cohort;
+  private final Modification modification;
+
+  public ForwardedCommitTransaction(DOMStoreThreePhaseCommitCohort cohort, Modification modification){
+    this.cohort = cohort;
+    this.modification = modification;
+  }
+
+  public DOMStoreThreePhaseCommitCohort getCohort() {
+    return cohort;
+  }
+
+  public Modification getModification() {
+    return modification;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java
new file mode 100644 (file)
index 0000000..87a9c77
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java
new file mode 100644 (file)
index 0000000..f499c72
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransactionReply {
+}
index 48565d4fbb5b8847b1c90beea6ce7ca964b00256..32d31bf84db44e87a4ebbbd6328c97c3898fd9ae 100644 (file)
@@ -11,14 +11,14 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import akka.actor.ActorPath;
 
 public class ReadyTransactionReply {
-  private final ActorPath path;
+  private final ActorPath cohortPath;
 
-  public ReadyTransactionReply(ActorPath path) {
+  public ReadyTransactionReply(ActorPath cohortPath) {
 
-    this.path = path;
+    this.cohortPath = cohortPath;
   }
 
-  public ActorPath getPath() {
-    return path;
+  public ActorPath getCohortPath() {
+    return cohortPath;
   }
 }
index ffb263519a26cbceb4da2f09c3f17d66e77f7fd2..5d9f96277ddcba65b05801ea328c8e80333744f3 100644 (file)
@@ -8,15 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+import java.io.Serializable;
+
 /**
  * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
  */
-public abstract class AbstractModification implements Modification {
-  protected final InstanceIdentifier path;
+public abstract class AbstractModification implements Modification,
+    Serializable {
+
+    private static final long serialVersionUID = 1638042650152084457L;
+
+    protected final InstanceIdentifier path;
 
-  protected AbstractModification(InstanceIdentifier path) {
-    this.path = path;
-  }
+    protected AbstractModification(InstanceIdentifier path) {
+        this.path = path;
+    }
 }
index 3a8eff1aa511e8194992a72d98598251cd61a9ed..4c856d31eb35803d045dc87a204de5adcbce1dde 100644 (file)
@@ -18,5 +18,9 @@ import java.util.List;
  * </p>
  */
 public interface CompositeModification extends Modification {
-  List<Modification> getModifications();
+    /**
+     * Get a list of Modifications contained by this Composite
+     * @return
+     */
+    List<Modification> getModifications();
 }
index 983ac860e1e458f068b2cbf5df7575d38e7ba6b2..9f37ba42d3c97076af0dfff3146c61338036968d 100644 (file)
@@ -10,26 +10,40 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class MutableCompositeModification implements CompositeModification {
-  private final List<Modification> modifications = new ArrayList<>();
+/**
+ * MutableCompositeModification is just a mutable version of a
+ * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
+ */
+public class MutableCompositeModification
+    implements CompositeModification, Serializable {
 
-  @Override
-  public void apply(DOMStoreWriteTransaction transaction) {
-    for(Modification modification : modifications){
-      modification.apply(transaction);
-    }
-  }
+    private static final long serialVersionUID = 1163377899140186790L;
+
+    private final List<Modification> modifications = new ArrayList<>();
 
-  public void addModification(Modification modification){
-    modifications.add(modification);
-  }
+    @Override
+    public void apply(DOMStoreWriteTransaction transaction) {
+        for (Modification modification : modifications) {
+            modification.apply(transaction);
+        }
+    }
 
-  public List<Modification> getModifications(){
-    return Collections.unmodifiableList(modifications);
-  }
+    /**
+     * Add a new Modification to the list of Modifications represented by this
+     * composite
+     *
+     * @param modification
+     */
+    public void addModification(Modification modification) {
+        modifications.add(modification);
+    }
 
+    public List<Modification> getModifications() {
+        return Collections.unmodifiableList(modifications);
+    }
 }
index c5b12d071afb58d42eeec55fb578365dc7fddd4c..1b2a87f42bb0cbfc61852534b1a9d65419491741 100644 (file)
@@ -16,8 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * WriteModification stores all the parameters required to write data to the specified path
  */
 public class WriteModification extends AbstractModification {
-  private final NormalizedNode data;
 
+  private final NormalizedNode data;
 
   public WriteModification(InstanceIdentifier path, NormalizedNode data) {
     super(path);
@@ -28,4 +28,5 @@ public class WriteModification extends AbstractModification {
   public void apply(DOMStoreWriteTransaction transaction) {
     transaction.write(path, data);
   }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
new file mode 100644 (file)
index 0000000..8c3ec82
--- /dev/null
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class BasicIntegrationTest extends AbstractActorTest {
+
+    @Test
+    public void integrationTest() {
+        // This test will
+        // - create a Shard
+        // - initiate a transaction
+        // - write something
+        // - read the transaction for commit
+        // - commit the transaction
+
+
+        new JavaTestKit(getSystem()) {{
+            final Props props = Shard.props("config");
+            final ActorRef shard = getSystem().actorOf(props);
+
+            new Within(duration("5 seconds")) {
+                protected void run() {
+
+                    shard.tell(
+                        new UpdateSchemaContext(TestModel.createTestContext()),
+                        getRef());
+
+                    shard.tell(new CreateTransactionChain(), getRef());
+
+                    final ActorSelection transactionChain =
+                        new ExpectMsg<ActorSelection>("match hint") {
+                            protected ActorSelection match(Object in) {
+                                if (in instanceof CreateTransactionChainReply) {
+                                    ActorPath transactionChainPath =
+                                        ((CreateTransactionChainReply) in)
+                                            .getTransactionChainPath();
+                                    return getSystem()
+                                        .actorSelection(transactionChainPath);
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertNotNull(transactionChain);
+
+                    transactionChain.tell(new CreateTransaction(), getRef());
+
+                    final ActorSelection transaction =
+                        new ExpectMsg<ActorSelection>("match hint") {
+                            protected ActorSelection match(Object in) {
+                                if (in instanceof CreateTransactionReply) {
+                                    ActorPath transactionPath =
+                                        ((CreateTransactionReply) in)
+                                            .getTransactionPath();
+                                    return getSystem()
+                                        .actorSelection(transactionPath);
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertNotNull(transaction);
+
+                    transaction.tell(new WriteData(TestModel.TEST_PATH,
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+                        getRef());
+
+                    Boolean writeDone = new ExpectMsg<Boolean>("match hint") {
+                        protected Boolean match(Object in) {
+                            if (in instanceof WriteDataReply) {
+                                return true;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    Assert.assertTrue(writeDone);
+
+                    transaction.tell(new ReadyTransaction(), getRef());
+
+                    final ActorSelection cohort =
+                        new ExpectMsg<ActorSelection>("match hint") {
+                            protected ActorSelection match(Object in) {
+                                if (in instanceof ReadyTransactionReply) {
+                                    ActorPath cohortPath =
+                                        ((ReadyTransactionReply) in)
+                                            .getCohortPath();
+                                    return getSystem()
+                                        .actorSelection(cohortPath);
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertNotNull(cohort);
+
+                    cohort.tell(new PreCommitTransaction(), getRef());
+
+                    Boolean preCommitDone =
+                        new ExpectMsg<Boolean>("match hint") {
+                            protected Boolean match(Object in) {
+                                if (in instanceof PreCommitTransactionReply) {
+                                    return true;
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertTrue(preCommitDone);
+
+                    cohort.tell(new CommitTransaction(), getRef());
+
+                    final Boolean commitDone =
+                        new ExpectMsg<Boolean>("match hint") {
+                            protected Boolean match(Object in) {
+                                if (in instanceof CommitTransactionReply) {
+                                    return true;
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    Assert.assertTrue(commitDone);
+
+                }
+
+
+            };
+        }};
+
+
+    }
+}
index 7f2a836b6f9cf7218037a6437044dbf6c15984df..a9d8042ce238ffdc3be211a7625f50be957dc9f1 100644 (file)
@@ -87,6 +87,8 @@ public class ShardTest extends AbstractActorTest{
     }};
   }
 
+
+
   private  AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
     return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
       @Override
index 075001b0d8327c17c03373126d0a1019da4d45fe..9116f24c92971b3f0491b6de52d07eff01d84645 100644 (file)
@@ -43,7 +43,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveReadData() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
       new Within(duration("1 seconds")) {
@@ -103,7 +104,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveWriteData() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testWriteData");
 
       new Within(duration("1 seconds")) {
@@ -136,7 +138,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveMergeData() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testMergeData");
 
       new Within(duration("1 seconds")) {
@@ -170,7 +173,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveDeleteData() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testDeleteData");
 
       new Within(duration("1 seconds")) {
@@ -204,7 +208,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveReadyTransaction() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction");
 
       new Within(duration("1 seconds")) {
@@ -237,7 +242,8 @@ public class ShardTransactionTest extends AbstractActorTest {
   @Test
   public void testOnReceiveCloseTransaction() throws Exception {
     new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+      final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+      final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
       final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
 
       new Within(duration("1 seconds")) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
new file mode 100644 (file)
index 0000000..2647850
--- /dev/null
@@ -0,0 +1,11 @@
+akka {
+    actor {
+        serializers {
+          java = "akka.serialization.JavaSerializer"
+        }
+
+        serialization-bindings {
+            "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+        }
+    }
+}
\ No newline at end of file