import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
* PhantomReference.
*/
private List<ActorSelection> remoteTransactionActors;
- private AtomicBoolean remoteTransactionActorsMB;
+ private volatile AtomicBoolean remoteTransactionActorsMB;
/**
* Stores the create transaction results per shard.
private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
- private final Semaphore operationLimiter;
- private final OperationCompleter operationCompleter;
+
+ private volatile boolean initialized;
+ private Semaphore operationLimiter;
+ private OperationCompleter operationCompleter;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
this(actorContext, transactionType, "");
memberName = "UNKNOWN-MEMBER";
}
- this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
- counter.getAndIncrement()).build();
-
- if(transactionType == TransactionType.READ_ONLY) {
- // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
- // to close the remote Tx's when this instance is no longer in use and is garbage
- // collected.
-
- remoteTransactionActors = Lists.newArrayList();
- remoteTransactionActorsMB = new AtomicBoolean();
-
- TransactionProxyCleanupPhantomReference cleanup =
- new TransactionProxyCleanupPhantomReference(this);
- phantomReferenceCache.put(cleanup, cleanup);
- }
-
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
- this.operationCompleter = new OperationCompleter(operationLimiter);
+ this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement());
LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
}
}
private void throttleOperation(int acquirePermits) {
+ if(!initialized) {
+ // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+ operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+ operationCompleter = new OperationCompleter(operationLimiter);
+
+ // Make sure we write this last because it's volatile and will also publish the non-volatile writes
+ // above as well so they'll be visible to other threads.
+ initialized = true;
+ }
+
try {
if(!operationLimiter.tryAcquire(acquirePermits,
actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
checkModificationState();
- throttleOperation(txFutureCallbackMap.size());
-
inReadyState = true;
LOG.debug("Tx {} Readying {} transactions for commit", identifier,
txFutureCallbackMap.size());
+ if(txFutureCallbackMap.size() == 0) {
+ onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+ return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+ }
+
+ throttleOperation(txFutureCallbackMap.size());
+
List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
txFutureCallbackMap.clear();
- if(transactionType == TransactionType.READ_ONLY) {
+ if(remoteTransactionActorsMB != null) {
remoteTransactionActors.clear();
remoteTransactionActorsMB.set(true);
}
}
}
+ // Mainly checking for state violation here to perform a volatile read of "initialized" to
+ // ensure updates to operationLimter et al are visible to this thread (ie we're doing
+ // "piggy-back" synchronization here).
+ Preconditions.checkState(initialized, "Tx was not propertly initialized.");
+
// Create the TransactionContext from the response or failure. Store the new
// TransactionContext locally until we've completed invoking the
// TransactionOperations. This avoids thread timing issues which could cause
ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
if (transactionType == TransactionType.READ_ONLY) {
+ // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+ // to close the remote Tx's when this instance is no longer in use and is garbage
+ // collected.
+
+ if(remoteTransactionActorsMB == null) {
+ remoteTransactionActors = Lists.newArrayList();
+ remoteTransactionActorsMB = new AtomicBoolean();
+
+ TransactionProxyCleanupPhantomReference cleanup =
+ new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
+ phantomReferenceCache.put(cleanup, cleanup);
+ }
+
// Add the actor to the remoteTransactionActors list for access by the
// cleanup PhantonReference.
remoteTransactionActors.add(transactionActor);
}
}
}
+
+ private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
+
+ private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
+ com.google.common.util.concurrent.Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
+ com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
+
+ private NoOpDOMStoreThreePhaseCommitCohort() {
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return IMMEDIATE_BOOLEAN_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return IMMEDIATE_VOID_SUCCESS;
+ }
+ }
}