Merge "Bug 2265: Use new NormalizedNode streaming in messages"
authorMoiz Raja <moraja@cisco.com>
Mon, 26 Jan 2015 03:57:54 +0000 (03:57 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Jan 2015 03:57:54 +0000 (03:57 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index f8bcb528b3e987ce36784725befdd91b4fd020e4,1661bb4b5dc95477f9e10c413171bc8d5ba03fb8..a3ef0339b7571172dfb0d5b2338f52911a86cf9c
@@@ -95,12 -95,9 +95,10 @@@ import scala.concurrent.duration.Finite
   */
  public class Shard extends RaftActor {
  
-     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
      private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
  
 -    public static final String DEFAULT_NAME = "default";
 +    @VisibleForTesting
 +    static final String DEFAULT_NAME = "default";
  
      // The state of this Shard
      private final InMemoryDOMDataStore store;
              cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
              if(cohortEntry != null) {
                  commitWithNewTransaction(cohortEntry.getModification());
-                 sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                 sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
              } else {
                  // This really shouldn't happen - it likely means that persistence or replication
                  // took so long to complete such that the cohort entry was expired from the cache.
              // currently uses a same thread executor anyway.
              cohortEntry.getCohort().commit().get();
  
-             sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
  
              shardMBean.incrementCommittedTransactionCount();
              shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
          // transactionId so to maintain backwards compatibility, we create a separate cohort actor
          // to provide the compatible behavior.
          ActorRef replyActorPath = self();
-         if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+         if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
              LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
              replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                      ready.getTransactionID()));
                      shardMBean.incrementAbortTransactionsCount();
  
                      if(sender != null) {
-                         sender.tell(new AbortTransactionReply().toSerializable(), self);
+                         sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                      }
                  }
  
          // This must be for install snapshot. Don't want to open this up and trigger
          // deSerialization
  
-         self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+         self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                  self());
  
          createSnapshotTransaction = null;
      }
  
      private ActorRef createTypedTransactionActor(int transactionType,
-             ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+             ShardTransactionIdentifier transactionId, String transactionChainId,
+             short clientVersion ) {
  
          DOMStoreTransactionFactory factory = store;
  
      }
  
      private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-             String transactionChainId, int clientVersion) {
+             String transactionChainId, short clientVersion) {
  
          ShardTransactionIdentifier transactionId =
              ShardTransactionIdentifier.builder()
          dataChangeListeners.add(dataChangeListenerPath);
  
          AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                 new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                 new DataChangeListenerProxy(dataChangeListenerPath);
  
          LOG.debug("Registering for path {}", registerChangeListener.getPath());
  
              createSnapshotTransaction = createTransaction(
                  TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                  "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                 CreateTransaction.CURRENT_VERSION);
+                 DataStoreVersions.CURRENT_VERSION);
  
              createSnapshotTransaction.tell(
                  new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());