From: Moiz Raja
Date: Fri, 27 Jun 2014 20:45:05 +0000 (-0700)
Subject: Enhancements to actor naming, logging and monitoring
X-Git-Tag: release/helium~527^2
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=996ab3e41386da6b27cf21f6464ef1e55363e1ca;ds=sidebyside
Enhancements to actor naming, logging and monitoring
- Actor names have now been changed to be more meaningful. This will be helpful when trying to follow the logging.
- Added logging for when the actor is created and when it is terminated
Change-Id: I825270779ce19c319807c5a3c56d4885f8cc0996
Signed-off-by: Moiz Raja
---
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
index aae468fbc6..0f10258e9e 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
@@ -11,11 +11,21 @@ package org.opendaylight.controller.cluster.datastore;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
public abstract class AbstractUntypedActor extends UntypedActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
+
+ public AbstractUntypedActor(){
+ LOG.debug("Actor created {}", getSelf());
+ getContext().
+ system().
+ actorSelection("user/termination-monitor").
+ tell(new Monitor(getSelf()), getSelf());
+ }
+
@Override public void onReceive(Object message) throws Exception {
LOG.debug("Received message {}", message);
handleReceive(message);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
index c562e6f50f..baf04fe43b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
@@ -9,12 +9,23 @@
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Function;
import com.typesafe.config.ConfigFactory;
+import javax.annotation.Nullable;
+
public class ActorSystemFactory {
- private static final ActorSystem actorSystem =
- ActorSystem.create("opendaylight-cluster", ConfigFactory
- .load().getConfig("ODLCluster"));
+ private static final ActorSystem actorSystem = (new Function(){
+
+ @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
+ ActorSystem system =
+ ActorSystem.create("opendaylight-cluster", ConfigFactory
+ .load().getConfig("ODLCluster"));
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+ return system;
+ }
+ }).apply(null);
public static final ActorSystem getInstance(){
return actorSystem;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
index c2eab0df44..dca9735487 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
@@ -8,6 +8,7 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
@@ -16,34 +17,40 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class DataChangeListenerRegistration extends AbstractUntypedActor{
+public class DataChangeListenerRegistration extends AbstractUntypedActor {
- private final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration;
+ private final org.opendaylight.yangtools.concepts.ListenerRegistration>>
+ registration;
- public DataChangeListenerRegistration(
- org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) {
- this.registration = registration;
- }
+ public DataChangeListenerRegistration(
+ org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) {
+ this.registration = registration;
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message instanceof CloseDataChangeListenerRegistration) {
+ closeListenerRegistration(
+ (CloseDataChangeListenerRegistration) message);
+ }
+ }
+
+ public static Props props(
+ final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration) {
+ return Props.create(new Creator() {
+
+ @Override
+ public DataChangeListenerRegistration create() throws Exception {
+ return new DataChangeListenerRegistration(registration);
+ }
+ });
+ }
- @Override
- public void handleReceive(Object message) throws Exception {
- if(message instanceof CloseDataChangeListenerRegistration){
- closeListenerRegistration((CloseDataChangeListenerRegistration) message);
+ private void closeListenerRegistration(
+ CloseDataChangeListenerRegistration message) {
+ registration.close();
+ getSender()
+ .tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- }
-
- public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration){
- return Props.create(new Creator(){
-
- @Override
- public DataChangeListenerRegistration create() throws Exception {
- return new DataChangeListenerRegistration(registration);
- }
- });
- }
-
- private void closeListenerRegistration(CloseDataChangeListenerRegistration message){
- registration.close();
- getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
- }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
index 89cc969525..83737cfac5 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
@@ -8,7 +8,9 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -25,13 +27,15 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
private final ActorSelection listenerRegistrationActor;
private final AsyncDataChangeListener listener;
+ private final ActorRef dataChangeListenerActor;
public >>
DataChangeListenerRegistrationProxy(
ActorSelection listenerRegistrationActor,
- L listener) {
+ L listener, ActorRef dataChangeListenerActor) {
this.listenerRegistrationActor = listenerRegistrationActor;
this.listener = listener;
+ this.dataChangeListenerActor = dataChangeListenerActor;
}
@Override
@@ -42,5 +46,6 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
@Override
public void close() {
listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
}
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
index 58b22a9970..4401104a85 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
@@ -29,6 +29,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
*
*/
@@ -41,8 +44,20 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
private final String type;
private final ActorContext actorContext;
+
+ /**
+ * Executor used to run FutureTask's
+ *
+ * This is typically used when we need to make a request to an actor and
+ * wait for it's response and the consumer needs to be provided a Future.
+ *
+ * FIXME : Make the thread pool configurable
+ */
+ private final ExecutorService executor =
+ Executors.newFixedThreadPool(10);
+
public DistributedDataStore(ActorSystem actorSystem, String type) {
- this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+ this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type);
}
public DistributedDataStore(ActorContext actorContext, String type) {
@@ -66,29 +81,32 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
);
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
+ return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
}
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(actorContext);
+ return new TransactionChainProxy(actorContext, executor);
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+ executor);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
+ executor);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
+ executor);
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 221e874fd5..3425608d23 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -105,19 +105,19 @@ public class Shard extends UntypedProcessor {
} else if (message instanceof Persistent) {
commit((Modification) ((Persistent) message).payload());
} else if (message instanceof CreateTransaction) {
- createTransaction();
+ createTransaction((CreateTransaction) message);
} else if(message instanceof NonPersistent){
commit((Modification) ((NonPersistent) message).payload());
}
}
- private void createTransaction() {
+ private void createTransaction(CreateTransaction createTransaction) {
DOMStoreReadWriteTransaction transaction =
store.newReadWriteTransaction();
ActorRef transactionActor = getContext().actorOf(
- ShardTransaction.props(transaction, getSelf()));
+ ShardTransaction.props(transaction, getSelf()), "shard-" + createTransaction.getTransactionId());
getSender()
- .tell(new CreateTransactionReply(transactionActor.path()),
+ .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
getSelf());
}
@@ -139,6 +139,7 @@ public class Shard extends UntypedProcessor {
future.get();
sender.tell(new CommitTransactionReply(), self);
} catch (InterruptedException | ExecutionException e) {
+ // FIXME : Handle this properly
log.error(e, "An exception happened when committing");
}
}
@@ -146,7 +147,6 @@ public class Shard extends UntypedProcessor {
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
- log.info("received forwarded transaction");
modificationToCohort
.put(message.getModification(), message.getCohort());
if(persistent) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 79e90c3fc9..250ef49e6f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -68,7 +68,7 @@ public class ShardManager extends AbstractUntypedActor {
* configuration or operational
*/
private ShardManager(String type){
- ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
+ ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type);
defaultShardPath = actor.path();
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
index ff02bfbcce..e3d1e2d9d4 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
@@ -190,7 +190,7 @@ public class ShardTransaction extends AbstractUntypedActor {
private void readyTransaction(ReadyTransaction message) {
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
- ThreePhaseCommitCohort.props(cohort, shardActor, modification));
+ ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
getSender()
.tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
index 57c935b0ad..1092e9a793 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
@@ -21,33 +21,42 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
/**
* The ShardTransactionChain Actor represents a remote TransactionChain
*/
-public class ShardTransactionChain extends AbstractUntypedActor{
-
- private final DOMStoreTransactionChain chain;
-
- public ShardTransactionChain(DOMStoreTransactionChain chain) {
- this.chain = chain;
- }
-
- @Override
- public void handleReceive(Object message) throws Exception {
- if(message instanceof CreateTransaction){
- DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
- ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
- getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
- } else if (message instanceof CloseTransactionChain){
- chain.close();
- getSender().tell(new CloseTransactionChainReply(), getSelf());
+public class ShardTransactionChain extends AbstractUntypedActor {
+
+ private final DOMStoreTransactionChain chain;
+
+ public ShardTransactionChain(DOMStoreTransactionChain chain) {
+ this.chain = chain;
}
- }
- public static Props props(final DOMStoreTransactionChain chain){
- return Props.create(new Creator(){
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message instanceof CreateTransaction) {
+ CreateTransaction createTransaction = (CreateTransaction) message;
+ createTransaction(createTransaction);
+ } else if (message instanceof CloseTransactionChain) {
+ chain.close();
+ getSender().tell(new CloseTransactionChainReply(), getSelf());
+ }
+ }
- @Override
- public ShardTransactionChain create() throws Exception {
- return new ShardTransactionChain(chain);
- }
- });
- }
+ private void createTransaction(CreateTransaction createTransaction) {
+ DOMStoreReadWriteTransaction transaction =
+ chain.newReadWriteTransaction();
+ ActorRef transactionActor = getContext().actorOf(ShardTransaction
+ .props(chain, transaction, getContext().parent()), "shard-" + createTransaction.getTransactionId());
+ getSender()
+ .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
+ getSelf());
+ }
+
+ public static Props props(final DOMStoreTransactionChain chain) {
+ return Props.create(new Creator() {
+
+ @Override
+ public ShardTransactionChain create() throws Exception {
+ return new ShardTransactionChain(chain);
+ }
+ });
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
new file mode 100644
index 0000000000..e6ac7f8dbc
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+
+public class TerminationMonitor extends UntypedActor{
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ public TerminationMonitor(){
+ LOG.info("Created TerminationMonitor");
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof Terminated){
+ Terminated terminated = (Terminated) message;
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ } else if(message instanceof Monitor){
+ Monitor monitor = (Monitor) message;
+ getContext().watch(monitor.getActorRef());
+ }
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
index d12dc2b55a..279ecba409 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
@@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -43,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements
private final ActorContext actorContext;
private final List cohortPaths;
- //FIXME : Use a thread pool here
- private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor;
+ private final String transactionId;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List cohortPaths,
+ String transactionId,
+ ExecutorService executor) {
+
this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
+ this.transactionId = transactionId;
+ this.executor = executor;
}
@Override public ListenableFuture canCommit() {
@@ -86,7 +91,7 @@ public class ThreePhaseCommitCohortProxy implements
ListenableFutureTask
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;
}
@@ -136,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements
ListenableFutureTask
future = ListenableFutureTask.create(call);
- executorService.submit(future);
+ executor.submit(future);
return future;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
index 91e903f9e8..71b61ffaa0 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
@@ -14,32 +14,36 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import java.util.concurrent.ExecutorService;
+
/**
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
+ private final ExecutorService transactionExecutor;
- public TransactionChainProxy(ActorContext actorContext) {
+ public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) {
this.actorContext = actorContext;
+ this.transactionExecutor = transactionExecutor;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.WRITE_ONLY);
+ TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
return new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_WRITE);
+ TransactionProxy.TransactionType.READ_WRITE, transactionExecutor);
}
@Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
index 00196ebd07..74245c4259 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
@@ -34,7 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
-
public enum TransactionType {
READ_ONLY,
WRITE_ONLY,
@@ -63,16 +62,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
private final ActorContext actorContext;
private final Map remoteTransactionPaths = new HashMap<>();
private final String identifier;
+ private final ExecutorService executor;
public TransactionProxy(
ActorContext actorContext,
- TransactionType transactionType) {
+ TransactionType transactionType,
+ ExecutorService executor
+ ) {
- this.identifier = "transaction-" + counter.getAndIncrement();
+ this.identifier = "txn-" + counter.getAndIncrement();
this.transactionType = transactionType;
this.actorContext = actorContext;
+ this.executor = executor;
- Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+ Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
if(response instanceof CreateTransactionReply){
CreateTransactionReply reply = (CreateTransactionReply) response;
remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
@@ -105,8 +108,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
ListenableFutureTask>>
future = ListenableFutureTask.create(call);
- //FIXME : Use a thread pool here
- Executors.newSingleThreadExecutor().submit(future);
+ executor.submit(future);
return future;
}
@@ -145,7 +147,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
}
@Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
index e0cdd3cc2b..6110641696 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
@@ -9,5 +9,14 @@
package org.opendaylight.controller.cluster.datastore.messages;
public class CreateTransaction {
+ private final String transactionId;
+ public CreateTransaction(String transactionId){
+
+ this.transactionId = transactionId;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
index 4faf9d370d..46b7194c84 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
@@ -11,13 +11,20 @@ package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
public class CreateTransactionReply {
- private final ActorPath transactionPath;
+ private final ActorPath transactionPath;
+ private final String transactionId;
- public CreateTransactionReply(ActorPath transactionPath) {
- this.transactionPath = transactionPath;
- }
+ public CreateTransactionReply(ActorPath transactionPath,
+ String transactionId) {
+ this.transactionPath = transactionPath;
+ this.transactionId = transactionId;
+ }
- public ActorPath getTransactionPath() {
- return transactionPath;
- }
+ public ActorPath getTransactionPath() {
+ return transactionPath;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java
new file mode 100644
index 0000000000..567f14aa6a
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+public class Monitor {
+ private final ActorRef actorRef;
+
+ public Monitor(ActorRef actorRef){
+
+ this.actorRef = actorRef;
+ }
+
+ public ActorRef getActorRef() {
+ return actorRef;
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
index 74c858e4a6..dfefc5ed57 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
@@ -77,7 +77,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
Assert.assertNotNull(transactionChain);
- transactionChain.tell(new CreateTransaction(), getRef());
+ transactionChain.tell(new CreateTransaction("txn-1"), getRef());
final ActorSelection transaction =
new ExpectMsg("CreateTransactionReply") {
@@ -152,6 +152,10 @@ public class BasicIntegrationTest extends AbstractActorTest {
Assert.assertTrue(preCommitDone);
+ // FIXME : When we commit on the cohort it "kills" the Transaction.
+ // This in turn kills the child of Transaction as well.
+ // The order in which we receive the terminated event for both
+ // these actors is not fixed which may cause this test to fail
cohort.tell(new CommitTransaction(), getRef());
final Boolean terminatedCohort =
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
index be7be1723d..33b5d95611 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
@@ -17,6 +17,8 @@ import java.util.List;
public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+ private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+
private static class MockDataChangeListener implements
AsyncDataChangeListener> {
@@ -36,7 +38,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
DataChangeListenerRegistrationProxy proxy =
new DataChangeListenerRegistrationProxy(
getSystem().actorSelection(actorRef.path()),
- listener);
+ listener, dataChangeListenerActor);
Assert.assertEquals(listener, proxy.getInstance());
@@ -50,7 +52,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
DataChangeListenerRegistrationProxy proxy =
new DataChangeListenerRegistrationProxy(
getSystem().actorSelection(actorRef.path()),
- new MockDataChangeListener());
+ new MockDataChangeListener(), dataChangeListenerActor);
proxy.close();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
index 3a74a4ca76..5f82b40140 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
@@ -39,7 +39,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
// Make CreateTransactionReply as the default response. Will need to be
// tuned if a specific test requires some other response
mockActorContext.setExecuteShardOperationResponse(
- new CreateTransactionReply(doNothingActorRef.path()));
+ new CreateTransactionReply(doNothingActorRef.path(), "txn-1 "));
}
@org.junit.After
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
index b9ab8a3282..ed447e004f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
@@ -18,6 +18,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ShardTest extends AbstractActorTest {
@@ -47,9 +48,10 @@ public class ShardTest extends AbstractActorTest {
}
}.get(); // this extracts the received message
- assertTrue(out.matches(
- "akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransactionChain/$a",
+ out);
+
expectNoMsg();
}
@@ -115,7 +117,7 @@ public class ShardTest extends AbstractActorTest {
new UpdateSchemaContext(TestModel.createTestContext()),
getRef());
- subject.tell(new CreateTransaction(),
+ subject.tell(new CreateTransaction("txn-1"),
getRef());
final String out = new ExpectMsg("match hint") {
@@ -132,9 +134,9 @@ public class ShardTest extends AbstractActorTest {
}
}.get(); // this extracts the received message
- assertTrue(out.matches(
- "akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransaction/shard-txn-1",
+ out);
expectNoMsg();
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
index bc3a104656..b07cbfd87c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
@@ -14,7 +14,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class ShardTransactionChainTest extends AbstractActorTest {
@@ -34,7 +33,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
new Within(duration("1 seconds")) {
protected void run() {
- subject.tell(new CreateTransaction(), getRef());
+ subject.tell(new CreateTransaction("txn-1"), getRef());
final String out = new ExpectMsg("match hint") {
// do not put code outside this method, will run afterwards
@@ -47,8 +46,11 @@ public class ShardTransactionChainTest extends AbstractActorTest {
}
}.get(); // this extracts the received message
- assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
- // Will wait for the rest of the 3 seconds
+ assertEquals("Unexpected transaction path " + out,
+ "akka://test/user/testCreateTransaction/shard-txn-1",
+ out);
+
+ // Will wait for the rest of the 3 seconds
expectNoMsg();
}
@@ -88,4 +90,4 @@ public class ShardTransactionChainTest extends AbstractActorTest {
};
}};
}
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
index e4d8e1b23a..2d9ae93d9e 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
@@ -355,6 +355,4 @@ public class ShardTransactionTest extends AbstractActorTest {
}};
}
-
-
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
index af3da57571..8ff785c879 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
@@ -14,6 +14,8 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.junit.Assert.assertNotNull;
@@ -23,6 +25,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
private Props props;
private ActorRef actorRef;
private MockActorContext actorContext;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
@Before
public void setUp(){
@@ -32,7 +35,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
proxy =
new ThreePhaseCommitCohortProxy(actorContext,
- Arrays.asList(actorRef.path()));
+ Arrays.asList(actorRef.path()), "txn-1", executor);
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
index 8b70e00da9..a8df49f5ca 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
@@ -23,21 +23,26 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class TransactionProxyTest extends AbstractActorTest {
+ private ExecutorService transactionExecutor =
+ Executors.newSingleThreadExecutor();
+
@Test
public void testRead() throws Exception {
final Props props = Props.create(DoNothingActor.class);
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
ListenableFuture>> read =
@@ -63,12 +68,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
ListenableFuture>> read =
@@ -94,12 +99,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.write(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -126,12 +131,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.merge(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -158,12 +163,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.delete(TestModel.TEST_PATH);
@@ -189,12 +194,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef doNothingActorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
@@ -213,12 +218,11 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef doNothingActorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(
- new CreateTransactionReply(doNothingActorRef.path()));
+ actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
Assert.assertNotNull(transactionProxy.getIdentifier());
}
@@ -229,12 +233,12 @@ public class TransactionProxyTest extends AbstractActorTest {
final ActorRef actorRef = getSystem().actorOf(props);
final MockActorContext actorContext = new MockActorContext(this.getSystem());
- actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
actorContext.setExecuteRemoteOperationResponse("message");
TransactionProxy transactionProxy =
new TransactionProxy(actorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
transactionProxy.close();
@@ -253,4 +257,8 @@ public class TransactionProxyTest extends AbstractActorTest {
Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
}
+
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ return new CreateTransactionReply(actorRef.path(), "txn-1");
+ }
}