BUG-5280: eliminate ShardTransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index ccd0b85c36d94e4586ff0df9bdd7dfc596da24d7..b76c0fac34c2b9f7299eb69f230fc2c714bb235e 100644 (file)
@@ -27,10 +27,8 @@ import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -65,8 +63,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -83,10 +81,21 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+    @VisibleForTesting
+    static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+        @Override
+        public String toString() {
+            return "txCommitTimeoutCheck";
+        }
+    };
 
     @VisibleForTesting
-    static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+    static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+        @Override
+        public String toString() {
+            return "getShardMBeanMessage";
+        }
+    };
 
     // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
     public static final String DEFAULT_NAME = "default";
@@ -208,16 +217,14 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void handleCommand(final Object message) {
-
-        final MessageTracker.Context context = appendEntriesReplyTracker.received(message);
-        final Optional<Error> maybeError = context.error();
-        if (maybeError.isPresent()) {
-            LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
-                maybeError.get());
-        }
+    protected void handleNonRaftCommand(final Object message) {
+        try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+            final Optional<Error> maybeError = context.error();
+            if (maybeError.isPresent()) {
+                LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+                    maybeError.get());
+            }
 
-        try {
             if (CreateTransaction.isSerializedType(message)) {
                 handleCreateTransaction(message);
             } else if (message instanceof BatchedModifications) {
@@ -261,11 +268,12 @@ public class Shard extends RaftActor {
                 context().parent().forward(message, context());
             } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
+            } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+                commitCoordinator.processCohortRegistryCommand(getSender(),
+                        (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else {
-                super.handleCommand(message);
+                super.handleNonRaftCommand(message);
             }
-        } finally {
-            context.done();
         }
     }
 
@@ -321,14 +329,14 @@ public class Shard extends RaftActor {
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
             applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
+            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                     DataTreeCandidatePayload.create(candidate));
         }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+            if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
                 shardMBean.incrementFailedTransactionsCount();
             }
         } else {
@@ -343,12 +351,13 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final Identifier transactionID,
+            @Nonnull final CohortEntry cohortEntry) {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
             try {
-                cohortEntry.commit();
+            cohortEntry.commit();
             } catch(ExecutionException e) {
                 // We may get a "store tree and candidate base differ" IllegalStateException from commit under
                 // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -385,7 +394,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull Identifier transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
@@ -426,7 +435,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -441,7 +450,7 @@ public class Shard extends RaftActor {
 
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
-            commitCoordinator.handleBatchedModifications(batched, sender, this);
+            commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionID(), e);
@@ -510,7 +519,7 @@ public class Shard extends RaftActor {
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
             try {
-                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
@@ -534,7 +543,8 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+                    store.getSchemaContext());
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
@@ -556,7 +566,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        commitCoordinator.handleAbort(transactionID, sender, this);
+        commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
@@ -574,13 +584,6 @@ public class Shard extends RaftActor {
         store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
-    private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId) {
-
-        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
-                transactionId, transactionChainId);
-    }
-
     private void createTransaction(CreateTransaction createTransaction) {
         try {
             if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
@@ -598,20 +601,10 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId) {
-
-
-        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
-        }
-
-        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
-                transactionChainId);
-
-        return transactionActor;
+    private ActorRef createTransaction(int transactionType, String transactionId, String transactionChainId) {
+        LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+            transactionId, transactionChainId);
     }
 
     private void commitWithNewTransaction(final Modification modification) {
@@ -673,7 +666,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof DataTreeCandidatePayload) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
@@ -686,22 +679,13 @@ public class Shard extends RaftActor {
                 // Replication consensus reached, proceed to commit
                 finishCommit(clientActor, identifier);
             }
-        } else if (data instanceof CompositeModificationPayload) {
-            Object modification = ((CompositeModificationPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
-        } else if(data instanceof CompositeModificationByteStringPayload ){
-            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    persistenceId(), data, data.getClass().getClassLoader(),
-                    CompositeModificationPayload.class.getClassLoader());
+            LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+                data.getClass().getClassLoader());
         }
     }
 
-    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+    private void applyModificationToState(ActorRef clientActor, Identifier identifier, Object modification) {
         if(modification == null) {
             LOG.error(
                     "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",