*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
- public enum TransactionType {
+
+ public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
READ_WRITE
}
- static Function1 SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
- Throwable, Throwable>() {
+ static final Mapper SAME_FAILURE_TRANSFORMER =
+ new Mapper() {
@Override
public Throwable apply(Throwable failure) {
return failure;
@@ -83,22 +88,116 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
private static final AtomicLong counter = new AtomicLong();
- private static final Logger
- LOG = LoggerFactory.getLogger(TransactionProxy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+
+ /**
+ * Time interval in between transaction create retries.
+ */
+ private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
+ FiniteDuration.create(1, TimeUnit.SECONDS);
+
+ /**
+ * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+ * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+ * trickery to clean up its internal thread when the bundle is unloaded.
+ */
+ private static final FinalizableReferenceQueue phantomReferenceQueue =
+ new FinalizableReferenceQueue();
+
+ /**
+ * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+ * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+ * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+ * and thus becomes eligible for garbage collection.
+ */
+ private static final Map phantomReferenceCache =
+ new ConcurrentHashMap<>();
+
+ /**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+ private static class TransactionProxyCleanupPhantomReference
+ extends FinalizablePhantomReference {
+
+ private final List remoteTransactionActors;
+ private final AtomicBoolean remoteTransactionActorsMB;
+ private final ActorContext actorContext;
+ private final TransactionIdentifier identifier;
+
+ protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+ super(referent, phantomReferenceQueue);
+
+ // Note we need to cache the relevant fields from the TransactionProxy as we can't
+ // have a hard reference to the TransactionProxy instance itself.
+
+ remoteTransactionActors = referent.remoteTransactionActors;
+ remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+ actorContext = referent.actorContext;
+ identifier = referent.identifier;
+ }
+
+ @Override
+ public void finalizeReferent() {
+ LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+ remoteTransactionActors.size(), identifier);
+
+ phantomReferenceCache.remove(this);
+
+ // Access the memory barrier volatile to ensure all previous updates to the
+ // remoteTransactionActors list are visible to this thread.
+
+ if(remoteTransactionActorsMB.get()) {
+ for(ActorSelection actor : remoteTransactionActors) {
+ LOG.trace("Sending CloseTransaction to {}", actor);
+ actorContext.sendOperationAsync(actor,
+ new CloseTransaction().toSerializable());
+ }
+ }
+ }
+ }
+ /**
+ * Stores the remote Tx actors for each requested data store path to be used by the
+ * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
+ * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
+ * remoteTransactionActors list so they will be visible to the thread accessing the
+ * PhantomReference.
+ */
+ private List remoteTransactionActors;
+ private AtomicBoolean remoteTransactionActorsMB;
+
+ /**
+ * Stores the create transaction results per shard.
+ */
+ private final Map txFutureCallbackMap = new HashMap<>();
private final TransactionType transactionType;
private final ActorContext actorContext;
- private final Map remoteTransactionPaths = new HashMap<>();
private final TransactionIdentifier identifier;
+ private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+ this(actorContext, transactionType, "");
+ }
+
public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
- SchemaContext schemaContext) {
- this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
- this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
- this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+ String transactionChainId) {
+ this.actorContext = Preconditions.checkNotNull(actorContext,
+ "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType,
+ "transactionType should not be null");
+ this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
+ "schemaContext should not be null");
+ this.transactionChainId = transactionChainId;
String memberName = actorContext.getCurrentMemberName();
if(memberName == null){
@@ -106,17 +205,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
}
this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
- counter.getAndIncrement()).build();
+ counter.getAndIncrement()).build();
- LOG.debug("Created txn {} of type {}", identifier, transactionType);
+ if(transactionType == TransactionType.READ_ONLY) {
+ // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+ // to close the remote Tx's when this instance is no longer in use and is garbage
+ // collected.
+
+ remoteTransactionActors = Lists.newArrayList();
+ remoteTransactionActorsMB = new AtomicBoolean();
+
+ TransactionProxyCleanupPhantomReference cleanup =
+ new TransactionProxyCleanupPhantomReference(this);
+ phantomReferenceCache.put(cleanup, cleanup);
+ }
+ LOG.debug("Created txn {} of type {}", identifier, transactionType);
}
@VisibleForTesting
List> getRecordedOperationFutures() {
List> recordedOperationFutures = Lists.newArrayList();
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
}
return recordedOperationFutures;
@@ -131,65 +245,156 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
LOG.debug("Tx {} read {}", identifier, path);
- createTransactionIfMissing(actorContext, path);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+
+ CheckedFuture>, ReadFailedException> future;
+ if(transactionContext != null) {
+ future = transactionContext.readData(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture>> proxyFuture = SettableFuture.create();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(transactionContext.readData(path),
+ new FutureCallback>>() {
+ @Override
+ public void onSuccess(Optional> data) {
+ proxyFuture.set(data);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
+
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+ }
- return transactionContext(path).readData(path);
+ return future;
}
@Override
- public CheckedFuture exists(YangInstanceIdentifier path) {
+ public CheckedFuture exists(final YangInstanceIdentifier path) {
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
LOG.debug("Tx {} exists {}", identifier, path);
- createTransactionIfMissing(actorContext, path);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+
+ CheckedFuture future;
+ if(transactionContext != null) {
+ future = transactionContext.dataExists(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture proxyFuture = SettableFuture.create();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(transactionContext.dataExists(path),
+ new FutureCallback() {
+ @Override
+ public void onSuccess(Boolean exists) {
+ proxyFuture.set(exists);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
- return transactionContext(path).dataExists(path);
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+ }
+
+ return future;
}
private void checkModificationState() {
Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
"Modification operation on read-only transaction is not allowed");
Preconditions.checkState(!inReadyState,
- "Transaction is sealed - further modifications are allowed");
+ "Transaction is sealed - further modifications are not allowed");
}
@Override
- public void write(YangInstanceIdentifier path, NormalizedNode, ?> data) {
+ public void write(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
checkModificationState();
LOG.debug("Tx {} write {}", identifier, path);
- createTransactionIfMissing(actorContext, path);
-
- transactionContext(path).writeData(path, data);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.writeData(path, data);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.writeData(path, data);
+ }
+ });
+ }
}
@Override
- public void merge(YangInstanceIdentifier path, NormalizedNode, ?> data) {
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
checkModificationState();
LOG.debug("Tx {} merge {}", identifier, path);
- createTransactionIfMissing(actorContext, path);
-
- transactionContext(path).mergeData(path, data);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.mergeData(path, data);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.mergeData(path, data);
+ }
+ });
+ }
}
@Override
- public void delete(YangInstanceIdentifier path) {
+ public void delete(final YangInstanceIdentifier path) {
checkModificationState();
LOG.debug("Tx {} delete {}", identifier, path);
- createTransactionIfMissing(actorContext, path);
-
- transactionContext(path).deleteData(path);
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ transactionContext.deleteData(path);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.deleteData(path);
+ }
+ });
+ }
}
@Override
@@ -199,23 +404,60 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
inReadyState = true;
- LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
- remoteTransactionPaths.size());
+ LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+ txFutureCallbackMap.size());
- List> cohortPathFutures = Lists.newArrayList();
+ List> cohortFutures = Lists.newArrayList();
- for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
- transactionContext.getShardName());
+ txFutureCallback.getShardName());
- cohortPathFutures.add(transactionContext.readyTransaction());
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ cohortFutures.add(transactionContext.readyTransaction());
+ } else {
+ // The shard Tx hasn't been created yet so create a promise to ready the Tx later
+ // after it's created.
+ final Promise cohortPromise = akka.dispatch.Futures.promise();
+ txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ cohortPromise.completeWith(transactionContext.readyTransaction());
+ }
+ });
+
+ cohortFutures.add(cohortPromise.future());
+ }
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ onTransactionReady(cohortFutures);
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
+ /**
+ * Method for derived classes to be notified when the transaction has been readied.
+ *
+ * @param cohortFutures the cohort Futures for each shard transaction.
+ */
+ protected void onTransactionReady(List> cohortFutures) {
+ }
+
+ /**
+ * Method called to send a CreateTransaction message to a shard.
+ *
+ * @param shard the shard actor to send to
+ * @param serializedCreateMessage the serialized message to send
+ * @return the response Future
+ */
+ protected Future