BUG-5280: do not use CompositeModification in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 94753e553d760c5787055682b9c70f8e8b0e813b..e9f543f0f20cf852019982888be55b8f1e0bb1e3 100644 (file)
@@ -29,7 +29,6 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -51,8 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -64,8 +61,6 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -587,13 +582,6 @@ public class Shard extends RaftActor {
         store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
-    private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId) {
-
-        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
-                transactionId, transactionChainId);
-    }
-
     private void createTransaction(CreateTransaction createTransaction) {
         try {
             if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
@@ -611,24 +599,15 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId) {
-
-
-        ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
-        }
-
-        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
-                transactionChainId);
-
-        return transactionActor;
+    private ActorRef createTransaction(int transactionType, String transactionId, String transactionChainId) {
+        LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
+            transactionId, transactionChainId);
     }
 
-    private void commitWithNewTransaction(final Modification modification) {
-        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+    private void commitWithNewTransaction(final BatchedModifications modification) {
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID(),
+            modification.getTransactionChainID());
         modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
@@ -699,18 +678,9 @@ public class Shard extends RaftActor {
                 // Replication consensus reached, proceed to commit
                 finishCommit(clientActor, identifier);
             }
-        } else if (data instanceof CompositeModificationPayload) {
-            Object modification = ((CompositeModificationPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
-        } else if(data instanceof CompositeModificationByteStringPayload ){
-            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
-            applyModificationToState(clientActor, identifier, modification);
         } else {
-            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
-                    persistenceId(), data, data.getClass().getClassLoader(),
-                    CompositeModificationPayload.class.getClassLoader());
+            LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+                data.getClass().getClassLoader());
         }
     }
 
@@ -722,7 +692,11 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+            // The only implementation we know of is BatchedModifications, which also carries a transaction
+            // identifier -- which we really need that.
+            Preconditions.checkArgument(modification instanceof BatchedModifications);
+            commitWithNewTransaction((BatchedModifications)modification);
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);