From: Moiz Raja Date: Wed, 25 Jun 2014 00:01:25 +0000 (-0700) Subject: Implement rest of the methods for TransactionProxy X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F50%2F8350%2F1;p=controller.git Implement rest of the methods for TransactionProxy Implement the following methods, - write - merge - delete - getIdentifier - ready - close Introduce the ThreePhaseCommitCohortProxy Change-Id: Ibe8427f516bea9135b34dc9253edda0e79cddd7e Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java new file mode 100644 index 0000000000..197b3b70ce --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorPath; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +import java.util.Collections; +import java.util.List; + +/** + * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies + */ +public class ThreePhaseCommitCohortProxy implements + DOMStoreThreePhaseCommitCohort{ + + private final List cohortPaths; + + public ThreePhaseCommitCohortProxy(List cohortPaths) { + + this.cohortPaths = cohortPaths; + } + + @Override public ListenableFuture canCommit() { + throw new UnsupportedOperationException("canCommit"); + } + + @Override public ListenableFuture preCommit() { + throw new UnsupportedOperationException("preCommit"); + } + + @Override public ListenableFuture abort() { + throw new UnsupportedOperationException("abort"); + } + + @Override public ListenableFuture commit() { + throw new UnsupportedOperationException("commit"); + } + + public List getCohortPaths() { + return Collections.unmodifiableList(this.cohortPaths); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index c3c7e7c00c..32bb7d0951 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -8,35 +8,46 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - * + *

* Creating a transaction on the consumer side will create one instance of a transaction proxy. If during * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will * be created on each of those shards by the TransactionProxy - * + *

+ *

* The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various * shards will be executed. - * + *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { @@ -46,14 +57,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { READ_WRITE } + private static final AtomicLong counter = new AtomicLong(); + private final TransactionType readOnly; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); + private final String identifier; public TransactionProxy( ActorContext actorContext, TransactionType readOnly) { + this.identifier = "transaction-" + counter.getAndIncrement(); this.readOnly = readOnly; this.actorContext = actorContext; @@ -95,32 +110,51 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void write(InstanceIdentifier path, NormalizedNode data) { - throw new UnsupportedOperationException("write"); + final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); + remoteTransaction.tell(new WriteData(path, data), null); } @Override public void merge(InstanceIdentifier path, NormalizedNode data) { - throw new UnsupportedOperationException("merge"); + final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); + remoteTransaction.tell(new MergeData(path, data), null); } @Override public void delete(InstanceIdentifier path) { - throw new UnsupportedOperationException("delete"); + final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); + remoteTransaction.tell(new DeleteData(path), null); } @Override public DOMStoreThreePhaseCommitCohort ready() { - throw new UnsupportedOperationException("ready"); + List cohortPaths = new ArrayList<>(); + + for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) { + Object result = actorContext.executeRemoteOperation(remoteTransaction, + new ReadyTransaction(), + ActorContext.ASK_DURATION + ); + + if(result instanceof ReadyTransactionReply){ + ReadyTransactionReply reply = (ReadyTransactionReply) result; + cohortPaths.add(reply.getCohortPath()); + } + } + + return new ThreePhaseCommitCohortProxy(cohortPaths); } @Override public Object getIdentifier() { - throw new UnsupportedOperationException("getIdentifier"); + return this.identifier; } @Override public void close() { - throw new UnsupportedOperationException("close"); + for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) { + remoteTransaction.tell(new CloseTransaction(), null); + } } private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index db33e862fe..6d057a4dbe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -2,55 +2,224 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; -import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import junit.framework.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import java.util.List; + public class TransactionProxyTest extends AbstractActorTest { + @Test + public void testRead() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); + + Optional> normalizedNodeOptional = read.get(); + Assert.assertFalse(normalizedNodeOptional.isPresent()); + + actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( + ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + + read = transactionProxy.read(TestModel.TEST_PATH); + + normalizedNodeOptional = read.get(); + + Assert.assertTrue(normalizedNodeOptional.isPresent()); + } @Test - public void testRead() throws Exception { + public void testWrite() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + + ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertTrue(listMessages.get(0) instanceof WriteData); + } + + @Test + public void testMerge() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + + ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); - new JavaTestKit(getSystem()) {{ - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + List listMessages = (List) messages; - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + Assert.assertEquals(1, listMessages.size()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + Assert.assertTrue(listMessages.get(0) instanceof MergeData); + } + + @Test + public void testDelete() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + transactionProxy.delete(TestModel.TEST_PATH); + + ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertTrue(listMessages.get(0) instanceof DeleteData); + } + + @Test + public void testReady() throws Exception { + final Props props = Props.create(DoNothingActor.class); + final ActorRef doNothingActorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path())); + actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + + } + + @Test + public void testGetIdentifier(){ + final Props props = Props.create(DoNothingActor.class); + final ActorRef doNothingActorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse( + new CreateTransactionReply(doNothingActorRef.path())); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + Assert.assertNotNull(transactionProxy.getIdentifier()); + } + + @Test + public void testClose(){ + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); - Optional> normalizedNodeOptional = read.get(); + transactionProxy.close(); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + Assert.assertNotNull(messages); - read = transactionProxy.read(TestModel.TEST_PATH); + Assert.assertTrue(messages instanceof List); - normalizedNodeOptional = read.get(); + List listMessages = (List) messages; - Assert.assertTrue(normalizedNodeOptional.isPresent()); + Assert.assertEquals(1, listMessages.size()); - }}; + Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java new file mode 100644 index 0000000000..f75aa5445b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.UntypedActor; + +import java.util.ArrayList; +import java.util.List; + +/** + * MessageCollectorActor collects messages as it receives them. It can send + * those collected messages to any sender which sends it the "messages" message + *

+ * This class would be useful as a mock to test whether messages were sent + * to a remote actor or not. + *

+ */ +public class MessageCollectorActor extends UntypedActor { + private List messages = new ArrayList<>(); + + @Override public void onReceive(Object message) throws Exception { + if(message instanceof String){ + if("messages".equals(message)){ + getSender().tell(messages, getSelf()); + } + } else { + messages.add(message); + } + } +}