Merge "BUG 2799: SPI for EventSources"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 64f914b19fbebfa517afb2a7f23deb473a46a1a5..59c9298499c4ed0a58961b57fe40019f15214de1 100644 (file)
@@ -12,21 +12,16 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,14 +30,13 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -65,26 +59,31 @@ import scala.concurrent.duration.FiniteDuration;
  * shards will be executed.
  * </p>
  */
-public class TransactionProxy implements DOMStoreReadWriteTransaction {
+public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
 
     public static enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
         READ_WRITE;
 
-        public static TransactionType fromInt(int type) {
-            if(type == WRITE_ONLY.ordinal()) {
-                return WRITE_ONLY;
-            } else if(type == READ_WRITE.ordinal()) {
-                return READ_WRITE;
-            } else if(type == READ_ONLY.ordinal()) {
-                return READ_ONLY;
-            } else {
-                throw new IllegalArgumentException("In TransactionType enum value" + type);
+        // Cache all values
+        private static final TransactionType[] VALUES = values();
+
+        public static TransactionType fromInt(final int type) {
+            try {
+                return VALUES[type];
+            } catch (IndexOutOfBoundsException e) {
+                throw new IllegalArgumentException("In TransactionType enum value " + type, e);
             }
         }
     }
 
+    private static enum TransactionState {
+        OPEN,
+        READY,
+        CLOSED,
+    }
+
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -103,72 +102,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
             FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    /**
-     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
-     * garbage collected. This is used for read-only transactions as they're not explicitly closed
-     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
-     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
-     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
-     * to the old generation space and thus should be cleaned up in a timely manner as the GC
-     * runs on the young generation (eden, swap1...) space much more frequently.
-     */
-    private static class TransactionProxyCleanupPhantomReference
-                                           extends FinalizablePhantomReference<TransactionProxy> {
-
-        private final List<ActorSelection> remoteTransactionActors;
-        private final AtomicBoolean remoteTransactionActorsMB;
-        private final ActorContext actorContext;
-        private final TransactionIdentifier identifier;
-
-        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-            super(referent, phantomReferenceQueue);
-
-            // Note we need to cache the relevant fields from the TransactionProxy as we can't
-            // have a hard reference to the TransactionProxy instance itself.
-
-            remoteTransactionActors = referent.remoteTransactionActors;
-            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-            actorContext = referent.actorContext;
-            identifier = referent.identifier;
-        }
-
-        @Override
-        public void finalizeReferent() {
-            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                    remoteTransactionActors.size(), identifier);
-
-            phantomReferenceCache.remove(this);
-
-            // Access the memory barrier volatile to ensure all previous updates to the
-            // remoteTransactionActors list are visible to this thread.
-
-            if(remoteTransactionActorsMB.get()) {
-                for(ActorSelection actor : remoteTransactionActors) {
-                    LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-                }
-            }
-        }
-    }
-
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
@@ -176,8 +109,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
      * remoteTransactionActors list so they will be visible to the thread accessing the
      * PhantomReference.
      */
-    private List<ActorSelection> remoteTransactionActors;
-    private volatile AtomicBoolean remoteTransactionActorsMB;
+    List<ActorSelection> remoteTransactionActors;
+    volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -185,11 +118,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
 
     private final TransactionType transactionType;
-    private final ActorContext actorContext;
-    private final TransactionIdentifier identifier;
+    final ActorContext actorContext;
     private final String transactionChainId;
     private final SchemaContext schemaContext;
-    private boolean inReadyState;
+    private TransactionState state = TransactionState.OPEN;
 
     private volatile boolean initialized;
     private Semaphore operationLimiter;
@@ -199,8 +131,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         this(actorContext, transactionType, "");
     }
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            String transactionChainId) {
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
+        super(createIdentifier(actorContext));
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
@@ -209,14 +141,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             "schemaContext should not be null");
         this.transactionChainId = transactionChainId;
 
+        LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
+    }
+
+    private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
         String memberName = actorContext.getCurrentMemberName();
-        if(memberName == null){
+        if (memberName == null) {
             memberName = "UNKNOWN-MEMBER";
         }
 
-        this.identifier = new TransactionIdentifier(memberName, counter.getAndIncrement());
-
-        LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
+        return new TransactionIdentifier(memberName, counter.getAndIncrement());
     }
 
     @VisibleForTesting
