import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
* </p>
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
-
public enum TransactionType {
READ_ONLY,
WRITE_ONLY,
private final ActorContext actorContext;
private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
private final String identifier;
+ private final ExecutorService executor;
public TransactionProxy(
ActorContext actorContext,
- TransactionType transactionType) {
+ TransactionType transactionType,
+ ExecutorService executor
+ ) {
- this.identifier = "transaction-" + counter.getAndIncrement();
+ this.identifier = "txn-" + counter.getAndIncrement();
this.transactionType = transactionType;
this.actorContext = actorContext;
+ this.executor = executor;
- Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+ Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
if(response instanceof CreateTransactionReply){
CreateTransactionReply reply = (CreateTransactionReply) response;
remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
future = ListenableFutureTask.create(call);
- //FIXME : Use a thread pool here
- Executors.newSingleThreadExecutor().submit(future);
+ executor.submit(future);
return future;
}
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
}
@Override