From: Moiz Raja Date: Wed, 25 Jun 2014 23:49:11 +0000 (-0700) Subject: Basic DistributedDataStoreIntegrationTest added X-Git-Tag: release/helium~536^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=18a4539ad844c05fcd30373efa43f873aca4c142;hp=a2b92b2d72c28b9913131c0340f87d2424f44108 Basic DistributedDataStoreIntegrationTest added Also added missing CreateTransaction handling in Shard and some logging Change-Id: I049243ace55432ba4807d88228e901608cb9784a Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b4ad089027..09ad00598f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -20,8 +20,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -29,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -61,6 +64,8 @@ public class Shard extends UntypedProcessor { Logging.getLogger(getContext().system(), this); private Shard(String name) { + log.info("Creating shard : {}", name ); + store = new InMemoryDOMDataStore(name, storeExecutor); } @@ -77,6 +82,8 @@ public class Shard extends UntypedProcessor { @Override public void onReceive(Object message) throws Exception { + log.debug("Received message {}", message); + if (message instanceof CreateTransactionChain) { createTransactionChain(); } else if (message instanceof RegisterChangeListener) { @@ -87,9 +94,21 @@ public class Shard extends UntypedProcessor { handleForwardedCommit((ForwardedCommitTransaction) message); } else if (message instanceof Persistent) { commit((Persistent) message); + } else if (message instanceof CreateTransaction) { + createTransaction(); } } + private void createTransaction() { + DOMStoreReadWriteTransaction transaction = + store.newReadWriteTransaction(); + ActorRef transactionActor = getContext().actorOf( + ShardTransaction.props(transaction, getSelf())); + getSender() + .tell(new CreateTransactionReply(transactionActor.path()), + getSelf()); + } + private void commit(Persistent message) { Modification modification = (Modification) message.payload(); DOMStoreThreePhaseCommitCohort cohort = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 75744cad5b..f747afa786 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -43,16 +43,16 @@ import java.util.concurrent.ExecutionException; /** * The ShardTransaction Actor represents a remote transaction - *

+ *

* The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction - *

- *

+ *

+ *

* Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this * time there are no known advantages for creating a read-only or write-only transaction which may change over time * at which point we can optimize things in the distributed store as well. - *

- *

+ *

+ *

* Handles Messages
* ----------------
*

  • {@link org.opendaylight.controller.cluster.datastore.messages.ReadData} @@ -65,123 +65,139 @@ import java.util.concurrent.ExecutionException; */ public class ShardTransaction extends UntypedActor { - private final ActorRef shardActor; - - private final DOMStoreReadWriteTransaction transaction; - - private final MutableCompositeModification modification = new MutableCompositeModification(); - - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) { - this.transaction = transaction; - this.shardActor = shardActor; - } - - - public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor){ - return Props.create(new Creator(){ - - @Override - public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction, shardActor); - } - }); - } - - @Override - public void onReceive(Object message) throws Exception { - if(message instanceof ReadData){ - readData((ReadData) message); - } else if(message instanceof WriteData){ - writeData((WriteData) message); - } else if(message instanceof MergeData){ - mergeData((MergeData) message); - } else if(message instanceof DeleteData){ - deleteData((DeleteData) message); - } else if(message instanceof ReadyTransaction){ - readyTransaction((ReadyTransaction) message); - } else if(message instanceof CloseTransaction){ - closeTransaction((CloseTransaction) message); - } else if(message instanceof GetCompositedModification){ - // This is here for testing only - getSender().tell(new GetCompositeModificationReply(new ImmutableCompositeModification(modification)), getSelf()); + private final ActorRef shardActor; + + private final DOMStoreReadWriteTransaction transaction; + + private final MutableCompositeModification modification = + new MutableCompositeModification(); + + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); + + public ShardTransaction(DOMStoreReadWriteTransaction transaction, + ActorRef shardActor) { + this.transaction = transaction; + this.shardActor = shardActor; } - } - - private void readData(ReadData message) { - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - final InstanceIdentifier path = message.getPath(); - final ListenableFuture>> future = transaction.read(path); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - Optional> optional = future.get(); - if(optional.isPresent()){ - sender.tell(new ReadDataReply(optional.get()), self); - } else { - //TODO : Need to decide what to do here - } - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when reading data from path : " + path.toString()); - } - } - }, getContext().dispatcher()); - } + public static Props props(final DOMStoreReadWriteTransaction transaction, + final ActorRef shardActor) { + return Props.create(new Creator() { + + @Override + public ShardTransaction create() throws Exception { + return new ShardTransaction(transaction, shardActor); + } + }); + } + + @Override + public void onReceive(Object message) throws Exception { + log.debug("Received message {}", message); + + if (message instanceof ReadData) { + readData((ReadData) message); + } else if (message instanceof WriteData) { + writeData((WriteData) message); + } else if (message instanceof MergeData) { + mergeData((MergeData) message); + } else if (message instanceof DeleteData) { + deleteData((DeleteData) message); + } else if (message instanceof ReadyTransaction) { + readyTransaction((ReadyTransaction) message); + } else if (message instanceof CloseTransaction) { + closeTransaction((CloseTransaction) message); + } else if (message instanceof GetCompositedModification) { + // This is here for testing only + getSender().tell(new GetCompositeModificationReply( + new ImmutableCompositeModification(modification)), getSelf()); + } + } - private void writeData(WriteData message){ - modification.addModification(new WriteModification(message.getPath(), message.getData())); - transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply(), getSelf()); - } + private void readData(ReadData message) { + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + final InstanceIdentifier path = message.getPath(); + final ListenableFuture>> future = + transaction.read(path); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + Optional> optional = future.get(); + if (optional.isPresent()) { + sender.tell(new ReadDataReply(optional.get()), self); + } else { + //TODO : Need to decide what to do here + } + } catch (InterruptedException | ExecutionException e) { + log.error(e, + "An exception happened when reading data from path : " + + path.toString()); + } + + } + }, getContext().dispatcher()); + } - private void mergeData(MergeData message){ - modification.addModification(new MergeModification(message.getPath(), message.getData())); - transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply(), getSelf()); - } - private void deleteData(DeleteData message){ - modification.addModification(new DeleteModification(message.getPath())); - transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply(), getSelf()); - } + private void writeData(WriteData message) { + modification.addModification( + new WriteModification(message.getPath(), message.getData())); + transaction.write(message.getPath(), message.getData()); + getSender().tell(new WriteDataReply(), getSelf()); + } - private void readyTransaction(ReadyTransaction message){ - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort, shardActor, modification)); - getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); + private void mergeData(MergeData message) { + modification.addModification( + new MergeModification(message.getPath(), message.getData())); + transaction.merge(message.getPath(), message.getData()); + getSender().tell(new MergeDataReply(), getSelf()); + } - } + private void deleteData(DeleteData message) { + modification.addModification(new DeleteModification(message.getPath())); + transaction.delete(message.getPath()); + getSender().tell(new DeleteDataReply(), getSelf()); + } - private void closeTransaction(CloseTransaction message){ - transaction.close(); - getSender().tell(new CloseTransactionReply(), getSelf()); - } + private void readyTransaction(ReadyTransaction message) { + DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ActorRef cohortActor = getContext().actorOf( + ThreePhaseCommitCohort.props(cohort, shardActor, modification)); + getSender() + .tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); + } - // These classes are in here for test purposes only + private void closeTransaction(CloseTransaction message) { + transaction.close(); + getSender().tell(new CloseTransactionReply(), getSelf()); + } - static class GetCompositedModification { - } + // These classes are in here for test purposes only - static class GetCompositeModificationReply { - private final CompositeModification modification; + static class GetCompositedModification { - GetCompositeModificationReply(CompositeModification modification) { - this.modification = modification; } - public CompositeModification getModification() { - return modification; + static class GetCompositeModificationReply { + private final CompositeModification modification; + + + GetCompositeModificationReply(CompositeModification modification) { + this.modification = modification; + } + + + public CompositeModification getModification() { + return modification; + } } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 61baf1ab64..00d4ab5782 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -28,101 +28,109 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import java.util.concurrent.ExecutionException; -public class ThreePhaseCommitCohort extends UntypedActor{ - private final DOMStoreThreePhaseCommitCohort cohort; - private final ActorRef shardActor; - private final CompositeModification modification; - - public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) { - this.cohort = cohort; - this.shardActor = shardActor; - this.modification = modification; - } - - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) { - return Props.create(new Creator(){ - @Override - public ThreePhaseCommitCohort create() throws Exception { - return new ThreePhaseCommitCohort(cohort, shardActor, modification); - } - }); - } - - @Override - public void onReceive(Object message) throws Exception { - if(message instanceof CanCommitTransaction){ - canCommit((CanCommitTransaction) message); - } else if(message instanceof PreCommitTransaction) { - preCommit((PreCommitTransaction) message); - } else if(message instanceof CommitTransaction){ - commit((CommitTransaction) message); - } else if (message instanceof AbortTransaction){ - abort((AbortTransaction) message); +public class ThreePhaseCommitCohort extends UntypedActor { + private final DOMStoreThreePhaseCommitCohort cohort; + private final ActorRef shardActor; + private final CompositeModification modification; + + public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, + ActorRef shardActor, CompositeModification modification) { + + this.cohort = cohort; + this.shardActor = shardActor; + this.modification = modification; } - } - - private void abort(AbortTransaction message) { - final ListenableFuture future = cohort.abort(); - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new AbortTransactionReply(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when aborting"); - } - } - }, getContext().dispatcher()); - } - - private void commit(CommitTransaction message) { - // Forward the commit to the shard - log.info("Commit transaction now + " + shardActor); - shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext()); - - } - - private void preCommit(PreCommitTransaction message) { - final ListenableFuture future = cohort.preCommit(); - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new PreCommitTransactionReply(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when preCommitting"); - } - } - }, getContext().dispatcher()); - - } - - private void canCommit(CanCommitTransaction message) { - final ListenableFuture future = cohort.canCommit(); - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - Boolean canCommit = future.get(); - sender.tell(new CanCommitTransactionReply(canCommit), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when aborting"); + + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); + + public static Props props(final DOMStoreThreePhaseCommitCohort cohort, + final ActorRef shardActor, final CompositeModification modification) { + return Props.create(new Creator() { + @Override + public ThreePhaseCommitCohort create() throws Exception { + return new ThreePhaseCommitCohort(cohort, shardActor, + modification); + } + }); + } + + @Override + public void onReceive(Object message) throws Exception { + log.debug("Received message {}", message); + + if (message instanceof CanCommitTransaction) { + canCommit((CanCommitTransaction) message); + } else if (message instanceof PreCommitTransaction) { + preCommit((PreCommitTransaction) message); + } else if (message instanceof CommitTransaction) { + commit((CommitTransaction) message); + } else if (message instanceof AbortTransaction) { + abort((AbortTransaction) message); } - } - }, getContext().dispatcher()); + } + + private void abort(AbortTransaction message) { + final ListenableFuture future = cohort.abort(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new AbortTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when aborting"); + } + } + }, getContext().dispatcher()); + } + + private void commit(CommitTransaction message) { + // Forward the commit to the shard + log.debug("Forward commit transaction to Shard {} ", shardActor); + shardActor.forward(new ForwardedCommitTransaction(cohort, modification), + getContext()); + + } + + private void preCommit(PreCommitTransaction message) { + final ListenableFuture future = cohort.preCommit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new PreCommitTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when preCommitting"); + } + } + }, getContext().dispatcher()); - } + } + + private void canCommit(CanCommitTransaction message) { + final ListenableFuture future = cohort.canCommit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + Boolean canCommit = future.get(); + sender.tell(new CanCommitTransactionReply(canCommit), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when aborting"); + } + } + }, getContext().dispatcher()); + + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 36b6efa2f7..d12dc2b55a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -12,6 +12,7 @@ import akka.actor.ActorPath; import akka.actor.ActorSelection; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; @@ -109,16 +110,23 @@ public class ThreePhaseCommitCohortProxy implements for(ActorPath actorPath : cohortPaths){ ActorSelection cohort = actorContext.actorSelection(actorPath); - Object response = actorContext.executeRemoteOperation(cohort, - message, - ActorContext.ASK_DURATION); - - if(response != null && !response.getClass().equals(expectedResponseClass)){ - throw new RuntimeException( - String.format( - "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", - expectedResponseClass.toString(), - response.getClass().toString())); + try { + Object response = + actorContext.executeRemoteOperation(cohort, + message, + ActorContext.ASK_DURATION); + + if (response != null && !response.getClass() + .equals(expectedResponseClass)) { + throw new RuntimeException( + String.format( + "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", + expectedResponseClass.toString(), + response.getClass().toString()) + ); + } + } catch(TimeoutException e){ + LOG.error(String.format("A timeout occurred when processing operation : %s", message)); } } return null; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java new file mode 100644 index 0000000000..f400e74231 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -0,0 +1,54 @@ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Test; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +public class DistributedDataStoreIntegrationTest extends AbstractActorTest { + + @Test + public void integrationTest() throws Exception { + DistributedDataStore distributedDataStore = + new DistributedDataStore(getSystem(), "config"); + + distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); + + DOMStoreReadWriteTransaction transaction = + distributedDataStore.newReadWriteTransaction(); + + transaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + ListenableFuture>> future = + transaction.read(TestModel.TEST_PATH); + + Optional> optional = future.get(); + + NormalizedNode normalizedNode = optional.get(); + + assertEquals(TestModel.TEST_QNAME, normalizedNode.getNodeType()); + + DOMStoreThreePhaseCommitCohort ready = transaction.ready(); + + ListenableFuture canCommit = ready.canCommit(); + + assertTrue(canCommit.get()); + + ListenableFuture preCommit = ready.preCommit(); + + preCommit.get(); + + ListenableFuture commit = ready.commit(); + + commit.get(); + + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 48365fa1a0..b9ab8a3282 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,8 +4,10 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -18,83 +20,138 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import static org.junit.Assert.assertTrue; -public class ShardTest extends AbstractActorTest{ - @Test - public void testOnReceiveCreateTransactionChain() throws Exception { - new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config"); - final ActorRef subject = getSystem().actorOf(props, "testCreateTransactionChain"); - - new Within(duration("1 seconds")) { - protected void run() { - - subject.tell(new CreateTransactionChain(), getRef()); - - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof CreateTransactionChainReply) { - CreateTransactionChainReply reply = (CreateTransactionChainReply) in; - return reply.getTransactionChainPath().toString(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*")); - // Will wait for the rest of the 3 seconds - expectNoMsg(); - } - - - }; - }}; - } - - @Test - public void testOnReceiveRegisterListener() throws Exception { - new JavaTestKit(getSystem()) {{ - final Props props = Shard.props("config"); - final ActorRef subject = getSystem().actorOf(props, "testRegisterChangeListener"); - - new Within(duration("1 seconds")) { - protected void run() { - - subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - - subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, getRef().path() , AsyncDataBroker.DataChangeScope.BASE), getRef()); +public class ShardTest extends AbstractActorTest { + @Test + public void testOnReceiveCreateTransactionChain() throws Exception { + new JavaTestKit(getSystem()) {{ + final Props props = Shard.props("config"); + final ActorRef subject = + getSystem().actorOf(props, "testCreateTransactionChain"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell(new CreateTransactionChain(), getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof CreateTransactionChainReply) { + CreateTransactionChainReply reply = + (CreateTransactionChainReply) in; + return reply.getTransactionChainPath() + .toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.matches( + "akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*")); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } + + @Test + public void testOnReceiveRegisterListener() throws Exception { + new JavaTestKit(getSystem()) {{ + final Props props = Shard.props("config"); + final ActorRef subject = + getSystem().actorOf(props, "testRegisterChangeListener"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new UpdateSchemaContext(TestModel.createTestContext()), + getRef()); + + subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, + getRef().path(), AsyncDataBroker.DataChangeScope.BASE), + getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof RegisterChangeListenerReply) { + RegisterChangeListenerReply reply = + (RegisterChangeListenerReply) in; + return reply.getListenerRegistrationPath() + .toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.matches( + "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } + + @Test + public void testCreateTransaction(){ + new JavaTestKit(getSystem()) {{ + final Props props = Shard.props("config"); + final ActorRef subject = + getSystem().actorOf(props, "testCreateTransaction"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new UpdateSchemaContext(TestModel.createTestContext()), + getRef()); + + subject.tell(new CreateTransaction(), + getRef()); + + final String out = new ExpectMsg("match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof CreateTransactionReply) { + CreateTransactionReply reply = + (CreateTransactionReply) in; + return reply.getTransactionPath() + .toString(); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out.matches( + "akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*")); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } + + + + private AsyncDataChangeListener> noOpDataChangeListener() { + return new AsyncDataChangeListener>() { + @Override + public void onDataChanged( + AsyncDataChangeEvent> change) { - final String out = new ExpectMsg("match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof RegisterChangeListenerReply) { - RegisterChangeListenerReply reply = (RegisterChangeListenerReply) in; - return reply.getListenerRegistrationPath().toString(); - } else { - throw noMatch(); - } } - }.get(); // this extracts the received message - - assertTrue(out.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*")); - // Will wait for the rest of the 3 seconds - expectNoMsg(); - } - - - }; - }}; - } - - - - private AsyncDataChangeListener> noOpDataChangeListener(){ - return new AsyncDataChangeListener>() { - @Override - public void onDataChanged(AsyncDataChangeEvent> change) { - - } - }; - } + }; + } }