Merge "Use BatchedModifications message in place of ReadyTransaction message"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:19:30 +0000 (15:19 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:19:31 +0000 (15:19 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 17f1abb92cc4ec7d4d0a7dffb818cec20b5cff0b,65b6ac4bd008c95c666ce83a1f0047553104a64a..b2a86376941d4e8e609d8e3f976041b9f7e665b3
@@@ -63,11 -63,11 +63,11 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
  import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
  import org.opendaylight.controller.cluster.raft.RaftActor;
 +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
  import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
  import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@@ -121,6 -121,12 +121,6 @@@ public class Shard extends RaftActor 
      private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
              Serialization.serializedActorPath(getSelf()));
  
 -
 -    /**
 -     * Coordinates persistence recovery on startup.
 -     */
 -    private ShardRecoveryCoordinator recoveryCoordinator;
 -
      private final DOMTransactionFactory transactionFactory;
  
      private final String txnDispatcherPath;
  
          appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                  getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 -
 -        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
      }
  
      private void setTransactionCommitTimeout() {
          //
          if(isLeader()) {
              try {
-                 BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
-                 sender().tell(reply, self());
+                 boolean ready = commitCoordinator.handleTransactionModifications(batched);
+                 if(ready) {
+                     sender().tell(READY_TRANSACTION_REPLY, self());
+                 } else {
+                     sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+                 }
              } catch (Exception e) {
                  LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                          batched.getTransactionID(), e);
          // node. In that case, the subsequent 3-phase commit messages won't contain the
          // transactionId so to maintain backwards compatibility, we create a separate cohort actor
          // to provide the compatible behavior.
-         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-             LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-             ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                     ready.getTransactionID()));
+         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+             ActorRef replyActorPath = getSelf();
+             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                 LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+                 replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                         ready.getTransactionID()));
+             }
  
              ReadyTransactionReply readyTransactionReply =
-                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                             ready.getTxnClientVersion());
              getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                     readyTransactionReply, getSelf());
+                 readyTransactionReply, getSelf());
          } else {
-             getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
-                     READY_TRANSACTION_REPLY, getSelf());
+             getSender().tell(READY_TRANSACTION_REPLY, getSelf());
          }
      }
  
      }
  
      @Override
 -    protected
 -    void startLogRecoveryBatch(final int maxBatchSize) {
 -        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
 -    }
 -
 -    @Override
 -    protected void appendRecoveredLogEntry(final Payload data) {
 -        recoveryCoordinator.appendRecoveredLogPayload(data);
 -    }
 -
 -    @Override
 -    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
 -        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
 -    }
 -
 -    @Override
 -    protected void applyCurrentLogRecoveryBatch() {
 -        recoveryCoordinator.applyCurrentLogRecoveryBatch();
 +    @Nonnull
 +    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
 +        return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
      }
  
      @Override
      protected void onRecoveryComplete() {
 -        recoveryCoordinator = null;
 -
          //notify shard manager
          getContext().parent().tell(new ActorInitialized(), getSelf());