From: Moiz Raja Date: Tue, 24 Jun 2014 21:41:58 +0000 (-0700) Subject: Implement TransactionProxy#read X-Git-Tag: release/helium~574^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3df36bc424589cee4cbbc74e1e75b883d32046ad Implement TransactionProxy#read - Move utility code into ActorContext - Create a reusable MockActorContext for tests - Update DistributedDataStore to use the common code - Update TransactionChainProxy to construct the appropriate TransactionProxy's Change-Id: I30386d2370f2683020e05d8a3842c0c63708df6f Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 29fc259bb7..f64c6f1a86 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -9,14 +9,11 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import akka.util.Timeout; -import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -31,14 +28,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.TimeUnit; - -import static akka.pattern.Patterns.ask; /** * @@ -48,93 +37,59 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); - final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS); - final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS); - private final ActorRef shardManager; - private final ActorSystem actorSystem; private final String type; - + private final ActorContext actorContext; public DistributedDataStore(ActorSystem actorSystem, String type) { - this.actorSystem = actorSystem; + this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type); + } + + public DistributedDataStore(ActorContext actorContext, String type) { this.type = type; - shardManager = actorSystem.actorOf(ShardManager.props(type)); + this.actorContext = actorContext; } + @Override public >> ListenerRegistration registerChangeListener( InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { - ActorSelection primary = findPrimary(); + ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props()); - ActorRef dataChangeListenerActor = actorSystem.actorOf(DataChangeListener.props()); - - Object result = - getResult(primary, new RegisterChangeListener(path, dataChangeListenerActor.path(), - AsyncDataBroker.DataChangeScope.BASE), ASK_DURATION); + Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME, + new RegisterChangeListener(path, dataChangeListenerActor.path(), + AsyncDataBroker.DataChangeScope.BASE), + ActorContext.ASK_DURATION); RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; return new ListenerRegistrationProxy(reply.getListenerRegistrationPath()); } - private ActorSelection findPrimary() { - Object result = getResult(shardManager, new FindPrimary(Shard.DEFAULT_NAME), ASK_DURATION); - - if(result instanceof PrimaryFound){ - PrimaryFound found = (PrimaryFound) result; - LOG.error("Primary found {}", found.getPrimaryPath()); - - return actorSystem.actorSelection(found.getPrimaryPath()); - } - throw new RuntimeException("primary was not found"); - } - - private Object getResult(ActorRef actor, Object message, FiniteDuration duration){ - Future future = - ask(actor, message, new Timeout(duration)); - - try { - return Await.result(future, AWAIT_DURATION); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private Object getResult(ActorSelection actor, Object message, FiniteDuration duration){ - Future future = - ask(actor, message, new Timeout(duration)); - - try { - return Await.result(future, AWAIT_DURATION); - } catch (Exception e) { - throw new RuntimeException(e); - } - } @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(); + return new TransactionChainProxy(actorContext); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { - shardManager.tell(new UpdateSchemaContext(schemaContext), null); + actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 1ee0d89e61..837ffc1b51 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; @@ -17,23 +18,32 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ + private final ActorContext actorContext; + + public TransactionChainProxy(ActorContext actorContext) { + this.actorContext = actorContext; + } + @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - throw new UnsupportedOperationException("newReadOnlyTransaction"); + return new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - throw new UnsupportedOperationException("newReadWriteTransaction"); + return new TransactionProxy(actorContext, + TransactionProxy.TransactionType.WRITE_ONLY); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - throw new UnsupportedOperationException("newWriteOnlyTransaction"); + return new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_WRITE); } @Override public void close() { - throw new UnsupportedOperationException("close"); + throw new UnsupportedOperationException("close - not sure what to do here?"); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 609dea0b36..c3c7e7c00c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -8,13 +8,25 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard * @@ -27,9 +39,58 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * */ public class TransactionProxy implements DOMStoreReadWriteTransaction { + + public enum TransactionType { + READ_ONLY, + WRITE_ONLY, + READ_WRITE + } + + private final TransactionType readOnly; + private final ActorContext actorContext; + private final Map remoteTransactionPaths = new HashMap<>(); + + public TransactionProxy( + ActorContext actorContext, + TransactionType readOnly) { + + this.readOnly = readOnly; + this.actorContext = actorContext; + + Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION); + if(response instanceof CreateTransactionReply){ + CreateTransactionReply reply = (CreateTransactionReply) response; + remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath())); + } + } + @Override - public ListenableFuture>> read(InstanceIdentifier path) { - throw new UnsupportedOperationException("read"); + public ListenableFuture>> read(final InstanceIdentifier path) { + final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path); + + Callable>> call = new Callable() { + + @Override public Optional> call() throws Exception { + Object response = actorContext + .executeRemoteOperation(remoteTransaction, new ReadData(path), + ActorContext.ASK_DURATION); + if(response instanceof ReadDataReply){ + ReadDataReply reply = (ReadDataReply) response; + //FIXME : A cast should not be required here ??? + return (Optional>) Optional.of(reply.getNormalizedNode()); + } + + return Optional.absent(); + } + }; + + ListenableFutureTask>> + future = ListenableFutureTask.create(call); + + //FIXME : Use a thread pool here + Executors.newSingleThreadExecutor().submit(future); + + return future; } @Override @@ -61,4 +122,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public void close() { throw new UnsupportedOperationException("close"); } + + private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){ + String shardName = shardNameFromIdentifier(path); + return remoteTransactionPaths.get(shardName); + } + + private String shardNameFromIdentifier(InstanceIdentifier path){ + return Shard.DEFAULT_NAME; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java new file mode 100644 index 0000000000..ba4d4de6bf --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.util.Timeout; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.ask; + +/** + * The ActorContext class contains utility methods which could be used by + * non-actors (like DistributedDataStore) to work with actors a little more + * easily. An ActorContext can be freely passed around to local object instances + * but should not be passed to actors especially remote actors + */ +public class ActorContext { + private static final Logger + LOG = LoggerFactory.getLogger(ActorContext.class); + + public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS); + public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS); + + private final ActorSystem actorSystem; + private final ActorRef shardManager; + + public ActorContext(ActorSystem actorSystem, ActorRef shardManager){ + this.actorSystem = actorSystem; + this.shardManager = shardManager; + } + + public ActorSystem getActorSystem() { + return actorSystem; + } + + public ActorRef getShardManager() { + return shardManager; + } + + public ActorSelection actorSelection(String actorPath){ + return actorSystem.actorSelection(actorPath); + } + + public ActorSelection actorSelection(ActorPath actorPath){ + return actorSystem.actorSelection(actorPath); + } + + + /** + * Finds the primary for a given shard + * + * @param shardName + * @return + */ + public ActorSelection findPrimary(String shardName) { + Object result = executeLocalOperation(shardManager, + new FindPrimary(shardName), ASK_DURATION); + + if(result instanceof PrimaryFound){ + PrimaryFound found = (PrimaryFound) result; + + LOG.error("Primary found {}", found.getPrimaryPath()); + + return actorSystem.actorSelection(found.getPrimaryPath()); + } + throw new RuntimeException("primary was not found"); + } + + /** + * Executes an operation on a local actor and wait for it's response + * @param actor + * @param message + * @param duration + * @return The response of the operation + */ + public Object executeLocalOperation(ActorRef actor, Object message, + FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Execute an operation on a remote actor and wait for it's response + * @param actor + * @param message + * @param duration + * @return + */ + public Object executeRemoteOperation(ActorSelection actor, Object message, + FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Execute an operation on the primary for a given shard + *

+ * This method first finds the primary for a given shard ,then sends + * the message to the remote shard and waits for a response + *

+ * @param shardName + * @param message + * @param duration + * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out + * + * @return + */ + public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){ + ActorSelection primary = findPrimary(shardName); + + return executeRemoteOperation(primary, message, duration); + } + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 2a9356e63d..3a74a4ca76 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -1,6 +1,12 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; +import akka.actor.Props; import junit.framework.Assert; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; @@ -16,11 +22,24 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class DistributedDataStoreTest extends AbstractActorTest{ private DistributedDataStore distributedDataStore; + private MockActorContext mockActorContext; + private ActorRef doNothingActorRef; @org.junit.Before public void setUp() throws Exception { - distributedDataStore = new DistributedDataStore(getSystem(), "config"); - distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); + final Props props = Props.create(DoNothingActor.class); + + doNothingActorRef = getSystem().actorOf(props); + + mockActorContext = new MockActorContext(getSystem(), doNothingActorRef); + distributedDataStore = new DistributedDataStore(mockActorContext, "config"); + distributedDataStore.onGlobalContextUpdated( + TestModel.createTestContext()); + + // Make CreateTransactionReply as the default response. Will need to be + // tuned if a specific test requires some other response + mockActorContext.setExecuteShardOperationResponse( + new CreateTransactionReply(doNothingActorRef.path())); } @org.junit.After @@ -30,6 +49,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ @org.junit.Test public void testRegisterChangeListener() throws Exception { + mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path())); ListenerRegistration registration = distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java new file mode 100644 index 0000000000..db33e862fe --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -0,0 +1,56 @@ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +public class TransactionProxyTest extends AbstractActorTest { + + + + @Test + public void testRead() throws Exception { + + new JavaTestKit(getSystem()) {{ + final Props props = Props.create(DoNothingActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + final MockActorContext actorContext = new MockActorContext(this.getSystem()); + actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteRemoteOperationResponse("message"); + + TransactionProxy transactionProxy = + new TransactionProxy(actorContext, + TransactionProxy.TransactionType.READ_ONLY); + + + ListenableFuture>> read = + transactionProxy.read(TestModel.TEST_PATH); + + Optional> normalizedNodeOptional = read.get(); + + Assert.assertFalse(normalizedNodeOptional.isPresent()); + + actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( + ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + + read = transactionProxy.read(TestModel.TEST_PATH); + + normalizedNodeOptional = read.get(); + + Assert.assertTrue(normalizedNodeOptional.isPresent()); + + }}; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java new file mode 100644 index 0000000000..819cfd08c7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.UntypedActor; + +public class DoNothingActor extends UntypedActor { + + @Override public void onReceive(Object message) throws Exception { + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java new file mode 100644 index 0000000000..fe62516098 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import scala.concurrent.duration.FiniteDuration; + +public class MockActorContext extends ActorContext { + + private Object executeShardOperationResponse; + private Object executeRemoteOperationResponse; + private Object executeLocalOperationResponse; + + public MockActorContext(ActorSystem actorSystem) { + super(actorSystem, null); + } + + public MockActorContext(ActorSystem actorSystem, ActorRef shardManager) { + super(actorSystem, shardManager); + } + + + @Override public Object executeShardOperation(String shardName, + Object message, FiniteDuration duration) { + return executeShardOperationResponse; + } + + @Override public Object executeRemoteOperation(ActorSelection actor, + Object message, FiniteDuration duration) { + return executeRemoteOperationResponse; + } + + @Override public ActorSelection findPrimary(String shardName) { + return null; + } + + public void setExecuteShardOperationResponse(Object response){ + executeShardOperationResponse = response; + } + + public void setExecuteRemoteOperationResponse(Object response){ + executeRemoteOperationResponse = response; + } + + public void setExecuteLocalOperationResponse( + Object executeLocalOperationResponse) { + this.executeLocalOperationResponse = executeLocalOperationResponse; + } + + +}