* Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
* the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
* be created on each of those shards by the TransactionProxy
- *
+ *
+ *
* The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
* shards will be executed.
- *
+ *
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+ public static enum TransactionType {
+ READ_ONLY,
+ WRITE_ONLY,
+ READ_WRITE
+ }
+
+ static final Mapper SAME_FAILURE_TRANSFORMER =
+ new Mapper() {
+ @Override
+ public Throwable apply(Throwable failure) {
+ return failure;
+ }
+ };
+
+ private static final AtomicLong counter = new AtomicLong();
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+
+ /**
+ * Time interval in between transaction create retries.
+ */
+ private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
+ FiniteDuration.create(1, TimeUnit.SECONDS);
+
+ /**
+ * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+ * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+ * trickery to clean up its internal thread when the bundle is unloaded.
+ */
+ private static final FinalizableReferenceQueue phantomReferenceQueue =
+ new FinalizableReferenceQueue();
+
+ /**
+ * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+ * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+ * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+ * and thus becomes eligible for garbage collection.
+ */
+ private static final Map phantomReferenceCache =
+ new ConcurrentHashMap<>();
+
+ /**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+ private static class TransactionProxyCleanupPhantomReference
+ extends FinalizablePhantomReference {
+
+ private final List remoteTransactionActors;
+ private final AtomicBoolean remoteTransactionActorsMB;
+ private final ActorContext actorContext;
+ private final TransactionIdentifier identifier;
+
+ protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+ super(referent, phantomReferenceQueue);
+
+ // Note we need to cache the relevant fields from the TransactionProxy as we can't
+ // have a hard reference to the TransactionProxy instance itself.
+
+ remoteTransactionActors = referent.remoteTransactionActors;
+ remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+ actorContext = referent.actorContext;
+ identifier = referent.identifier;
+ }
+
+ @Override
+ public void finalizeReferent() {
+ LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+ remoteTransactionActors.size(), identifier);
+
+ phantomReferenceCache.remove(this);
+
+ // Access the memory barrier volatile to ensure all previous updates to the
+ // remoteTransactionActors list are visible to this thread.
+
+ if(remoteTransactionActorsMB.get()) {
+ for(ActorSelection actor : remoteTransactionActors) {
+ LOG.trace("Sending CloseTransaction to {}", actor);
+ actorContext.sendOperationAsync(actor,
+ new CloseTransaction().toSerializable());
+ }
+ }
+ }
+ }
+
+ /**
+ * Stores the remote Tx actors for each requested data store path to be used by the
+ * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
+ * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
+ * remoteTransactionActors list so they will be visible to the thread accessing the
+ * PhantomReference.
+ */
+ private List remoteTransactionActors;
+ private AtomicBoolean remoteTransactionActorsMB;
+
+ /**
+ * Stores the create transaction results per shard.
+ */
+ private final Map txFutureCallbackMap = new HashMap<>();
+
+ private final TransactionType transactionType;
+ private final ActorContext actorContext;
+ private final TransactionIdentifier identifier;
+ private final String transactionChainId;
+ private final SchemaContext schemaContext;
+ private boolean inReadyState;
+
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+ this(actorContext, transactionType, "");
+ }
+
+ public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
+ String transactionChainId) {
+ this.actorContext = Preconditions.checkNotNull(actorContext,
+ "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType,
+ "transactionType should not be null");
+ this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
+ "schemaContext should not be null");
+ this.transactionChainId = transactionChainId;
+
+ String memberName = actorContext.getCurrentMemberName();
+ if(memberName == null){
+ memberName = "UNKNOWN-MEMBER";
+ }
+
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
+ counter.getAndIncrement()).build();
+
+ if(transactionType == TransactionType.READ_ONLY) {
+ // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+ // to close the remote Tx's when this instance is no longer in use and is garbage
+ // collected.
+
+ remoteTransactionActors = Lists.newArrayList();
+ remoteTransactionActorsMB = new AtomicBoolean();
+
+ TransactionProxyCleanupPhantomReference cleanup =
+ new TransactionProxyCleanupPhantomReference(this);
+ phantomReferenceCache.put(cleanup, cleanup);
+ }
+
+ LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
+ }
+
+ @VisibleForTesting
+ List> getRecordedOperationFutures() {
+ List> recordedOperationFutures = Lists.newArrayList();
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
+ }
+
+ return recordedOperationFutures;
+ }
+
+ @VisibleForTesting
+ boolean hasTransactionContext() {
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+ TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ if(transactionContext != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
- public ListenableFuture>> read(InstanceIdentifier path) {
- throw new UnsupportedOperationException("read");
+ public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) {
+
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Read operation on write-only transaction is not allowed");
+
+ LOG.debug("Tx {} read {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ return txFutureCallback.enqueueReadOperation(new ReadOperation>>() {
+ @Override
+ public CheckedFuture>, ReadFailedException> invoke(
+ TransactionContext transactionContext) {
+ return transactionContext.readData(path);
+ }
+ });
}
@Override
- public void write(InstanceIdentifier path, NormalizedNode, ?> data) {
- throw new UnsupportedOperationException("write");
+ public CheckedFuture exists(final YangInstanceIdentifier path) {
+
+ Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
+ "Exists operation on write-only transaction is not allowed");
+
+ LOG.debug("Tx {} exists {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ return txFutureCallback.enqueueReadOperation(new ReadOperation() {
+ @Override
+ public CheckedFuture invoke(TransactionContext transactionContext) {
+ return transactionContext.dataExists(path);
+ }
+ });
+ }
+
+
+ private void checkModificationState() {
+ Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+ "Modification operation on read-only transaction is not allowed");
+ Preconditions.checkState(!inReadyState,
+ "Transaction is sealed - further modifications are not allowed");
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
+
+ checkModificationState();
+
+ LOG.debug("Tx {} write {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.writeData(path, data);
+ }
+ });
}
@Override
- public void merge(InstanceIdentifier path, NormalizedNode, ?> data) {
- throw new UnsupportedOperationException("merge");
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
+
+ checkModificationState();
+
+ LOG.debug("Tx {} merge {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.mergeData(path, data);
+ }
+ });
}
@Override
- public void delete(InstanceIdentifier path) {
- throw new UnsupportedOperationException("delete");
+ public void delete(final YangInstanceIdentifier path) {
+
+ checkModificationState();
+
+ LOG.debug("Tx {} delete {}", identifier, path);
+
+ TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+ txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ transactionContext.deleteData(path);
+ }
+ });
}
@Override
public DOMStoreThreePhaseCommitCohort ready() {
- throw new UnsupportedOperationException("ready");
+
+ checkModificationState();
+
+ inReadyState = true;
+
+ LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+ txFutureCallbackMap.size());
+
+ List> cohortFutures = Lists.newArrayList();
+
+ for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+
+ LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+ txFutureCallback.getShardName(), transactionChainId);
+
+ Future future = txFutureCallback.enqueueFutureOperation(new FutureOperation() {
+ @Override
+ public Future invoke(TransactionContext transactionContext) {
+ return transactionContext.readyTransaction();
+ }
+ });
+
+ cohortFutures.add(future);
+ }
+
+ onTransactionReady(cohortFutures);
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
+ identifier.toString());
+ }
+
+ /**
+ * Method for derived classes to be notified when the transaction has been readied.
+ *
+ * @param cohortFutures the cohort Futures for each shard transaction.
+ */
+ protected void onTransactionReady(List> cohortFutures) {
+ }
+
+ /**
+ * Method called to send a CreateTransaction message to a shard.
+ *
+ * @param shard the shard actor to send to
+ * @param serializedCreateMessage the serialized message to send
+ * @return the response Future
+ */
+ protected Future