From: Moiz Raja
Date: Mon, 23 Jun 2014 23:25:41 +0000 (-0700)
Subject: Implement commiting of data
X-Git-Tag: release/helium~578
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=da6ca13e3f8ecb73a62f7e8f9ae3bdcae9d851f8
Implement commiting of data
- Implement ThreePhaseCommitCohort Actor
- Implement a BasicIntegrationTest to test out using a Shard upto committing
- Make modifications in Shard, ShardTransaction, ShardTransactionChain to make the flow work
Change-Id: I4eff32833c09d89f81753db29ea38ac26b9dfbf6
Signed-off-by: Moiz Raja
---
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index f96cb14510..d75edc7922 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -13,20 +13,29 @@ import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import akka.persistence.Persistent;
import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
@@ -43,6 +52,8 @@ public class Shard extends UntypedProcessor {
private final InMemoryDOMDataStore store;
+ private final Map modificationToCohort = new HashMap<>();
+
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Shard(String name){
@@ -68,7 +79,40 @@ public class Shard extends UntypedProcessor {
registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof ForwardedCommitTransaction ) {
+ handleForwardedCommit((ForwardedCommitTransaction) message);
+ } else if (message instanceof Persistent){
+ commit((Persistent) message);
+ }
+ }
+
+ private void commit(Persistent message) {
+ Modification modification = (Modification) message.payload();
+ DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
+ if(cohort == null){
+ log.error("Could not find cohort for modification : " + modification);
+ return;
}
+ final ListenableFuture future = cohort.commit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new CommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when committing");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void handleForwardedCommit(ForwardedCommitTransaction message) {
+ log.info("received forwarded transaction");
+ modificationToCohort.put(message.getModification(), message.getCohort());
+ getSelf().forward(Persistent.create(message.getModification()), getContext());
}
private void updateSchemaContext(UpdateSchemaContext message) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
index f43dd7b52a..75744cad5b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
@@ -65,23 +65,26 @@ import java.util.concurrent.ExecutionException;
*/
public class ShardTransaction extends UntypedActor {
+ private final ActorRef shardActor;
+
private final DOMStoreReadWriteTransaction transaction;
private final MutableCompositeModification modification = new MutableCompositeModification();
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- public ShardTransaction(DOMStoreReadWriteTransaction transaction) {
+ public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) {
this.transaction = transaction;
+ this.shardActor = shardActor;
}
- public static Props props(final DOMStoreReadWriteTransaction transaction){
+ public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor){
return Props.create(new Creator(){
@Override
public ShardTransaction create() throws Exception {
- return new ShardTransaction(transaction);
+ return new ShardTransaction(transaction, shardActor);
}
});
}
@@ -151,7 +154,7 @@ public class ShardTransaction extends UntypedActor {
private void readyTransaction(ReadyTransaction message){
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort));
+ ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort, shardActor, modification));
getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
index 83913fe416..79aaa86b28 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
@@ -34,7 +34,7 @@ public class ShardTransactionChain extends UntypedActor{
public void onReceive(Object message) throws Exception {
if(message instanceof CreateTransaction){
DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
- ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction));
+ ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent()));
getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
} else if (message instanceof CloseTransactionChain){
chain.close();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
index 8e21cb2d86..61baf1ab64 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
@@ -8,30 +8,121 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import java.util.concurrent.ExecutionException;
+
public class ThreePhaseCommitCohort extends UntypedActor{
private final DOMStoreThreePhaseCommitCohort cohort;
+ private final ActorRef shardActor;
+ private final CompositeModification modification;
- public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort) {
-
+ public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) {
this.cohort = cohort;
+ this.shardActor = shardActor;
+ this.modification = modification;
}
- @Override
- public void onReceive(Object message) throws Exception {
- throw new UnsupportedOperationException("onReceive");
- }
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- public static Props props(final DOMStoreThreePhaseCommitCohort cohort) {
+ public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) {
return Props.create(new Creator(){
@Override
public ThreePhaseCommitCohort create() throws Exception {
- return new ThreePhaseCommitCohort(cohort);
+ return new ThreePhaseCommitCohort(cohort, shardActor, modification);
}
});
}
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if(message instanceof CanCommitTransaction){
+ canCommit((CanCommitTransaction) message);
+ } else if(message instanceof PreCommitTransaction) {
+ preCommit((PreCommitTransaction) message);
+ } else if(message instanceof CommitTransaction){
+ commit((CommitTransaction) message);
+ } else if (message instanceof AbortTransaction){
+ abort((AbortTransaction) message);
+ }
+ }
+
+ private void abort(AbortTransaction message) {
+ final ListenableFuture future = cohort.abort();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new AbortTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when aborting");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void commit(CommitTransaction message) {
+ // Forward the commit to the shard
+ log.info("Commit transaction now + " + shardActor);
+ shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
+
+ }
+
+ private void preCommit(PreCommitTransaction message) {
+ final ListenableFuture future = cohort.preCommit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new PreCommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when preCommitting");
+ }
+ }
+ }, getContext().dispatcher());
+
+ }
+
+ private void canCommit(CanCommitTransaction message) {
+ final ListenableFuture future = cohort.canCommit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Boolean canCommit = future.get();
+ sender.tell(new CanCommitTransactionReply(canCommit), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when aborting");
+ }
+ }
+ }, getContext().dispatcher());
+
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java
new file mode 100644
index 0000000000..4cf713a0b3
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
new file mode 100644
index 0000000000..84234e5807
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransactionReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java
new file mode 100644
index 0000000000..526d60fc75
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
new file mode 100644
index 0000000000..d143c14b3b
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransactionReply {
+ private final Boolean canCommit;
+
+ public CanCommitTransactionReply(Boolean canCommit) {
+ this.canCommit = canCommit;
+ }
+
+ public Boolean getCanCommit() {
+ return canCommit;
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java
new file mode 100644
index 0000000000..d7b210fd03
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
new file mode 100644
index 0000000000..a0e5e895d2
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransactionReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java
new file mode 100644
index 0000000000..01049930b5
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+public class ForwardedCommitTransaction {
+ private final DOMStoreThreePhaseCommitCohort cohort;
+ private final Modification modification;
+
+ public ForwardedCommitTransaction(DOMStoreThreePhaseCommitCohort cohort, Modification modification){
+ this.cohort = cohort;
+ this.modification = modification;
+ }
+
+ public DOMStoreThreePhaseCommitCohort getCohort() {
+ return cohort;
+ }
+
+ public Modification getModification() {
+ return modification;
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java
new file mode 100644
index 0000000000..87a9c77a4f
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransaction {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java
new file mode 100644
index 0000000000..f499c720d2
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransactionReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
index 48565d4fbb..32d31bf84d 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
@@ -11,14 +11,14 @@ package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
public class ReadyTransactionReply {
- private final ActorPath path;
+ private final ActorPath cohortPath;
- public ReadyTransactionReply(ActorPath path) {
+ public ReadyTransactionReply(ActorPath cohortPath) {
- this.path = path;
+ this.cohortPath = cohortPath;
}
- public ActorPath getPath() {
- return path;
+ public ActorPath getCohortPath() {
+ return cohortPath;
}
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
index ffb263519a..5d9f96277d 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
@@ -8,15 +8,22 @@
package org.opendaylight.controller.cluster.datastore.modification;
+
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import java.io.Serializable;
+
/**
* Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
*/
-public abstract class AbstractModification implements Modification {
- protected final InstanceIdentifier path;
+public abstract class AbstractModification implements Modification,
+ Serializable {
+
+ private static final long serialVersionUID = 1638042650152084457L;
+
+ protected final InstanceIdentifier path;
- protected AbstractModification(InstanceIdentifier path) {
- this.path = path;
- }
+ protected AbstractModification(InstanceIdentifier path) {
+ this.path = path;
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java
index 3a8eff1aa5..4c856d31eb 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java
@@ -18,5 +18,9 @@ import java.util.List;
*
*/
public interface CompositeModification extends Modification {
- List getModifications();
+ /**
+ * Get a list of Modifications contained by this Composite
+ * @return
+ */
+ List getModifications();
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
index 983ac860e1..9f37ba42d3 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
@@ -10,26 +10,40 @@ package org.opendaylight.controller.cluster.datastore.modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class MutableCompositeModification implements CompositeModification {
- private final List modifications = new ArrayList<>();
+/**
+ * MutableCompositeModification is just a mutable version of a
+ * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
+ */
+public class MutableCompositeModification
+ implements CompositeModification, Serializable {
- @Override
- public void apply(DOMStoreWriteTransaction transaction) {
- for(Modification modification : modifications){
- modification.apply(transaction);
- }
- }
+ private static final long serialVersionUID = 1163377899140186790L;
+
+ private final List modifications = new ArrayList<>();
- public void addModification(Modification modification){
- modifications.add(modification);
- }
+ @Override
+ public void apply(DOMStoreWriteTransaction transaction) {
+ for (Modification modification : modifications) {
+ modification.apply(transaction);
+ }
+ }
- public List getModifications(){
- return Collections.unmodifiableList(modifications);
- }
+ /**
+ * Add a new Modification to the list of Modifications represented by this
+ * composite
+ *
+ * @param modification
+ */
+ public void addModification(Modification modification) {
+ modifications.add(modification);
+ }
+ public List getModifications() {
+ return Collections.unmodifiableList(modifications);
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
index c5b12d071a..1b2a87f42b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
@@ -16,8 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
* WriteModification stores all the parameters required to write data to the specified path
*/
public class WriteModification extends AbstractModification {
- private final NormalizedNode data;
+ private final NormalizedNode data;
public WriteModification(InstanceIdentifier path, NormalizedNode data) {
super(path);
@@ -28,4 +28,5 @@ public class WriteModification extends AbstractModification {
public void apply(DOMStoreWriteTransaction transaction) {
transaction.write(path, data);
}
+
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
new file mode 100644
index 0000000000..8c3ec82a54
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class BasicIntegrationTest extends AbstractActorTest {
+
+ @Test
+ public void integrationTest() {
+ // This test will
+ // - create a Shard
+ // - initiate a transaction
+ // - write something
+ // - read the transaction for commit
+ // - commit the transaction
+
+
+ new JavaTestKit(getSystem()) {{
+ final Props props = Shard.props("config");
+ final ActorRef shard = getSystem().actorOf(props);
+
+ new Within(duration("5 seconds")) {
+ protected void run() {
+
+ shard.tell(
+ new UpdateSchemaContext(TestModel.createTestContext()),
+ getRef());
+
+ shard.tell(new CreateTransactionChain(), getRef());
+
+ final ActorSelection transactionChain =
+ new ExpectMsg("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof CreateTransactionChainReply) {
+ ActorPath transactionChainPath =
+ ((CreateTransactionChainReply) in)
+ .getTransactionChainPath();
+ return getSystem()
+ .actorSelection(transactionChainPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(transactionChain);
+
+ transactionChain.tell(new CreateTransaction(), getRef());
+
+ final ActorSelection transaction =
+ new ExpectMsg("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof CreateTransactionReply) {
+ ActorPath transactionPath =
+ ((CreateTransactionReply) in)
+ .getTransactionPath();
+ return getSystem()
+ .actorSelection(transactionPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(transaction);
+
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ getRef());
+
+ Boolean writeDone = new ExpectMsg("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof WriteDataReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(writeDone);
+
+ transaction.tell(new ReadyTransaction(), getRef());
+
+ final ActorSelection cohort =
+ new ExpectMsg("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof ReadyTransactionReply) {
+ ActorPath cohortPath =
+ ((ReadyTransactionReply) in)
+ .getCohortPath();
+ return getSystem()
+ .actorSelection(cohortPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(cohort);
+
+ cohort.tell(new PreCommitTransaction(), getRef());
+
+ Boolean preCommitDone =
+ new ExpectMsg("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof PreCommitTransactionReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(preCommitDone);
+
+ cohort.tell(new CommitTransaction(), getRef());
+
+ final Boolean commitDone =
+ new ExpectMsg("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof CommitTransactionReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(commitDone);
+
+ }
+
+
+ };
+ }};
+
+
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
index 7f2a836b6f..a9d8042ce2 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
@@ -87,6 +87,8 @@ public class ShardTest extends AbstractActorTest{
}};
}
+
+
private AsyncDataChangeListener> noOpDataChangeListener(){
return new AsyncDataChangeListener>() {
@Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
index 075001b0d8..9116f24c92 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
@@ -43,7 +43,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
@@ -103,7 +104,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testWriteData");
new Within(duration("1 seconds")) {
@@ -136,7 +138,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testMergeData");
new Within(duration("1 seconds")) {
@@ -170,7 +173,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testDeleteData");
new Within(duration("1 seconds")) {
@@ -204,7 +208,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction");
new Within(duration("1 seconds")) {
@@ -237,7 +242,8 @@ public class ShardTransactionTest extends AbstractActorTest {
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
new Within(duration("1 seconds")) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
new file mode 100644
index 0000000000..2647850667
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
@@ -0,0 +1,11 @@
+akka {
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ }
+ }
+}
\ No newline at end of file