@@ -224,8 +158,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+            if (transactionContext != null) {
+                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
             }
         }
 
@@ -250,7 +184,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} read {}", identifier, path);
+        LOG.debug("Tx {} read {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -273,7 +207,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} exists {}", identifier, path);
+        LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -293,7 +227,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(!inReadyState,
+        Preconditions.checkState(state == TransactionState.OPEN,
                 "Transaction is sealed - further modifications are not allowed");
     }
 
@@ -326,13 +260,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", identifier, path);
+        LOG.debug("Tx {} write {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -350,7 +283,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} merge {}", identifier, path);
+        LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -368,7 +301,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} delete {}", identifier, path);
+        LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -381,28 +314,37 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         });
     }
 
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
+    private boolean seal(final TransactionState newState) {
+        if (state == TransactionState.OPEN) {
+            state = newState;
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        checkModificationState();
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Read-only transactions cannot be readied");
 
-        inReadyState = true;
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
-        LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+        LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
                     txFutureCallbackMap.size());
 
-        if(txFutureCallbackMap.size() == 0) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+        if (txFutureCallbackMap.isEmpty()) {
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
         throttleOperation(txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -424,27 +366,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-                identifier.toString());
-    }
-
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this.identifier;
+            getIdentifier().toString());
     }
 
     @Override
     public void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            if (state == TransactionState.CLOSED) {
+                // Idempotent no-op as per AutoCloseable recommendation
+                return;
+            }
+
+            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+                getIdentifier()));
+        }
+
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
@@ -504,13 +441,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return actorContext;
     }
 
-    /**
-     * Interfaces for transaction operations to be invoked later.
-     */
-    private static interface TransactionOperation {
-        void invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -567,7 +497,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(transactionType == TransactionType.WRITE_ONLY &&
                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
-                        identifier, primaryShard);
+                    getIdentifier(), primaryShard);
 
                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
                 // to avoid the overhead of creating a separate transaction actor.
@@ -586,7 +516,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete", identifier);
+                    LOG.debug("Tx {} Adding operation on complete", getIdentifier());
 
                     invokeOperation = false;
                     txOperationsOnComplete.add(operation);
@@ -614,10 +544,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          */
         private void tryCreateTransaction() {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
             }
 
-            Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+            Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
                     TransactionProxy.this.transactionType.ordinal(),
                     getTransactionChainId()).toSerializable();
 
@@ -635,7 +565,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 // is ok.
                 if(--createTxTries > 0) {
                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
-                            identifier, shardName);
+                        getIdentifier(), shardName);
 
                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
                             new Runnable() {
@@ -667,17 +597,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // TransactionContext until after we've executed all cached TransactionOperations.
             TransactionContext localTransactionContext;
             if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
+                LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-                localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
-            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
+            } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
                 localTransactionContext = createValidTransactionContext(
                         CreateTransactionReply.fromSerializable(response));
             } else {
                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+                localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
             }
 
             executeTxOperatonsOnComplete(localTransactionContext);
@@ -717,7 +647,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            LOG.debug("Tx {} Received {}", identifier, reply);
+            LOG.debug("Tx {} Received {}", getIdentifier(), reply);
 
             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
                     reply.getTransactionPath(), reply.getVersion());
@@ -735,9 +665,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     remoteTransactionActors = Lists.newArrayList();
                     remoteTransactionActorsMB = new AtomicBoolean();
 
-                    TransactionProxyCleanupPhantomReference cleanup =
-                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
-                    phantomReferenceCache.put(cleanup, cleanup);
+                    TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
                 }
 
                 // Add the actor to the remoteTransactionActors list for access by the
@@ -754,49 +682,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
+                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
                         operationCompleter);
             } else if (transactionType == TransactionType.WRITE_ONLY &&
                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+                return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
-                return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+                return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             }
         }
     }
-
-    private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
-        static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
-
-        private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
-                com.google.common.util.concurrent.Futures.immediateFuture(null);
-        private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
-                com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
-
-        private NoOpDOMStoreThreePhaseCommitCohort() {
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            return IMMEDIATE_BOOLEAN_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-    }
 }