BUG-5626: make CloseTransactionChain implement Identifiable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 02a14022c4366d1cef8380da0183e8ec19e56a4a..b6b7ae12c8d9eaac33f4e5bdadaf5dc483a13f39 100644 (file)
@@ -16,6 +16,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,6 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -62,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
@@ -321,16 +325,27 @@ public class Shard extends RaftActor {
 
     void continueCommit(final CohortEntry cohortEntry) {
         final DataTreeCandidate candidate = cohortEntry.getCandidate();
 
     void continueCommit(final CohortEntry cohortEntry) {
         final DataTreeCandidate candidate = cohortEntry.getCandidate();
+        final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
-        } else {
-            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                    DataTreeCandidatePayload.create(candidate));
+            applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+            return;
         }
         }
+
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(transactionId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+                e);
+            // TODO: do we need to do something smarter here?
+            throw Throwables.propagate(e);
+        }
+
+        persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
@@ -580,7 +595,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        store.closeTransactionChain(closeTransactionChain.getIdentifier());
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -666,11 +681,11 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
 
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
-        if (data instanceof DataTreeCandidatePayload) {
+        if (data instanceof DataTreeCandidateSupplier) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
-                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }