import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
public abstract class AbstractUntypedActor extends UntypedActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
+
+ public AbstractUntypedActor(){
+ LOG.debug("Actor created {}", getSelf());
+ getContext().
+ system().
+ actorSelection("user/termination-monitor").
+ tell(new Monitor(getSelf()), getSelf());
+ }
+
@Override public void onReceive(Object message) throws Exception {
LOG.debug("Received message {}", message);
handleReceive(message);
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Function;
import com.typesafe.config.ConfigFactory;
+import javax.annotation.Nullable;
+
public class ActorSystemFactory {
- private static final ActorSystem actorSystem =
- ActorSystem.create("opendaylight-cluster", ConfigFactory
- .load().getConfig("ODLCluster"));
+ private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
+
+ @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
+ ActorSystem system =
+ ActorSystem.create("opendaylight-cluster", ConfigFactory
+ .load().getConfig("ODLCluster"));
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+ return system;
+ }
+ }).apply(null);
public static final ActorSystem getInstance(){
return actorSystem;
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class DataChangeListenerRegistration extends AbstractUntypedActor{
+public class DataChangeListenerRegistration extends AbstractUntypedActor {
- private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
+ private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+ registration;
- public DataChangeListenerRegistration(
- org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
- this.registration = registration;
- }
+ public DataChangeListenerRegistration(
+ org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ this.registration = registration;
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message instanceof CloseDataChangeListenerRegistration) {
+ closeListenerRegistration(
+ (CloseDataChangeListenerRegistration) message);
+ }
+ }
+
+ public static Props props(
+ final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ return Props.create(new Creator<DataChangeListenerRegistration>() {
+
+ @Override
+ public DataChangeListenerRegistration create() throws Exception {
+ return new DataChangeListenerRegistration(registration);
+ }
+ });
+ }
- @Override
- public void handleReceive(Object message) throws Exception {
- if(message instanceof CloseDataChangeListenerRegistration){
- closeListenerRegistration((CloseDataChangeListenerRegistration) message);
+ private void closeListenerRegistration(
+ CloseDataChangeListenerRegistration message) {
+ registration.close();
+ getSender()
+ .tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- }
-
- public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
- return Props.create(new Creator<DataChangeListenerRegistration>(){
-
- @Override
- public DataChangeListenerRegistration create() throws Exception {
- return new DataChangeListenerRegistration(registration);
- }
- });
- }
-
- private void closeListenerRegistration(CloseDataChangeListenerRegistration message){
- registration.close();
- getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
- }
}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
private final ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener listener;
+ private final ActorRef dataChangeListenerActor;
public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
DataChangeListenerRegistrationProxy(
ActorSelection listenerRegistrationActor,
- L listener) {
+ L listener, ActorRef dataChangeListenerActor) {
this.listenerRegistrationActor = listenerRegistrationActor;
this.listener = listener;
+ this.dataChangeListenerActor = dataChangeListenerActor;
}
@Override
@Override
public void close() {
listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
*
*/
private final String type;
private final ActorContext actorContext;
+
+ /**
+ * Executor used to run FutureTask's
+ *
+ * This is typically used when we need to make a request to an actor and
+ * wait for it's response and the consumer needs to be provided a Future.
+ *
+ * FIXME : Make the thread pool configurable
+ */
+ private final ExecutorService executor =
+ Executors.newFixedThreadPool(10);
+
public DistributedDataStore(ActorSystem actorSystem, String type) {
- this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+ this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type);
}
public DistributedDataStore(ActorContext actorContext, String type) {
);
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
+ return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
}
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext);
+ return new TransactionChainProxy(actorContext, executor);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+ executor);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
+ executor);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
+ executor);
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
} else if (message instanceof Persistent) {
commit((Modification) ((Persistent) message).payload());
} else if (message instanceof CreateTransaction) {
- createTransaction();
+ createTransaction((CreateTransaction) message);
} else if(message instanceof NonPersistent){
commit((Modification) ((NonPersistent) message).payload());
}
}
- private void createTransaction() {
+ private void createTransaction(CreateTransaction createTransaction) {
DOMStoreReadWriteTransaction transaction =
store.newReadWriteTransaction();
ActorRef transactionActor = getContext().actorOf(
- ShardTransaction.props(transaction, getSelf()));
+ ShardTransaction.props(transaction, getSelf()), "shard-" + createTransaction.getTransactionId());
getSender()
- .tell(new CreateTransactionReply(transactionActor.path()),
+ .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
getSelf());
}
future.get();
sender.tell(new CommitTransactionReply(), self);
} catch (InterruptedException | ExecutionException e) {
+ // FIXME : Handle this properly
log.error(e, "An exception happened when committing");
}
}
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
- log.info("received forwarded transaction");
modificationToCohort
.put(message.getModification(), message.getCohort());
if(persistent) {
* configuration or operational
*/
private ShardManager(String type){
- ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
+ ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type);
defaultShardPath = actor.path();
}
private void readyTransaction(ReadyTransaction message) {
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
- ThreePhaseCommitCohort.props(cohort, shardActor, modification));
+ ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
getSender()
.tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
/**
* The ShardTransactionChain Actor represents a remote TransactionChain
*/
-public class ShardTransactionChain extends AbstractUntypedActor{
-
- private final DOMStoreTransactionChain chain;
-
- public ShardTransactionChain(DOMStoreTransactionChain chain) {
- this.chain = chain;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if(message instanceof CreateTransaction){
- DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
- ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
- getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
- } else if (message instanceof CloseTransactionChain){
- chain.close();
- getSender().tell(new CloseTransactionChainReply(), getSelf());
+public class ShardTransactionChain extends AbstractUntypedActor {
+
+ private final DOMStoreTransactionChain chain;
+
+ public ShardTransactionChain(DOMStoreTransactionChain chain) {
+ this.chain = chain;
}
- }
- public static Props props(final DOMStoreTransactionChain chain){
- return Props.create(new Creator<ShardTransactionChain>(){
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message instanceof CreateTransaction) {
+ CreateTransaction createTransaction = (CreateTransaction) message;
+ createTransaction(createTransaction);
+ } else if (message instanceof CloseTransactionChain) {
+ chain.close();
+ getSender().tell(new CloseTransactionChainReply(), getSelf());
+ }
+ }
- @Override
- public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain);
- }
- });
- }
+ private void createTransaction(CreateTransaction createTransaction) {
+ DOMStoreReadWriteTransaction transaction =
+ chain.newReadWriteTransaction();
+ ActorRef transactionActor = getContext().actorOf(ShardTransaction
+ .props(chain, transaction, getContext().parent()), "shard-" + createTransaction.getTransactionId());
+ getSender()
+ .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
+ getSelf());
+ }
+
+ public static Props props(final DOMStoreTransactionChain chain) {
+ return Props.create(new Creator<ShardTransactionChain>() {
+
+ @Override
+ public ShardTransactionChain create() throws Exception {
+ return new ShardTransactionChain(chain);
+ }
+ });
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+
+public class TerminationMonitor extends UntypedActor{
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ public TerminationMonitor(){
+ LOG.info("Created TerminationMonitor");
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof Terminated){
+ Terminated terminated = (Terminated) message;
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ } else if(message instanceof Monitor){
+ Monitor monitor = (Monitor) message;
+ getContext().watch(monitor.getActorRef());
+ }
+ }
+}
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
- //FIXME : Use a thread pool here
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor;
+ private final String transactionId;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List<ActorPath> cohortPaths,
+ String transactionId,
+ ExecutorService executor) {
+
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
+ this.transactionId = transactionId;
+ this.executor = executor;
}
@Override public ListenableFuture<Boolean> canCommit() {
ListenableFutureTask<Boolean>
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;
}
ListenableFutureTask<Void>
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import java.util.concurrent.ExecutorService;
+
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
+ private final ExecutorService transactionExecutor;
- public TransactionChainProxy(ActorContext actorContext) {
+ public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) {
this.actorContext = actorContext;
+ this.transactionExecutor = transactionExecutor;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY);
+ TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE);
+ TransactionProxy.TransactionType.READ_WRITE, transactionExecutor);
}
@Override
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
* </p>
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
-
public enum TransactionType {
READ_ONLY,
WRITE_ONLY,
private final ActorContext actorContext;
private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
private final String identifier;
+ private final ExecutorService executor;
public TransactionProxy(
ActorContext actorContext,
- TransactionType transactionType) {
+ TransactionType transactionType,
+ ExecutorService executor
+ ) {
- this.identifier = "transaction-" + counter.getAndIncrement();
+ this.identifier = "txn-" + counter.getAndIncrement();
this.transactionType = transactionType;
this.actorContext = actorContext;
+ this.executor = executor;
- Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+ Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
if(response instanceof CreateTransactionReply){
CreateTransactionReply reply = (CreateTransactionReply) response;
remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
future = ListenableFutureTask.create(call);
- //FIXME : Use a thread pool here
- Executors.newSingleThreadExecutor().submit(future);
+ executor.submit(future);
return future;
}
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
}
@Override
package org.opendaylight.controller.cluster.datastore.messages;
public class CreateTransaction {
+ private final String transactionId;
+ public CreateTransaction(String transactionId){
+
+ this.transactionId = transactionId;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
}
import akka.actor.ActorPath;
public class CreateTransactionReply {
- private final ActorPath transactionPath;
+ private final ActorPath transactionPath;
+ private final String transactionId;
- public CreateTransactionReply(ActorPath transactionPath) {
- this.transactionPath = transactionPath;
- }
+ public CreateTransactionReply(ActorPath transactionPath,
+ String transactionId) {
+ this.transactionPath = transactionPath;
+ this.transactionId = transactionId;
+ }
- public ActorPath getTransactionPath() {
- return transactionPath;
- }
+ public ActorPath getTransactionPath() {
+ return transactionPath;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+public class Monitor {
+ private final ActorRef actorRef;
+
+ public Monitor(ActorRef actorRef){
+
+ this.actorRef = actorRef;
+ }
+
+ public ActorRef getActorRef() {
+ return actorRef;
+ }
+}
Assert.assertNotNull(transactionChain);
- transactionChain.tell(new CreateTransaction(), getRef());
+ transactionChain.tell(new CreateTransaction("txn-1"), getRef());
final ActorSelection transaction =
new ExpectMsg<ActorSelection>("CreateTransactionReply") {
Assert.assertTrue(preCommitDone);
+ // FIXME : When we commit on the cohort it "kills" the Transaction.
+ // This in turn kills the child of Transaction as well.
+ // The order in which we receive the terminated event for both
+ // these actors is not fixed which may cause this test to fail
cohort.tell(new CommitTransaction(), getRef());
final Boolean terminatedCohort =
public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+ private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+
private static class MockDataChangeListener implements
AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
DataChangeListenerRegistrationProxy proxy =
new DataChangeListenerRegistrationProxy(
getSystem().actorSelection(actorRef.path()),
- listener);
+ listener, dataChangeListenerActor);
Assert.assertEquals(listener, proxy.getInstance());
DataChangeListenerRegistrationProxy proxy =
new DataChangeListenerRegistrationProxy(
getSystem().actorSelection(actorRef.path()),
- new MockDataChangeListener());
+ new MockDataChangeListener(), dataChangeListenerActor);
proxy.close();
// Make CreateTransactionReply as the default response. Will need to be
// tuned if a specific test requires some other response
mockActorContext.setExecuteShardOperationResponse(
- new CreateTransactionReply(doNothingActorRef.path()));
+ new CreateTransactionReply(doNothingActorRef.path(), "txn-1 "));
}
@org.junit.After
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ShardTest extends AbstractActorTest {
}
}.get(); // this extracts the received message
- assertTrue(out.matches(
- "akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransactionChain/$a",
+ out);
+
expectNoMsg();
}
new UpdateSchemaContext(TestModel.createTestContext()),
getRef());
- subject.tell(new CreateTransaction(),
+ subject.tell(new CreateTransaction("txn-1"),
getRef());
final String out = new ExpectMsg<String>("match hint") {
}
}.get(); // this extracts the received message
- assertTrue(out.matches(
- "akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransaction/shard-txn-1",
+ out);
expectNoMsg();
}
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class ShardTransactionChainTest extends AbstractActorTest {
new Within(duration("1 seconds")) {
protected void run() {
- subject.tell(new CreateTransaction(), getRef());
+ subject.tell(new CreateTransaction("txn-1"), getRef());
final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards
}
}.get(); // this extracts the received message
- assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransaction/shard-txn-1",
+ out);
+
+ // Will wait for the rest of the 3 seconds
expectNoMsg();
}
};
}};
}
-}
\ No newline at end of file
+}
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.junit.Assert.assertNotNull;
private Props props;
private ActorRef actorRef;
private MockActorContext actorContext;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
@Before
public void setUp(){
proxy =
new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(actorRef.path()));
+ Arrays.asList(actorRef.path()), "txn-1", executor);
}
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class TransactionProxyTest extends AbstractActorTest {
+ private ExecutorService transactionExecutor =
+ Executors.newSingleThreadExecutor();
+
@Test
public void testRead() throws Exception {
final Props props = Props.create(DoNothingActor.class);
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.write(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.merge(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.delete(TestModel.TEST_PATH);
final ActorRef doNothingActorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
final ActorRef doNothingActorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(
- new CreateTransactionReply(doNothingActorRef.path()));
+ actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
Assert.assertNotNull(transactionProxy.getIdentifier());
}
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.close();
Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
}
+
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ return new CreateTransactionReply(actorRef.path(), "txn-1");
+ }
}