From 996ab3e41386da6b27cf21f6464ef1e55363e1ca Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 27 Jun 2014 13:45:05 -0700 Subject: [PATCH] Enhancements to actor naming, logging and monitoring - Actor names have now been changed to be more meaningful. This will be helpful when trying to follow the logging. - Added logging for when the actor is created and when it is terminated Change-Id: I825270779ce19c319807c5a3c56d4885f8cc0996 Signed-off-by: Moiz Raja --- .../datastore/AbstractUntypedActor.java | 10 +++ .../cluster/datastore/ActorSystemFactory.java | 17 +++++- .../DataChangeListenerRegistration.java | 59 ++++++++++-------- .../DataChangeListenerRegistrationProxy.java | 7 ++- .../datastore/DistributedDataStore.java | 30 +++++++-- .../controller/cluster/datastore/Shard.java | 10 +-- .../cluster/datastore/ShardManager.java | 2 +- .../cluster/datastore/ShardTransaction.java | 2 +- .../datastore/ShardTransactionChain.java | 61 +++++++++++-------- .../cluster/datastore/TerminationMonitor.java | 34 +++++++++++ .../ThreePhaseCommitCohortProxy.java | 17 ++++-- .../datastore/TransactionChainProxy.java | 12 ++-- .../cluster/datastore/TransactionProxy.java | 18 +++--- .../datastore/messages/CreateTransaction.java | 9 +++ .../messages/CreateTransactionReply.java | 21 ++++--- .../cluster/datastore/messages/Monitor.java | 24 ++++++++ .../datastore/BasicIntegrationTest.java | 6 +- ...taChangeListenerRegistrationProxyTest.java | 6 +- .../datastore/DistributedDataStoreTest.java | 2 +- .../cluster/datastore/ShardTest.java | 16 ++--- .../datastore/ShardTransactionChainTest.java | 12 ++-- .../datastore/ShardTransactionTest.java | 2 - .../ThreePhaseCommitCohortProxyTest.java | 5 +- .../datastore/TransactionProxyTest.java | 42 +++++++------ 24 files changed, 294 insertions(+), 130 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java index aae468fbc6..0f10258e9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java @@ -11,11 +11,21 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; +import org.opendaylight.controller.cluster.datastore.messages.Monitor; public abstract class AbstractUntypedActor extends UntypedActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); + + public AbstractUntypedActor(){ + LOG.debug("Actor created {}", getSelf()); + getContext(). + system(). + actorSelection("user/termination-monitor"). + tell(new Monitor(getSelf()), getSelf()); + } + @Override public void onReceive(Object message) throws Exception { LOG.debug("Received message {}", message); handleReceive(message); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java index c562e6f50f..baf04fe43b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java @@ -9,12 +9,23 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; +import akka.actor.Props; +import com.google.common.base.Function; import com.typesafe.config.ConfigFactory; +import javax.annotation.Nullable; + public class ActorSystemFactory { - private static final ActorSystem actorSystem = - ActorSystem.create("opendaylight-cluster", ConfigFactory - .load().getConfig("ODLCluster")); + private static final ActorSystem actorSystem = (new Function(){ + + @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) { + ActorSystem system = + ActorSystem.create("opendaylight-cluster", ConfigFactory + .load().getConfig("ODLCluster")); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + return system; + } + }).apply(null); public static final ActorSystem getInstance(){ return actorSystem; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java index c2eab0df44..dca9735487 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; @@ -16,34 +17,40 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class DataChangeListenerRegistration extends AbstractUntypedActor{ +public class DataChangeListenerRegistration extends AbstractUntypedActor { - private final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration; + private final org.opendaylight.yangtools.concepts.ListenerRegistration>> + registration; - public DataChangeListenerRegistration( - org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) { - this.registration = registration; - } + public DataChangeListenerRegistration( + org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) { + this.registration = registration; + } + + @Override + public void handleReceive(Object message) throws Exception { + if (message instanceof CloseDataChangeListenerRegistration) { + closeListenerRegistration( + (CloseDataChangeListenerRegistration) message); + } + } + + public static Props props( + final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) { + return Props.create(new Creator() { + + @Override + public DataChangeListenerRegistration create() throws Exception { + return new DataChangeListenerRegistration(registration); + } + }); + } - @Override - public void handleReceive(Object message) throws Exception { - if(message instanceof CloseDataChangeListenerRegistration){ - closeListenerRegistration((CloseDataChangeListenerRegistration) message); + private void closeListenerRegistration( + CloseDataChangeListenerRegistration message) { + registration.close(); + getSender() + .tell(new CloseDataChangeListenerRegistrationReply(), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); } - } - - public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration){ - return Props.create(new Creator(){ - - @Override - public DataChangeListenerRegistration create() throws Exception { - return new DataChangeListenerRegistration(registration); - } - }); - } - - private void closeListenerRegistration(CloseDataChangeListenerRegistration message){ - registration.close(); - getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf()); - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index 89cc969525..83737cfac5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -8,7 +8,9 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -25,13 +27,15 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class DataChangeListenerRegistrationProxy implements ListenerRegistration { private final ActorSelection listenerRegistrationActor; private final AsyncDataChangeListener listener; + private final ActorRef dataChangeListenerActor; public >> DataChangeListenerRegistrationProxy( ActorSelection listenerRegistrationActor, - L listener) { + L listener, ActorRef dataChangeListenerActor) { this.listenerRegistrationActor = listenerRegistrationActor; this.listener = listener; + this.dataChangeListenerActor = dataChangeListenerActor; } @Override @@ -42,5 +46,6 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration @Override public void close() { listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null); + dataChangeListenerActor.tell(PoisonPill.getInstance(), null); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 58b22a9970..4401104a85 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -29,6 +29,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * */ @@ -41,8 +44,20 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au private final String type; private final ActorContext actorContext; + + /** + * Executor used to run FutureTask's + * + * This is typically used when we need to make a request to an actor and + * wait for it's response and the consumer needs to be provided a Future. + * + * FIXME : Make the thread pool configurable + */ + private final ExecutorService executor = + Executors.newFixedThreadPool(10); + public DistributedDataStore(ActorSystem actorSystem, String type) { - this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type); + this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type); } public DistributedDataStore(ActorContext actorContext, String type) { @@ -66,29 +81,32 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au ); RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; - return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener); + return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor); } @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(actorContext); + return new TransactionChainProxy(actorContext, executor); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, + executor); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY, + executor); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); + return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE, + executor); } @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 221e874fd5..3425608d23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -105,19 +105,19 @@ public class Shard extends UntypedProcessor { } else if (message instanceof Persistent) { commit((Modification) ((Persistent) message).payload()); } else if (message instanceof CreateTransaction) { - createTransaction(); + createTransaction((CreateTransaction) message); } else if(message instanceof NonPersistent){ commit((Modification) ((NonPersistent) message).payload()); } } - private void createTransaction() { + private void createTransaction(CreateTransaction createTransaction) { DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction(); ActorRef transactionActor = getContext().actorOf( - ShardTransaction.props(transaction, getSelf())); + ShardTransaction.props(transaction, getSelf()), "shard-" + createTransaction.getTransactionId()); getSender() - .tell(new CreateTransactionReply(transactionActor.path()), + .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()), getSelf()); } @@ -139,6 +139,7 @@ public class Shard extends UntypedProcessor { future.get(); sender.tell(new CommitTransactionReply(), self); } catch (InterruptedException | ExecutionException e) { + // FIXME : Handle this properly log.error(e, "An exception happened when committing"); } } @@ -146,7 +147,6 @@ public class Shard extends UntypedProcessor { } private void handleForwardedCommit(ForwardedCommitTransaction message) { - log.info("received forwarded transaction"); modificationToCohort .put(message.getModification(), message.getCohort()); if(persistent) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 79e90c3fc9..250ef49e6f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -68,7 +68,7 @@ public class ShardManager extends AbstractUntypedActor { * configuration or operational */ private ShardManager(String type){ - ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type)); + ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type); defaultShardPath = actor.path(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index ff02bfbcce..e3d1e2d9d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -190,7 +190,7 @@ public class ShardTransaction extends AbstractUntypedActor { private void readyTransaction(ReadyTransaction message) { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); ActorRef cohortActor = getContext().actorOf( - ThreePhaseCommitCohort.props(cohort, shardActor, modification)); + ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() .tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 57c935b0ad..1092e9a793 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -21,33 +21,42 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; /** * The ShardTransactionChain Actor represents a remote TransactionChain */ -public class ShardTransactionChain extends AbstractUntypedActor{ - - private final DOMStoreTransactionChain chain; - - public ShardTransactionChain(DOMStoreTransactionChain chain) { - this.chain = chain; - } - - @Override - public void handleReceive(Object message) throws Exception { - if(message instanceof CreateTransaction){ - DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction(); - ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent())); - getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf()); - } else if (message instanceof CloseTransactionChain){ - chain.close(); - getSender().tell(new CloseTransactionChainReply(), getSelf()); +public class ShardTransactionChain extends AbstractUntypedActor { + + private final DOMStoreTransactionChain chain; + + public ShardTransactionChain(DOMStoreTransactionChain chain) { + this.chain = chain; } - } - public static Props props(final DOMStoreTransactionChain chain){ - return Props.create(new Creator(){ + @Override + public void handleReceive(Object message) throws Exception { + if (message instanceof CreateTransaction) { + CreateTransaction createTransaction = (CreateTransaction) message; + createTransaction(createTransaction); + } else if (message instanceof CloseTransactionChain) { + chain.close(); + getSender().tell(new CloseTransactionChainReply(), getSelf()); + } + } - @Override - public ShardTransactionChain create() throws Exception { - return new ShardTransactionChain(chain); - } - }); - } + private void createTransaction(CreateTransaction createTransaction) { + DOMStoreReadWriteTransaction transaction = + chain.newReadWriteTransaction(); + ActorRef transactionActor = getContext().actorOf(ShardTransaction + .props(chain, transaction, getContext().parent()), "shard-" + createTransaction.getTransactionId()); + getSender() + .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()), + getSelf()); + } + + public static Props props(final DOMStoreTransactionChain chain) { + return Props.create(new Creator() { + + @Override + public ShardTransactionChain create() throws Exception { + return new ShardTransactionChain(chain); + } + }); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java new file mode 100644 index 0000000000..e6ac7f8dbc --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Terminated; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import org.opendaylight.controller.cluster.datastore.messages.Monitor; + +public class TerminationMonitor extends UntypedActor{ + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + public TerminationMonitor(){ + LOG.info("Created TerminationMonitor"); + } + + @Override public void onReceive(Object message) throws Exception { + if(message instanceof Terminated){ + Terminated terminated = (Terminated) message; + LOG.debug("Actor terminated : {}", terminated.actor()); + } else if(message instanceof Monitor){ + Monitor monitor = (Monitor) message; + getContext().watch(monitor.getActorRef()); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index d12dc2b55a..279ecba409 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -30,7 +30,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies @@ -43,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements private final ActorContext actorContext; private final List cohortPaths; - //FIXME : Use a thread pool here - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ExecutorService executor; + private final String transactionId; - public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths) { + public ThreePhaseCommitCohortProxy(ActorContext actorContext, + List cohortPaths, + String transactionId, + ExecutorService executor) { + this.actorContext = actorContext; this.cohortPaths = cohortPaths; + this.transactionId = transactionId; + this.executor = executor; } @Override public ListenableFuture canCommit() { @@ -86,7 +91,7 @@ public class ThreePhaseCommitCohortProxy implements ListenableFutureTask future = ListenableFutureTask.create(call); - executorService.submit(future); + executor.submit(future); return future; } @@ -136,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements ListenableFutureTask future = ListenableFutureTask.create(call); - executorService.submit(future); + executor.submit(future); return future; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 91e903f9e8..71b61ffaa0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -14,32 +14,36 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import java.util.concurrent.ExecutorService; + /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; + private final ExecutorService transactionExecutor; - public TransactionChainProxy(ActorContext actorContext) { + public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) { this.actorContext = actorContext; + this.transactionExecutor = transactionExecutor; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY); + TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE); + TransactionProxy.TransactionType.READ_WRITE, transactionExecutor); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 00196ebd07..74245c4259 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong; *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { - public enum TransactionType { READ_ONLY, WRITE_ONLY, @@ -63,16 +62,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final String identifier; + private final ExecutorService executor; public TransactionProxy( ActorContext actorContext, - TransactionType transactionType) { + TransactionType transactionType, + ExecutorService executor + ) { - this.identifier = "transaction-" + counter.getAndIncrement(); + this.identifier = "txn-" + counter.getAndIncrement(); this.transactionType = transactionType; this.actorContext = actorContext; + this.executor = executor; - Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION); + Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION); if(response instanceof CreateTransactionReply){ CreateTransactionReply reply = (CreateTransactionReply) response; remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath())); @@ -105,8 +108,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { ListenableFutureTask>> future = ListenableFutureTask.create(call); - //FIXME : Use a thread pool here - Executors.newSingleThreadExecutor().submit(future); + executor.submit(future); return future; } @@ -145,7 +147,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java index e0cdd3cc2b..6110641696 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java @@ -9,5 +9,14 @@ package org.opendaylight.controller.cluster.datastore.messages; public class CreateTransaction { + private final String transactionId; + public CreateTransaction(String transactionId){ + + this.transactionId = transactionId; + } + + public String getTransactionId() { + return transactionId; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java index 4faf9d370d..46b7194c84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java @@ -11,13 +11,20 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; public class CreateTransactionReply { - private final ActorPath transactionPath; + private final ActorPath transactionPath; + private final String transactionId; - public CreateTransactionReply(ActorPath transactionPath) { - this.transactionPath = transactionPath; - } + public CreateTransactionReply(ActorPath transactionPath, + String transactionId) { + this.transactionPath = transactionPath; + this.transactionId = transactionId; + } - public ActorPath getTransactionPath() { - return transactionPath; - } + public ActorPath getTransactionPath() { + return transactionPath; + } + + public String getTransactionId() { + return transactionId; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java new file mode 100644 index 0000000000..567f14aa6a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; + +public class Monitor { + private final ActorRef actorRef; + + public Monitor(ActorRef actorRef){ + + this.actorRef = actorRef; + } + + public ActorRef getActorRef() { + return actorRef; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java index 74c858e4a6..dfefc5ed57 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -77,7 +77,7 @@ public class BasicIntegrationTest extends AbstractActorTest { Assert.assertNotNull(transactionChain); - transactionChain.tell(new CreateTransaction(), getRef()); + transactionChain.tell(new CreateTransaction("txn-1"), getRef()); final ActorSelection transaction = new ExpectMsg("CreateTransactionReply") { @@ -152,6 +152,10 @@ public class BasicIntegrationTest extends AbstractActorTest { Assert.assertTrue(preCommitDone); + // FIXME : When we commit on the cohort it "kills" the Transaction. + // This in turn kills the child of Transaction as well. + // The order in which we receive the terminated event for both + // these actors is not fixed which may cause this test to fail cohort.tell(new CommitTransaction(), getRef()); final Boolean terminatedCohort = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index be7be1723d..33b5d95611 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -17,6 +17,8 @@ import java.util.List; public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ + private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class)); + private static class MockDataChangeListener implements AsyncDataChangeListener> { @@ -36,7 +38,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( getSystem().actorSelection(actorRef.path()), - listener); + listener, dataChangeListenerActor); Assert.assertEquals(listener, proxy.getInstance()); @@ -50,7 +52,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy( getSystem().actorSelection(actorRef.path()), - new MockDataChangeListener()); + new MockDataChangeListener(), dataChangeListenerActor); proxy.close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 3a74a4ca76..5f82b40140 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -39,7 +39,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ // Make CreateTransactionReply as the default response. Will need to be // tuned if a specific test requires some other response mockActorContext.setExecuteShardOperationResponse( - new CreateTransactionReply(doNothingActorRef.path())); + new CreateTransactionReply(doNothingActorRef.path(), "txn-1 ")); } @org.junit.After diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index b9ab8a3282..ed447e004f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -18,6 +18,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ShardTest extends AbstractActorTest { @@ -47,9 +48,10 @@ public class ShardTest extends AbstractActorTest { } }.get(); // this extracts the received message - assertTrue(out.matches( - "akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*")); - // Will wait for the rest of the 3 seconds + assertEquals("Unexpected transaction path " + out, + "akka://test/user/testCreateTransactionChain/$a", + out); + expectNoMsg(); } @@ -115,7 +117,7 @@ public class ShardTest extends AbstractActorTest { new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new CreateTransaction(), + subject.tell(new CreateTransaction("txn-1"), getRef()); final String out = new ExpectMsg("match hint") { @@ -132,9 +134,9 @@ public class ShardTest extends AbstractActorTest { } }.get(); // this extracts the received message - assertTrue(out.matches( - "akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*")); - // Will wait for the rest of the 3 seconds + assertEquals("Unexpected transaction path " + out, + "akka://test/user/testCreateTransaction/shard-txn-1", + out); expectNoMsg(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java index bc3a104656..b07cbfd87c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java @@ -14,7 +14,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class ShardTransactionChainTest extends AbstractActorTest { @@ -34,7 +33,7 @@ public class ShardTransactionChainTest extends AbstractActorTest { new Within(duration("1 seconds")) { protected void run() { - subject.tell(new CreateTransaction(), getRef()); + subject.tell(new CreateTransaction("txn-1"), getRef()); final String out = new ExpectMsg("match hint") { // do not put code outside this method, will run afterwards @@ -47,8 +46,11 @@ public class ShardTransactionChainTest extends AbstractActorTest { } }.get(); // this extracts the received message - assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*")); - // Will wait for the rest of the 3 seconds + assertEquals("Unexpected transaction path " + out, + "akka://test/user/testCreateTransaction/shard-txn-1", + out); + + // Will wait for the rest of the 3 seconds expectNoMsg(); } @@ -88,4 +90,4 @@ public class ShardTransactionChainTest extends AbstractActorTest { }; }}; } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index e4d8e1b23a..2d9ae93d9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -355,6 +355,4 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } - - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index af3da57571..8ff785c879 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -14,6 +14,8 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.junit.Assert.assertNotNull; @@ -23,6 +25,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { private Props props; private ActorRef actorRef; private MockActorContext actorContext; + private ExecutorService executor = Executors.newSingleThreadExecutor(); @Before public void setUp(){ @@ -32,7 +35,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { proxy = new ThreePhaseCommitCohortProxy(actorContext, - Arrays.asList(actorRef.path())); + Arrays.asList(actorRef.path()), "txn-1", executor); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8b70e00da9..a8df49f5ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -23,21 +23,26 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class TransactionProxyTest extends AbstractActorTest { + private ExecutorService transactionExecutor = + Executors.newSingleThreadExecutor(); + @Test public void testRead() throws Exception { final Props props = Props.create(DoNothingActor.class); final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); ListenableFuture>> read = @@ -63,12 +68,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); ListenableFuture>> read = @@ -94,12 +99,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.NAME_QNAME)); @@ -126,12 +131,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.NAME_QNAME)); @@ -158,12 +163,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); transactionProxy.delete(TestModel.TEST_PATH); @@ -189,12 +194,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef doNothingActorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -213,12 +218,11 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef doNothingActorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( - new CreateTransactionReply(doNothingActorRef.path())); + actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); Assert.assertNotNull(transactionProxy.getIdentifier()); } @@ -229,12 +233,12 @@ public class TransactionProxyTest extends AbstractActorTest { final ActorRef actorRef = getSystem().actorOf(props); final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); + actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); actorContext.setExecuteRemoteOperationResponse("message"); TransactionProxy transactionProxy = new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); transactionProxy.close(); @@ -253,4 +257,8 @@ public class TransactionProxyTest extends AbstractActorTest { Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); } + + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return new CreateTransactionReply(actorRef.path(), "txn-1"); + } } -- 2.36.6