BUG-5626: make CloseTransactionChain implement Identifiable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 94753e553d760c5787055682b9c70f8e8b0e813b..b6b7ae12c8d9eaac33f4e5bdadaf5dc483a13f39 100644 (file)
@@ -16,6 +16,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -23,13 +24,13 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -51,8 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -64,10 +65,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@@ -175,7 +174,8 @@ public class Shard extends RaftActor {
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
-        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+        snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+            LOG, this.name);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
     }
@@ -325,21 +325,32 @@ public class Shard extends RaftActor {
 
     void continueCommit(final CohortEntry cohortEntry) {
         final DataTreeCandidate candidate = cohortEntry.getCandidate();
+        final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
-        } else {
-            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                    DataTreeCandidatePayload.create(candidate));
+            applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+            return;
         }
+
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(transactionId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+                e);
+            // TODO: do we need to do something smarter here?
+            throw Throwables.propagate(e);
+        }
+
+        persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-            if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
                 shardMBean.incrementFailedTransactionsCount();
             }
         } else {
@@ -438,7 +449,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-        commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
+        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -568,8 +579,8 @@ public class Shard extends RaftActor {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
+    void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+        commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
@@ -584,14 +595,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
-    }
-
-    private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId) {
-
-        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
-                transactionId, transactionChainId);
+        store.closeTransactionChain(closeTransactionChain.getIdentifier());
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -602,7 +606,7 @@ public class Shard extends RaftActor {
             }
 
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
-                createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+                createTransaction.getTransactionId());
 
             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
@@ -611,24 +615,14 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId) {
-
-
-        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
-        }
-
-        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
-                transactionChainId);
-
-        return transactionActor;
+    private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
+        LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+            transactionId);
     }
 
-    private void commitWithNewTransaction(final Modification modification) {
-        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+    private void commitWithNewTransaction(final BatchedModifications modification) {
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
         modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
@@ -687,11 +681,11 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
-        if (data instanceof DataTreeCandidatePayload) {
+        if (data instanceof DataTreeCandidateSupplier) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
-                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }
@@ -699,18 +693,9 @@ public class Shard extends RaftActor {
                 // Replication consensus reached, proceed to commit
                 finishCommit(clientActor, identifier);
             }
-        } else if (data instanceof CompositeModificationPayload) {
-            Object modification = ((CompositeModificationPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
-        } else if(data instanceof CompositeModificationByteStringPayload ){
-            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    persistenceId(), data, data.getClass().getClassLoader(),
-                    CompositeModificationPayload.class.getClassLoader());
+            LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+                data.getClass().getClassLoader());
         }
     }
 
@@ -722,7 +707,11 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+            // The only implementation we know of is BatchedModifications, which also carries a transaction
+            // identifier -- which we really need that.
+            Preconditions.checkArgument(modification instanceof BatchedModifications);
+            commitWithNewTransaction((BatchedModifications)modification);
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);