BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
index 225f3c201f5c5df7c6f5d41d988f7d266f1141fe..b5afd596bf3b1ded0abdbaceb9c80e309ccaacda 100644 (file)
@@ -14,9 +14,11 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -34,27 +36,37 @@ import scala.util.Try;
  */
 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
-
-    protected static final AtomicLong TX_COUNTER = new AtomicLong();
+    @SuppressWarnings("rawtypes")
+    private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
 
     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
+    private final LocalHistoryIdentifier historyId;
     private final ActorContext actorContext;
 
-    protected AbstractTransactionContextFactory(final ActorContext actorContext) {
+    // Used via TX_COUNTER_UPDATER
+    @SuppressWarnings("unused")
+    private volatile long nextTx;
+
+    protected AbstractTransactionContextFactory(final ActorContext actorContext,
+            final LocalHistoryIdentifier historyId) {
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.historyId = Preconditions.checkNotNull(historyId);
     }
 
     final ActorContext getActorContext() {
         return actorContext;
     }
 
+    final LocalHistoryIdentifier getHistoryId() {
+        return historyId;
+    }
+
     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
         final LocalTransactionFactory local = knownLocal.get(shardName);
         if (local != null) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
-                        parent.getIdentifier(), shardName, local);
-            }
+            LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
+                shardName, local);
 
             try {
                 return createLocalTransactionContext(local, parent);
@@ -145,13 +157,8 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         }
     }
 
-    protected String getMemberName() {
-        String memberName = getActorContext().getCurrentMemberName();
-        if (memberName == null) {
-            memberName = "UNKNOWN-MEMBER";
-        }
-
-        return memberName;
+    protected final MemberName getMemberName() {
+        return historyId.getClientId().getFrontendId().getMemberName();
     }
 
     /**
@@ -159,7 +166,9 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
      * factory.
      * @return Transaction identifier, may not be null.
      */
-    protected abstract TransactionIdentifier nextIdentifier();
+    protected final TransactionIdentifier nextIdentifier() {
+        return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
+    }
 
     /**
      * Find the primary shard actor.