Bug 2038: Ensure only one concurrent 3-phase commit in Shard 95/11795/10
authortpantelis <tpanteli@brocade.com>
Sun, 5 Oct 2014 03:05:16 +0000 (23:05 -0400)
committerMoiz Raja <moraja@cisco.com>
Fri, 17 Oct 2014 06:36:27 +0000 (06:36 +0000)
Added a ShardCommitCoordinator class that ensures there's only one
concurrent 3-phase commit.

The following outlines the new commit workflow:

- On ready, the ShardTransaction creates the dom store cohort and
forwards a new ForwardedReadyTransaction message to the shard.

- The shard calls its ShardCommitCoordinator to add the cohort and
modificaton to a cached keyed by transaction ID.

- On CanCommitTransaction message, the ShardCommitCoordinator looks up and removes
the cohort entry from the cache corresponding to the transaction ID
passed via the CanCommit message. The ShardCommitCoordinator also caches
the cohort entry for the current transaction in progress. If there's no
transaction in progress, the committing transaction becomes the current
transaction and canCommit is called on the cohort. Otherwise, the cohort
entry is queued to be processed after the current tranaction completes.

- On CommitTransaction message, if the transaction ID passed via the
Commit message matches the currently cached cohort entry, the preCommit
and commit phases are performed. When complete, the ShardCommitCoordinator
dequeues the next waiting transaction cohort entry, if any, and process
it.

If a Tx is aborted and it is the current transaction, the
ShardCommitCoordinator handles it as a completed Tx.

Implemented a timeout mechanism using the akka scheduler such that if
the commit message isn't received after a period of time (default 30 s)
after the canCommit message, the transaction is aborted so that the next
transaction can proceed. This is to handle remote node or network
failures during a 3-phrase commit.

The ThreePhaseCommitCohort actor was removed along with the
ForwardedCommitTransaction.

Change-Id: Iaa5692ca45cd7635d1a06a609f4bf98bec50df14
Signed-off-by: tpantelis <tpanteli@brocade.com>
40 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessagesTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java

index 22a93c0..e43b445 100644 (file)
@@ -10,6 +10,21 @@ public final class ThreePhaseCommitCohortMessages {
   }
   public interface CanCommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // required string transactionId = 1;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    boolean hasTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    java.lang.String getTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getTransactionIdBytes();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CanCommitTransaction}
@@ -44,6 +59,7 @@ public final class ThreePhaseCommitCohortMessages {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -61,6 +77,11 @@ public final class ThreePhaseCommitCohortMessages {
               }
               break;
             }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              transactionId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -100,13 +121,62 @@ public final class ThreePhaseCommitCohortMessages {
       return PARSER;
     }
 
+    private int bitField0_;
+    // required string transactionId = 1;
+    public static final int TRANSACTIONID_FIELD_NUMBER = 1;
+    private java.lang.Object transactionId_;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public boolean hasTransactionId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public java.lang.String getTransactionId() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          transactionId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTransactionIdBytes() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        transactionId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
+      transactionId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
+      if (!hasTransactionId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -114,6 +184,9 @@ public final class ThreePhaseCommitCohortMessages {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getTransactionIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -123,6 +196,10 @@ public final class ThreePhaseCommitCohortMessages {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getTransactionIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -239,6 +316,8 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder clear() {
         super.clear();
+        transactionId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -265,6 +344,13 @@ public final class ThreePhaseCommitCohortMessages {
 
       public org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CanCommitTransaction buildPartial() {
         org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CanCommitTransaction result = new org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CanCommitTransaction(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.transactionId_ = transactionId_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -280,11 +366,20 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CanCommitTransaction other) {
         if (other == org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CanCommitTransaction.getDefaultInstance()) return this;
+        if (other.hasTransactionId()) {
+          bitField0_ |= 0x00000001;
+          transactionId_ = other.transactionId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
 
       public final boolean isInitialized() {
+        if (!hasTransactionId()) {
+
+          return false;
+        }
         return true;
       }
 
@@ -305,6 +400,81 @@ public final class ThreePhaseCommitCohortMessages {
         }
         return this;
       }
+      private int bitField0_;
+
+      // required string transactionId = 1;
+      private java.lang.Object transactionId_ = "";
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public boolean hasTransactionId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public java.lang.String getTransactionId() {
+        java.lang.Object ref = transactionId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          transactionId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTransactionIdBytes() {
+        java.lang.Object ref = transactionId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          transactionId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder clearTransactionId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        transactionId_ = getDefaultInstance().getTransactionId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CanCommitTransaction)
     }
@@ -723,6 +893,21 @@ public final class ThreePhaseCommitCohortMessages {
 
   public interface AbortTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // required string transactionId = 1;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    boolean hasTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    java.lang.String getTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getTransactionIdBytes();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.AbortTransaction}
@@ -757,6 +942,7 @@ public final class ThreePhaseCommitCohortMessages {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -774,6 +960,11 @@ public final class ThreePhaseCommitCohortMessages {
               }
               break;
             }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              transactionId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -813,13 +1004,62 @@ public final class ThreePhaseCommitCohortMessages {
       return PARSER;
     }
 
+    private int bitField0_;
+    // required string transactionId = 1;
+    public static final int TRANSACTIONID_FIELD_NUMBER = 1;
+    private java.lang.Object transactionId_;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public boolean hasTransactionId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public java.lang.String getTransactionId() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          transactionId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTransactionIdBytes() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        transactionId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
+      transactionId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
+      if (!hasTransactionId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -827,6 +1067,9 @@ public final class ThreePhaseCommitCohortMessages {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getTransactionIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -836,6 +1079,10 @@ public final class ThreePhaseCommitCohortMessages {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getTransactionIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -952,6 +1199,8 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder clear() {
         super.clear();
+        transactionId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -978,6 +1227,13 @@ public final class ThreePhaseCommitCohortMessages {
 
       public org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.AbortTransaction buildPartial() {
         org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.AbortTransaction result = new org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.AbortTransaction(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.transactionId_ = transactionId_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -993,11 +1249,20 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.AbortTransaction other) {
         if (other == org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.AbortTransaction.getDefaultInstance()) return this;
+        if (other.hasTransactionId()) {
+          bitField0_ |= 0x00000001;
+          transactionId_ = other.transactionId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
 
       public final boolean isInitialized() {
+        if (!hasTransactionId()) {
+
+          return false;
+        }
         return true;
       }
 
@@ -1018,6 +1283,81 @@ public final class ThreePhaseCommitCohortMessages {
         }
         return this;
       }
+      private int bitField0_;
+
+      // required string transactionId = 1;
+      private java.lang.Object transactionId_ = "";
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public boolean hasTransactionId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public java.lang.String getTransactionId() {
+        java.lang.Object ref = transactionId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          transactionId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTransactionIdBytes() {
+        java.lang.Object ref = transactionId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          transactionId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder clearTransactionId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        transactionId_ = getDefaultInstance().getTransactionId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.AbortTransaction)
     }
@@ -1341,6 +1681,21 @@ public final class ThreePhaseCommitCohortMessages {
 
   public interface CommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // required string transactionId = 1;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    boolean hasTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    java.lang.String getTransactionId();
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getTransactionIdBytes();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CommitTransaction}
@@ -1375,6 +1730,7 @@ public final class ThreePhaseCommitCohortMessages {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -1392,6 +1748,11 @@ public final class ThreePhaseCommitCohortMessages {
               }
               break;
             }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              transactionId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1431,13 +1792,62 @@ public final class ThreePhaseCommitCohortMessages {
       return PARSER;
     }
 
+    private int bitField0_;
+    // required string transactionId = 1;
+    public static final int TRANSACTIONID_FIELD_NUMBER = 1;
+    private java.lang.Object transactionId_;
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public boolean hasTransactionId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public java.lang.String getTransactionId() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          transactionId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string transactionId = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTransactionIdBytes() {
+      java.lang.Object ref = transactionId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        transactionId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
+      transactionId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
+      if (!hasTransactionId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1445,6 +1855,9 @@ public final class ThreePhaseCommitCohortMessages {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getTransactionIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1454,6 +1867,10 @@ public final class ThreePhaseCommitCohortMessages {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getTransactionIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1570,6 +1987,8 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder clear() {
         super.clear();
+        transactionId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -1596,6 +2015,13 @@ public final class ThreePhaseCommitCohortMessages {
 
       public org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CommitTransaction buildPartial() {
         org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CommitTransaction result = new org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CommitTransaction(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.transactionId_ = transactionId_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -1611,11 +2037,20 @@ public final class ThreePhaseCommitCohortMessages {
 
       public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CommitTransaction other) {
         if (other == org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages.CommitTransaction.getDefaultInstance()) return this;
+        if (other.hasTransactionId()) {
+          bitField0_ |= 0x00000001;
+          transactionId_ = other.transactionId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
 
       public final boolean isInitialized() {
+        if (!hasTransactionId()) {
+
+          return false;
+        }
         return true;
       }
 
@@ -1636,6 +2071,81 @@ public final class ThreePhaseCommitCohortMessages {
         }
         return this;
       }
+      private int bitField0_;
+
+      // required string transactionId = 1;
+      private java.lang.Object transactionId_ = "";
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public boolean hasTransactionId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public java.lang.String getTransactionId() {
+        java.lang.Object ref = transactionId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          transactionId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTransactionIdBytes() {
+        java.lang.Object ref = transactionId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          transactionId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder clearTransactionId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        transactionId_ = getDefaultInstance().getTransactionId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string transactionId = 1;</code>
+       */
+      public Builder setTransactionIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionId_ = value;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CommitTransaction)
     }
@@ -2625,14 +3135,16 @@ public final class ThreePhaseCommitCohortMessages {
   static {
     java.lang.String[] descriptorData = {
       "\n\014Cohort.proto\022!org.opendaylight.control" +
-      "ler.mdsal\"\026\n\024CanCommitTransaction\".\n\031Can" +
-      "CommitTransactionReply\022\021\n\tcanCommit\030\001 \002(" +
-      "\010\"\022\n\020AbortTransaction\"\027\n\025AbortTransactio" +
-      "nReply\"\023\n\021CommitTransaction\"\030\n\026CommitTra" +
-      "nsactionReply\"\026\n\024PreCommitTransaction\"\033\n" +
-      "\031PreCommitTransactionReplyBZ\n8org.openda" +
-      "ylight.controller.protobuff.messages.coh" +
-      "ort3pcB\036ThreePhaseCommitCohortMessages"
+      "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
+      "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+      "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
+      "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+      "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
+      "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+      "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
+      "ransactionReplyBZ\n8org.opendaylight.cont" +
+      "roller.protobuff.messages.cohort3pcB\036Thr",
+      "eePhaseCommitCohortMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2644,7 +3156,7 @@ public final class ThreePhaseCommitCohortMessages {
           internal_static_org_opendaylight_controller_mdsal_CanCommitTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CanCommitTransaction_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "TransactionId", });
           internal_static_org_opendaylight_controller_mdsal_CanCommitTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_org_opendaylight_controller_mdsal_CanCommitTransactionReply_fieldAccessorTable = new
@@ -2656,7 +3168,7 @@ public final class ThreePhaseCommitCohortMessages {
           internal_static_org_opendaylight_controller_mdsal_AbortTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_AbortTransaction_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "TransactionId", });
           internal_static_org_opendaylight_controller_mdsal_AbortTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_AbortTransactionReply_fieldAccessorTable = new
@@ -2668,7 +3180,7 @@ public final class ThreePhaseCommitCohortMessages {
           internal_static_org_opendaylight_controller_mdsal_CommitTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CommitTransaction_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "TransactionId", });
           internal_static_org_opendaylight_controller_mdsal_CommitTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_org_opendaylight_controller_mdsal_CommitTransactionReply_fieldAccessorTable = new
index d956bb1..feb60ae 100644 (file)
@@ -788,794 +788,6 @@ public final class ShardTransactionChainMessages {
     // @@protoc_insertion_point(class_scope:org.opendaylight.controller.mdsal.CloseTransactionChainReply)
   }
 
-  public interface CreateTransactionChainOrBuilder
-      extends com.google.protobuf.MessageOrBuilder {
-  }
-  /**
-   * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionChain}
-   */
-  public static final class CreateTransactionChain extends
-      com.google.protobuf.GeneratedMessage
-      implements CreateTransactionChainOrBuilder {
-    // Use CreateTransactionChain.newBuilder() to construct.
-    private CreateTransactionChain(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
-      super(builder);
-      this.unknownFields = builder.getUnknownFields();
-    }
-    private CreateTransactionChain(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
-
-    private static final CreateTransactionChain defaultInstance;
-    public static CreateTransactionChain getDefaultInstance() {
-      return defaultInstance;
-    }
-
-    public CreateTransactionChain getDefaultInstanceForType() {
-      return defaultInstance;
-    }
-
-    private final com.google.protobuf.UnknownFieldSet unknownFields;
-    @java.lang.Override
-    public final com.google.protobuf.UnknownFieldSet
-        getUnknownFields() {
-      return this.unknownFields;
-    }
-    private CreateTransactionChain(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      initFields();
-      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
-          com.google.protobuf.UnknownFieldSet.newBuilder();
-      try {
-        boolean done = false;
-        while (!done) {
-          int tag = input.readTag();
-          switch (tag) {
-            case 0:
-              done = true;
-              break;
-            default: {
-              if (!parseUnknownField(input, unknownFields,
-                                     extensionRegistry, tag)) {
-                done = true;
-              }
-              break;
-            }
-          }
-        }
-      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-        throw e.setUnfinishedMessage(this);
-      } catch (java.io.IOException e) {
-        throw new com.google.protobuf.InvalidProtocolBufferException(
-            e.getMessage()).setUnfinishedMessage(this);
-      } finally {
-        this.unknownFields = unknownFields.build();
-        makeExtensionsImmutable();
-      }
-    }
-    public static final com.google.protobuf.Descriptors.Descriptor
-        getDescriptor() {
-      return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor;
-    }
-
-    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-        internalGetFieldAccessorTable() {
-      return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_fieldAccessorTable
-          .ensureFieldAccessorsInitialized(
-              org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.class, org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.Builder.class);
-    }
-
-    public static com.google.protobuf.Parser<CreateTransactionChain> PARSER =
-        new com.google.protobuf.AbstractParser<CreateTransactionChain>() {
-      public CreateTransactionChain parsePartialFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws com.google.protobuf.InvalidProtocolBufferException {
-        return new CreateTransactionChain(input, extensionRegistry);
-      }
-    };
-
-    @java.lang.Override
-    public com.google.protobuf.Parser<CreateTransactionChain> getParserForType() {
-      return PARSER;
-    }
-
-    private void initFields() {
-    }
-    private byte memoizedIsInitialized = -1;
-    public final boolean isInitialized() {
-      byte isInitialized = memoizedIsInitialized;
-      if (isInitialized != -1) return isInitialized == 1;
-
-      memoizedIsInitialized = 1;
-      return true;
-    }
-
-    public void writeTo(com.google.protobuf.CodedOutputStream output)
-                        throws java.io.IOException {
-      getSerializedSize();
-      getUnknownFields().writeTo(output);
-    }
-
-    private int memoizedSerializedSize = -1;
-    public int getSerializedSize() {
-      int size = memoizedSerializedSize;
-      if (size != -1) return size;
-
-      size = 0;
-      size += getUnknownFields().getSerializedSize();
-      memoizedSerializedSize = size;
-      return size;
-    }
-
-    private static final long serialVersionUID = 0L;
-    @java.lang.Override
-    protected java.lang.Object writeReplace()
-        throws java.io.ObjectStreamException {
-      return super.writeReplace();
-    }
-
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        com.google.protobuf.ByteString data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        com.google.protobuf.ByteString data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(byte[] data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        byte[] data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseDelimitedFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseDelimitedFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        com.google.protobuf.CodedInputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parseFrom(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-
-    public static Builder newBuilder() { return Builder.create(); }
-    public Builder newBuilderForType() { return newBuilder(); }
-    public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain prototype) {
-      return newBuilder().mergeFrom(prototype);
-    }
-    public Builder toBuilder() { return newBuilder(this); }
-
-    @java.lang.Override
-    protected Builder newBuilderForType(
-        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-      Builder builder = new Builder(parent);
-      return builder;
-    }
-    /**
-     * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionChain}
-     */
-    public static final class Builder extends
-        com.google.protobuf.GeneratedMessage.Builder<Builder>
-       implements org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainOrBuilder {
-      public static final com.google.protobuf.Descriptors.Descriptor
-          getDescriptor() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor;
-      }
-
-      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-          internalGetFieldAccessorTable() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_fieldAccessorTable
-            .ensureFieldAccessorsInitialized(
-                org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.class, org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.Builder.class);
-      }
-
-      // Construct using org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.newBuilder()
-      private Builder() {
-        maybeForceBuilderInitialization();
-      }
-
-      private Builder(
-          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-        super(parent);
-        maybeForceBuilderInitialization();
-      }
-      private void maybeForceBuilderInitialization() {
-        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
-        }
-      }
-      private static Builder create() {
-        return new Builder();
-      }
-
-      public Builder clear() {
-        super.clear();
-        return this;
-      }
-
-      public Builder clone() {
-        return create().mergeFrom(buildPartial());
-      }
-
-      public com.google.protobuf.Descriptors.Descriptor
-          getDescriptorForType() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor;
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain getDefaultInstanceForType() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.getDefaultInstance();
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain build() {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain result = buildPartial();
-        if (!result.isInitialized()) {
-          throw newUninitializedMessageException(result);
-        }
-        return result;
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain buildPartial() {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain result = new org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain(this);
-        onBuilt();
-        return result;
-      }
-
-      public Builder mergeFrom(com.google.protobuf.Message other) {
-        if (other instanceof org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain) {
-          return mergeFrom((org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain)other);
-        } else {
-          super.mergeFrom(other);
-          return this;
-        }
-      }
-
-      public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain other) {
-        if (other == org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain.getDefaultInstance()) return this;
-        this.mergeUnknownFields(other.getUnknownFields());
-        return this;
-      }
-
-      public final boolean isInitialized() {
-        return true;
-      }
-
-      public Builder mergeFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws java.io.IOException {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain parsedMessage = null;
-        try {
-          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
-        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-          parsedMessage = (org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChain) e.getUnfinishedMessage();
-          throw e;
-        } finally {
-          if (parsedMessage != null) {
-            mergeFrom(parsedMessage);
-          }
-        }
-        return this;
-      }
-
-      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionChain)
-    }
-
-    static {
-      defaultInstance = new CreateTransactionChain(true);
-      defaultInstance.initFields();
-    }
-
-    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.mdsal.CreateTransactionChain)
-  }
-
-  public interface CreateTransactionChainReplyOrBuilder
-      extends com.google.protobuf.MessageOrBuilder {
-
-    // required string transactionChainPath = 1;
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    boolean hasTransactionChainPath();
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    java.lang.String getTransactionChainPath();
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    com.google.protobuf.ByteString
-        getTransactionChainPathBytes();
-  }
-  /**
-   * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionChainReply}
-   */
-  public static final class CreateTransactionChainReply extends
-      com.google.protobuf.GeneratedMessage
-      implements CreateTransactionChainReplyOrBuilder {
-    // Use CreateTransactionChainReply.newBuilder() to construct.
-    private CreateTransactionChainReply(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
-      super(builder);
-      this.unknownFields = builder.getUnknownFields();
-    }
-    private CreateTransactionChainReply(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
-
-    private static final CreateTransactionChainReply defaultInstance;
-    public static CreateTransactionChainReply getDefaultInstance() {
-      return defaultInstance;
-    }
-
-    public CreateTransactionChainReply getDefaultInstanceForType() {
-      return defaultInstance;
-    }
-
-    private final com.google.protobuf.UnknownFieldSet unknownFields;
-    @java.lang.Override
-    public final com.google.protobuf.UnknownFieldSet
-        getUnknownFields() {
-      return this.unknownFields;
-    }
-    private CreateTransactionChainReply(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      initFields();
-      int mutable_bitField0_ = 0;
-      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
-          com.google.protobuf.UnknownFieldSet.newBuilder();
-      try {
-        boolean done = false;
-        while (!done) {
-          int tag = input.readTag();
-          switch (tag) {
-            case 0:
-              done = true;
-              break;
-            default: {
-              if (!parseUnknownField(input, unknownFields,
-                                     extensionRegistry, tag)) {
-                done = true;
-              }
-              break;
-            }
-            case 10: {
-              bitField0_ |= 0x00000001;
-              transactionChainPath_ = input.readBytes();
-              break;
-            }
-          }
-        }
-      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-        throw e.setUnfinishedMessage(this);
-      } catch (java.io.IOException e) {
-        throw new com.google.protobuf.InvalidProtocolBufferException(
-            e.getMessage()).setUnfinishedMessage(this);
-      } finally {
-        this.unknownFields = unknownFields.build();
-        makeExtensionsImmutable();
-      }
-    }
-    public static final com.google.protobuf.Descriptors.Descriptor
-        getDescriptor() {
-      return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor;
-    }
-
-    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-        internalGetFieldAccessorTable() {
-      return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_fieldAccessorTable
-          .ensureFieldAccessorsInitialized(
-              org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.class, org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.Builder.class);
-    }
-
-    public static com.google.protobuf.Parser<CreateTransactionChainReply> PARSER =
-        new com.google.protobuf.AbstractParser<CreateTransactionChainReply>() {
-      public CreateTransactionChainReply parsePartialFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws com.google.protobuf.InvalidProtocolBufferException {
-        return new CreateTransactionChainReply(input, extensionRegistry);
-      }
-    };
-
-    @java.lang.Override
-    public com.google.protobuf.Parser<CreateTransactionChainReply> getParserForType() {
-      return PARSER;
-    }
-
-    private int bitField0_;
-    // required string transactionChainPath = 1;
-    public static final int TRANSACTIONCHAINPATH_FIELD_NUMBER = 1;
-    private java.lang.Object transactionChainPath_;
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    public boolean hasTransactionChainPath() {
-      return ((bitField0_ & 0x00000001) == 0x00000001);
-    }
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    public java.lang.String getTransactionChainPath() {
-      java.lang.Object ref = transactionChainPath_;
-      if (ref instanceof java.lang.String) {
-        return (java.lang.String) ref;
-      } else {
-        com.google.protobuf.ByteString bs =
-            (com.google.protobuf.ByteString) ref;
-        java.lang.String s = bs.toStringUtf8();
-        if (bs.isValidUtf8()) {
-          transactionChainPath_ = s;
-        }
-        return s;
-      }
-    }
-    /**
-     * <code>required string transactionChainPath = 1;</code>
-     */
-    public com.google.protobuf.ByteString
-        getTransactionChainPathBytes() {
-      java.lang.Object ref = transactionChainPath_;
-      if (ref instanceof java.lang.String) {
-        com.google.protobuf.ByteString b =
-            com.google.protobuf.ByteString.copyFromUtf8(
-                (java.lang.String) ref);
-        transactionChainPath_ = b;
-        return b;
-      } else {
-        return (com.google.protobuf.ByteString) ref;
-      }
-    }
-
-    private void initFields() {
-      transactionChainPath_ = "";
-    }
-    private byte memoizedIsInitialized = -1;
-    public final boolean isInitialized() {
-      byte isInitialized = memoizedIsInitialized;
-      if (isInitialized != -1) return isInitialized == 1;
-
-      if (!hasTransactionChainPath()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
-      memoizedIsInitialized = 1;
-      return true;
-    }
-
-    public void writeTo(com.google.protobuf.CodedOutputStream output)
-                        throws java.io.IOException {
-      getSerializedSize();
-      if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeBytes(1, getTransactionChainPathBytes());
-      }
-      getUnknownFields().writeTo(output);
-    }
-
-    private int memoizedSerializedSize = -1;
-    public int getSerializedSize() {
-      int size = memoizedSerializedSize;
-      if (size != -1) return size;
-
-      size = 0;
-      if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(1, getTransactionChainPathBytes());
-      }
-      size += getUnknownFields().getSerializedSize();
-      memoizedSerializedSize = size;
-      return size;
-    }
-
-    private static final long serialVersionUID = 0L;
-    @java.lang.Override
-    protected java.lang.Object writeReplace()
-        throws java.io.ObjectStreamException {
-      return super.writeReplace();
-    }
-
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        com.google.protobuf.ByteString data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        com.google.protobuf.ByteString data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(byte[] data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        byte[] data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseDelimitedFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseDelimitedFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input, extensionRegistry);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        com.google.protobuf.CodedInputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parseFrom(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-
-    public static Builder newBuilder() { return Builder.create(); }
-    public Builder newBuilderForType() { return newBuilder(); }
-    public static Builder newBuilder(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply prototype) {
-      return newBuilder().mergeFrom(prototype);
-    }
-    public Builder toBuilder() { return newBuilder(this); }
-
-    @java.lang.Override
-    protected Builder newBuilderForType(
-        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-      Builder builder = new Builder(parent);
-      return builder;
-    }
-    /**
-     * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionChainReply}
-     */
-    public static final class Builder extends
-        com.google.protobuf.GeneratedMessage.Builder<Builder>
-       implements org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReplyOrBuilder {
-      public static final com.google.protobuf.Descriptors.Descriptor
-          getDescriptor() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor;
-      }
-
-      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-          internalGetFieldAccessorTable() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_fieldAccessorTable
-            .ensureFieldAccessorsInitialized(
-                org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.class, org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.Builder.class);
-      }
-
-      // Construct using org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.newBuilder()
-      private Builder() {
-        maybeForceBuilderInitialization();
-      }
-
-      private Builder(
-          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-        super(parent);
-        maybeForceBuilderInitialization();
-      }
-      private void maybeForceBuilderInitialization() {
-        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
-        }
-      }
-      private static Builder create() {
-        return new Builder();
-      }
-
-      public Builder clear() {
-        super.clear();
-        transactionChainPath_ = "";
-        bitField0_ = (bitField0_ & ~0x00000001);
-        return this;
-      }
-
-      public Builder clone() {
-        return create().mergeFrom(buildPartial());
-      }
-
-      public com.google.protobuf.Descriptors.Descriptor
-          getDescriptorForType() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor;
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply getDefaultInstanceForType() {
-        return org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.getDefaultInstance();
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply build() {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply result = buildPartial();
-        if (!result.isInitialized()) {
-          throw newUninitializedMessageException(result);
-        }
-        return result;
-      }
-
-      public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply buildPartial() {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply result = new org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply(this);
-        int from_bitField0_ = bitField0_;
-        int to_bitField0_ = 0;
-        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
-          to_bitField0_ |= 0x00000001;
-        }
-        result.transactionChainPath_ = transactionChainPath_;
-        result.bitField0_ = to_bitField0_;
-        onBuilt();
-        return result;
-      }
-
-      public Builder mergeFrom(com.google.protobuf.Message other) {
-        if (other instanceof org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply) {
-          return mergeFrom((org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply)other);
-        } else {
-          super.mergeFrom(other);
-          return this;
-        }
-      }
-
-      public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply other) {
-        if (other == org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply.getDefaultInstance()) return this;
-        if (other.hasTransactionChainPath()) {
-          bitField0_ |= 0x00000001;
-          transactionChainPath_ = other.transactionChainPath_;
-          onChanged();
-        }
-        this.mergeUnknownFields(other.getUnknownFields());
-        return this;
-      }
-
-      public final boolean isInitialized() {
-        if (!hasTransactionChainPath()) {
-
-          return false;
-        }
-        return true;
-      }
-
-      public Builder mergeFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws java.io.IOException {
-        org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply parsedMessage = null;
-        try {
-          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
-        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-          parsedMessage = (org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CreateTransactionChainReply) e.getUnfinishedMessage();
-          throw e;
-        } finally {
-          if (parsedMessage != null) {
-            mergeFrom(parsedMessage);
-          }
-        }
-        return this;
-      }
-      private int bitField0_;
-
-      // required string transactionChainPath = 1;
-      private java.lang.Object transactionChainPath_ = "";
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public boolean hasTransactionChainPath() {
-        return ((bitField0_ & 0x00000001) == 0x00000001);
-      }
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public java.lang.String getTransactionChainPath() {
-        java.lang.Object ref = transactionChainPath_;
-        if (!(ref instanceof java.lang.String)) {
-          java.lang.String s = ((com.google.protobuf.ByteString) ref)
-              .toStringUtf8();
-          transactionChainPath_ = s;
-          return s;
-        } else {
-          return (java.lang.String) ref;
-        }
-      }
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public com.google.protobuf.ByteString
-          getTransactionChainPathBytes() {
-        java.lang.Object ref = transactionChainPath_;
-        if (ref instanceof String) {
-          com.google.protobuf.ByteString b =
-              com.google.protobuf.ByteString.copyFromUtf8(
-                  (java.lang.String) ref);
-          transactionChainPath_ = b;
-          return b;
-        } else {
-          return (com.google.protobuf.ByteString) ref;
-        }
-      }
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public Builder setTransactionChainPath(
-          java.lang.String value) {
-        if (value == null) {
-    throw new NullPointerException();
-  }
-  bitField0_ |= 0x00000001;
-        transactionChainPath_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public Builder clearTransactionChainPath() {
-        bitField0_ = (bitField0_ & ~0x00000001);
-        transactionChainPath_ = getDefaultInstance().getTransactionChainPath();
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>required string transactionChainPath = 1;</code>
-       */
-      public Builder setTransactionChainPathBytes(
-          com.google.protobuf.ByteString value) {
-        if (value == null) {
-    throw new NullPointerException();
-  }
-  bitField0_ |= 0x00000001;
-        transactionChainPath_ = value;
-        onChanged();
-        return this;
-      }
-
-      // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionChainReply)
-    }
-
-    static {
-      defaultInstance = new CreateTransactionChainReply(true);
-      defaultInstance.initFields();
-    }
-
-    // @@protoc_insertion_point(class_scope:org.opendaylight.controller.mdsal.CreateTransactionChainReply)
-  }
-
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_descriptor;
   private static
@@ -1586,16 +798,6 @@ public final class ShardTransactionChainMessages {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_fieldAccessorTable;
-  private static com.google.protobuf.Descriptors.Descriptor
-    internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor;
-  private static
-    com.google.protobuf.GeneratedMessage.FieldAccessorTable
-      internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_fieldAccessorTable;
-  private static com.google.protobuf.Descriptors.Descriptor
-    internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor;
-  private static
-    com.google.protobuf.GeneratedMessage.FieldAccessorTable
-      internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -1608,12 +810,9 @@ public final class ShardTransactionChainMessages {
       "\n\033ShardTransactionChain.proto\022!org.opend" +
       "aylight.controller.mdsal\"3\n\025CloseTransac" +
       "tionChain\022\032\n\022transactionChainId\030\001 \001(\t\"\034\n" +
-      "\032CloseTransactionChainReply\"\030\n\026CreateTra" +
-      "nsactionChain\";\n\033CreateTransactionChainR" +
-      "eply\022\034\n\024transactionChainPath\030\001 \002(\tB[\n:or" +
-      "g.opendaylight.controller.protobuff.mess" +
-      "ages.transactionB\035ShardTransactionChainM" +
-      "essages"
+      "\032CloseTransactionChainReplyB[\n:org.opend" +
+      "aylight.controller.protobuff.messages.tr" +
+      "ansactionB\035ShardTransactionChainMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1632,18 +831,6 @@ public final class ShardTransactionChainMessages {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_descriptor,
               new java.lang.String[] { });
-          internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor =
-            getDescriptor().getMessageTypes().get(2);
-          internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_fieldAccessorTable = new
-            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
-              internal_static_org_opendaylight_controller_mdsal_CreateTransactionChain_descriptor,
-              new java.lang.String[] { });
-          internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor =
-            getDescriptor().getMessageTypes().get(3);
-          internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_fieldAccessorTable = new
-            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
-              internal_static_org_opendaylight_controller_mdsal_CreateTransactionChainReply_descriptor,
-              new java.lang.String[] { "TransactionChainPath", });
           return null;
         }
       };
index dab6413..49c6cd0 100644 (file)
@@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages";
 
 
 message CanCommitTransaction{
-
+  required string transactionId = 1;
 }
 
 message CanCommitTransactionReply{
@@ -14,7 +14,7 @@ message CanCommitTransactionReply{
 }
 
 message AbortTransaction{
-
+  required string transactionId = 1;
 }
 
 message AbortTransactionReply {
@@ -22,7 +22,7 @@ message AbortTransactionReply {
 }
 
 message CommitTransaction{
-
+  required string transactionId = 1;
 }
 
 message CommitTransactionReply{
index c0e91ab..dba8f4f 100644 (file)
 
 package org.opendaylight.controller.protobuff.messages.transaction;
 
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.protobuff.messages.AbstractMessagesTest;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.yangtools.yang.common.QName;
 
 /**
  * This test case is present to ensure that if others have used proper version of protocol buffer
@@ -34,29 +31,6 @@ public class ShardTransactionChainMessagesTest extends AbstractMessagesTest {
   @Override
   @Test
   public void verifySerialization() throws Exception {
-    String testTransactionChainPath =
-        "/actor/path";
-
-    ShardTransactionChainMessages.CreateTransactionChainReply.Builder builder =
-        ShardTransactionChainMessages.CreateTransactionChainReply.newBuilder();
-    builder.setTransactionChainPath(testTransactionChainPath);
-
-    writeToFile((com.google.protobuf.GeneratedMessage.Builder<?>) builder);
-
-    // Here we will read the same and check we got back what we had saved
-    ShardTransactionChainMessages.CreateTransactionChainReply replyNew =
-        (ShardTransactionChainMessages.CreateTransactionChainReply) readFromFile(ShardTransactionChainMessages.CreateTransactionChainReply.PARSER);
-
-    Assert.assertEquals(replyNew.getTransactionChainPath(),testTransactionChainPath);
-
-    // the following will compare with the version we had shipped
-    ShardTransactionChainMessages.CreateTransactionChainReply replyOriginal =
-        (ShardTransactionChainMessages.CreateTransactionChainReply) readFromTestDataFile(ShardTransactionChainMessages.CreateTransactionChainReply.PARSER);
-
-
-    Assert.assertEquals(replyOriginal.getTransactionChainPath(),
-        replyNew.getTransactionChainPath());
-
   }
 
   @Override
index 83164b0..722e230 100644 (file)
@@ -11,10 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -29,29 +27,24 @@ public class DatastoreContext {
     private final int operationTimeoutInSeconds;
     private final String dataStoreMXBeanType;
     private final ConfigParams shardRaftConfig;
+    private final int shardTransactionCommitTimeoutInSeconds;
+    private final int shardTransactionCommitQueueCapacity;
 
-    public DatastoreContext() {
-        this("DistributedDatastore", null, Duration.create(10, TimeUnit.MINUTES), 5, 1000, 20000, 500);
-    }
-
-    public DatastoreContext(String dataStoreMXBeanType,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-            Duration shardTransactionIdleTimeout,
-            int operationTimeoutInSeconds,
-            int shardJournalRecoveryLogBatchSize,
-            int shardSnapshotBatchCount,
-            int shardHeartbeatIntervalInMillis) {
-        this.dataStoreMXBeanType = dataStoreMXBeanType;
+    private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
+            Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
+            int shardTransactionCommitQueueCapacity) {
         this.dataStoreProperties = dataStoreProperties;
-        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        this.shardRaftConfig = shardRaftConfig;
+        this.dataStoreMXBeanType = dataStoreMXBeanType;
         this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+        this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+    }
 
-        DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
-        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
-                TimeUnit.MILLISECONDS));
-        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
-        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
-        shardRaftConfig = raftConfig;
+    public static Builder newBuilder() {
+        return new Builder();
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -73,4 +66,81 @@ public class DatastoreContext {
     public ConfigParams getShardRaftConfig() {
         return shardRaftConfig;
     }
+
+    public int getShardTransactionCommitTimeoutInSeconds() {
+        return shardTransactionCommitTimeoutInSeconds;
+    }
+
+    public int getShardTransactionCommitQueueCapacity() {
+        return shardTransactionCommitQueueCapacity;
+    }
+
+    public static class Builder {
+        private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+        private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+        private int operationTimeoutInSeconds = 5;
+        private String dataStoreMXBeanType;
+        private int shardTransactionCommitTimeoutInSeconds = 30;
+        private int shardJournalRecoveryLogBatchSize = 1000;
+        private int shardSnapshotBatchCount = 20000;
+        private int shardHeartbeatIntervalInMillis = 500;
+        private int shardTransactionCommitQueueCapacity = 20000;
+
+        public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
+            this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+            return this;
+        }
+
+        public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
+            this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+            return this;
+        }
+
+        public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
+            this.dataStoreMXBeanType = dataStoreMXBeanType;
+            return this;
+        }
+
+        public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            this.dataStoreProperties = dataStoreProperties;
+            return this;
+        }
+
+        public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
+            this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+            return this;
+        }
+
+        public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
+            this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+            return this;
+        }
+
+        public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
+            this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+            return this;
+        }
+
+        public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
+            this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+            return this;
+        }
+
+        public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
+            this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+            return this;
+        }
+
+        public DatastoreContext build() {
+            DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+            raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                    TimeUnit.MILLISECONDS));
+            raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+            raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+
+            return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
+                    operationTimeoutInSeconds, shardTransactionIdleTimeout,
+                    shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity);
+        }
+    }
 }
index 5195a2f..f6c31aa 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSystem;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -146,4 +147,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     public void close() throws Exception {
         actorContext.shutdown();
     }
+
+    @VisibleForTesting
+    ActorContext getActorContext() {
+        return actorContext;
+    }
 }
index 3934489..fef7e22 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.event.Logging;
@@ -21,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -29,20 +29,26 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -55,11 +61,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
@@ -68,12 +72,16 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -83,14 +91,15 @@ import java.util.concurrent.ExecutionException;
  */
 public class Shard extends RaftActor {
 
+    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
+
+    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+
     public static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
 
-    private final Map<Object, DOMStoreThreePhaseCommitCohort>
-        modificationToCohort = new HashMap<>();
-
     private final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
@@ -111,6 +120,14 @@ public class Shard extends RaftActor {
 
     private ActorRef createSnapshotTransaction;
 
+    private int createSnapshotTransactionCounter;
+
+    private final ShardCommitCoordinator commitCoordinator;
+
+    private final long transactionCommitTimeout;
+
+    private Cancellable txCommitTimeoutCheckSchedule;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -149,6 +166,12 @@ public class Shard extends RaftActor {
         if (isMetricsCaptureEnabled()) {
             getContext().become(new MeteringBehavior(this));
         }
+
+        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
+                datastoreContext.getShardTransactionCommitQueueCapacity());
+
+        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -174,7 +197,17 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
-    @Override public void onReceiveRecover(Object message) {
+    @Override
+    public void postStop() {
+        super.postStop();
+
+        if(txCommitTimeoutCheckSchedule != null) {
+            txCommitTimeoutCheckSchedule.cancel();
+        }
+    }
+
+    @Override
+    public void onReceiveRecover(Object message) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -188,54 +221,238 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override public void onReceiveCommand(Object message) {
+    @Override
+    public void onReceiveCommand(Object message) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("onReceiveCommand: Received message {} from {}",
-                message.getClass().toString(),
-                getSender());
+            LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
 
         if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-            // This must be for install snapshot. Don't want to open this up and trigger
-            // deSerialization
-            self()
-                .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
-                    self());
-
-            createSnapshotTransaction = null;
-            // Send a PoisonPill instead of sending close transaction because we do not really need
-            // a response
-            getSender().tell(PoisonPill.getInstance(), self());
-
+            handleReadDataReply(message);
+        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+            handleCreateTransaction(message);
+        } else if(message instanceof ForwardedReadyTransaction) {
+            handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
+        } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            handleCommitTransaction(CommitTransaction.fromSerializable(message));
+        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            handleAbortTransaction(AbortTransaction.fromSerializable(message));
         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
             closeTransactionChain(CloseTransactionChain.fromSerializable(message));
         } else if (message instanceof RegisterChangeListener) {
             registerChangeListener((RegisterChangeListener) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext((UpdateSchemaContext) message);
-        } else if (message instanceof ForwardedCommitTransaction) {
-            handleForwardedCommit((ForwardedCommitTransaction) message);
-        } else if (message.getClass()
-            .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
-            if (isLeader()) {
-                createTransaction(CreateTransaction.fromSerializable(message));
-            } else if (getLeader() != null) {
-                getLeader().forward(message, getContext());
-            } else {
-                getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
-                    "Could not find shard leader so transaction cannot be created. This typically happens" +
-                            " when system is coming up or recovering and a leader is being elected. Try again" +
-                            " later.")), getSelf());
-            }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
             setPeerAddress(resolved.getPeerId().toString(),
                 resolved.getPeerAddress());
+        } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+            handleTransactionCommitTimeoutCheck();
         } else {
             super.onReceiveCommand(message);
         }
     }
 
+    private void handleTransactionCommitTimeoutCheck() {
+        CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
+        if(cohortEntry != null) {
+            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
+            if(elapsed > transactionCommitTimeout) {
+                LOG.warning("Current transaction {} has timed out after {} ms - aborting",
+                        cohortEntry.getTransactionID(), transactionCommitTimeout);
+
+                doAbortTransaction(cohortEntry.getTransactionID(), null);
+            }
+        }
+    }
+
+    private void handleCommitTransaction(CommitTransaction commit) {
+        final String transactionID = commit.getTransactionID();
+
+        LOG.debug("Committing transaction {}", transactionID);
+
+        // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
+        // this transaction.
+        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry == null) {
+            // We're not the current Tx - the Tx was likely expired b/c it took too long in
+            // between the canCommit and commit messages.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("Cannot commit transaction %s - it is not the current transaction",
+                            transactionID));
+            LOG.error(ex.getMessage());
+            shardMBean.incrementFailedTransactionsCount();
+            getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
+            return;
+        }
+
+        // We perform the preCommit phase here atomically with the commit phase. This is an
+        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
+        // coordination of preCommit across shards in case of failure but preCommit should not
+        // normally fail since we ensure only one concurrent 3-phase commit.
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().preCommit().get();
+
+            if(persistent) {
+                Shard.this.persistData(getSender(), transactionID,
+                        new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+            } else {
+                Shard.this.finishCommit(getSender(), transactionID);
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error(e, "An exception occurred while preCommitting transaction {}",
+                    cohortEntry.getTransactionID());
+            shardMBean.incrementFailedTransactionsCount();
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+
+        cohortEntry.updateLastAccessTime();
+    }
+
+    private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
+        // With persistence enabled, this method is called via applyState by the leader strategy
+        // after the commit has been replicated to a majority of the followers.
+
+        CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry == null) {
+            // The transaction is no longer the current commit. This can happen if the transaction
+            // was aborted prior, most likely due to timeout in the front-end. We need to finish
+            // committing the transaction though since it was successfully persisted and replicated
+            // however we can't use the original cohort b/c it was already preCommitted and may
+            // conflict with the current commit or may have been aborted so we commit with a new
+            // transaction.
+            cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
+            if(cohortEntry != null) {
+                commitWithNewTransaction(cohortEntry.getModification());
+                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            } else {
+                // This really shouldn't happen - it likely means that persistence or replication
+                // took so long to complete such that the cohort entry was expired from the cache.
+                IllegalStateException ex = new IllegalStateException(
+                        String.format("Could not finish committing transaction %s - no CohortEntry found",
+                                transactionID));
+                LOG.error(ex.getMessage());
+                sender.tell(new akka.actor.Status.Failure(ex), getSelf());
+            }
+
+            return;
+        }
+
+        LOG.debug("Finishing commit for transaction {}", cohortEntry.getTransactionID());
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().commit().get();
+
+            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+
+            shardMBean.incrementCommittedTransactionCount();
+            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        } catch (InterruptedException | ExecutionException e) {
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+            LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+            shardMBean.incrementFailedTransactionsCount();
+        }
+
+        commitCoordinator.currentTransactionComplete(transactionID, true);
+    }
+
+    private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+        LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
+        commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+    }
+
+    private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
+        LOG.debug("Readying transaction {}", ready.getTransactionID());
+
+        // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
+        // commitCoordinator in preparation for the subsequent three phase commit initiated by
+        // the front-end.
+        commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
+                ready.getModification());
+
+        // Return our actor path as we'll handle the three phase commit.
+        getSender().tell(new ReadyTransactionReply(Serialization.serializedActorPath(self())).
+                toSerializable(), getSelf());
+    }
+
+    private void handleAbortTransaction(AbortTransaction abort) {
+        doAbortTransaction(abort.getTransactionID(), getSender());
+    }
+
+    private void doAbortTransaction(String transactionID, final ActorRef sender) {
+        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry != null) {
+            LOG.debug("Aborting transaction {}", transactionID);
+
+            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+            // aborted during replication in which case we may still commit locally if replication
+            // succeeds.
+            commitCoordinator.currentTransactionComplete(transactionID, false);
+
+            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
+            final ActorRef self = getSelf();
+
+            Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void v) {
+                    shardMBean.incrementAbortTransactionsCount();
+
+                    if(sender != null) {
+                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.error(t, "An exception happened during abort");
+
+                    if(sender != null) {
+                        sender.tell(new akka.actor.Status.Failure(t), self);
+                    }
+                }
+            });
+        }
+    }
+
+    private void handleCreateTransaction(Object message) {
+        if (isLeader()) {
+            createTransaction(CreateTransaction.fromSerializable(message));
+        } else if (getLeader() != null) {
+            getLeader().forward(message, getContext());
+        } else {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+                "Could not find shard leader so transaction cannot be created. This typically happens" +
+                " when system is coming up or recovering and a leader is being elected. Try again" +
+                " later.")), getSelf());
+        }
+    }
+
+    private void handleReadDataReply(Object message) {
+        // This must be for install snapshot. Don't want to open this up and trigger
+        // deSerialization
+
+        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+                self());
+
+        createSnapshotTransaction = null;
+
+        // Send a PoisonPill instead of sending close transaction because we do not really need
+        // a response
+        getSender().tell(PoisonPill.getInstance(), self());
+    }
+
     private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
@@ -265,33 +482,33 @@ public class Shard extends RaftActor {
             throw new NullPointerException("schemaContext should not be null");
         }
 
-        if (transactionType
-            == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+        if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, shardMBean), transactionId.toString());
+                        schemaContext,datastoreContext, shardMBean,
+                        transactionId.getRemoteTransactionId()), transactionId.toString());
 
-        } else if (transactionType
-            == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
-                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean,
+                        transactionId.getRemoteTransactionId()), transactionId.toString());
 
 
-        } else if (transactionType
-            == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+        } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean,
+                        transactionId.getRemoteTransactionId()), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -332,70 +549,19 @@ public class Shard extends RaftActor {
         commitCohort.commit().get();
     }
 
-
-    private void commit(final ActorRef sender, Object serialized) {
-        Modification modification = MutableCompositeModification
-            .fromSerializable(serialized, schemaContext);
-        DOMStoreThreePhaseCommitCohort cohort =
-            modificationToCohort.remove(serialized);
-        if (cohort == null) {
-            // If there's no cached cohort then we must be applying replicated state.
-            commitWithNewTransaction(serialized);
-            return;
-        }
-
-        if(sender == null) {
-            LOG.error("Commit failed. Sender cannot be null");
-            return;
-        }
-
-        ListenableFuture<Void> future = cohort.commit();
-
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(Void v) {
-                sender.tell(new CommitTransactionReply().toSerializable(), getSelf());
-                shardMBean.incrementCommittedTransactionCount();
-                shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error(t, "An exception happened during commit");
-                shardMBean.incrementFailedTransactionsCount();
-                sender.tell(new akka.actor.Status.Failure(t), getSelf());
-            }
-        });
-
-    }
-
-    private void commitWithNewTransaction(Object modification) {
+    private void commitWithNewTransaction(Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
-        MutableCompositeModification.fromSerializable(modification, schemaContext).apply(tx);
+        modification.apply(tx);
         try {
             syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
+            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
             shardMBean.incrementFailedTransactionsCount();
             LOG.error(e, "Failed to commit");
         }
     }
 
-    private void handleForwardedCommit(ForwardedCommitTransaction message) {
-        Object serializedModification =
-            message.getModification().toSerializable();
-
-        modificationToCohort
-            .put(serializedModification, message.getCohort());
-
-        if (persistent) {
-            this.persistData(getSender(), "identifier",
-                new CompositeModificationPayload(serializedModification));
-        } else {
-            this.commit(getSender(), serializedModification);
-        }
-    }
-
     private void updateSchemaContext(UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
@@ -528,6 +694,13 @@ public class Shard extends RaftActor {
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
+
+        // Schedule a message to be periodically sent to check if the current in-progress
+        // transaction should be expired and aborted.
+        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                period, period, getSelf(),
+                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
     }
 
     @Override
@@ -536,14 +709,19 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if (modification != null) {
-                commit(clientActor, modification);
-            } else {
+            if(modification == null) {
                 LOG.error(
-                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                    identifier, clientActor != null ? clientActor.path().toString() : null);
+                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                     identifier, clientActor != null ? clientActor.path().toString() : null);
+            } else if(clientActor == null) {
+                // There's no clientActor to which to send a commit reply so we must be applying
+                // replicated state from the leader.
+                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                        modification, schemaContext));
+            } else {
+                // This must be the OK to commit after replication consensus.
+                finishCommit(clientActor, identifier);
             }
-
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -574,7 +752,7 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot", "");
+                "createSnapshot" + ++createSnapshotTransactionCounter, "");
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
@@ -665,29 +843,8 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
-    NormalizedNode<?,?> readStore(YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
-
-        Optional<NormalizedNode<?, ?>> optional = future.get();
-        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
-
-        transaction.close();
-
-        return node;
-    }
-
-    @VisibleForTesting
-    void writeToStore(YangInstanceIdentifier id, NormalizedNode<?,?> node)
-        throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
-        transaction.write(id, node);
-
-        syncCommitTransaction(transaction);
+    InMemoryDOMDataStore getDataStore() {
+        return store;
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
new file mode 100644 (file)
index 0000000..f3b4e41
--- /dev/null
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardCommitCoordinator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
+
+    private static final Object CAN_COMMIT_REPLY_TRUE =
+            new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
+
+    private static final Object CAN_COMMIT_REPLY_FALSE =
+            new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
+
+    private final Cache<String, CohortEntry> cohortCache;
+
+    private CohortEntry currentCohortEntry;
+
+    private final Queue<CohortEntry> queuedCohortEntries;
+
+    private final int queueCapacity;
+
+    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+        cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
+                cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
+
+        this.queueCapacity = queueCapacity;
+
+        // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
+        // since this should only be accessed on the shard's dispatcher.
+        queuedCohortEntries = new LinkedList<>();
+    }
+
+    /**
+     * This method caches a cohort entry for the given transactions ID in preparation for the
+     * subsequent 3-phase commit.
+     *
+     * @param transactionID the ID of the transaction
+     * @param cohort the cohort to participate in the transaction commit
+     * @param modification the modification made by the transaction
+     */
+    public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+            Modification modification) {
+
+        cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
+    }
+
+    /**
+     * This method handles the canCommit phase for a transaction.
+     *
+     * @param canCommit the CanCommitTransaction message
+     * @param sender the actor that sent the message
+     * @param shard the transaction's shard actor
+     */
+    public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
+            final ActorRef shard) {
+        String transactionID = canCommit.getTransactionID();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Processing canCommit for transaction {} for shard {}",
+                    transactionID, shard.path());
+        }
+
+        // Lookup the cohort entry that was cached previously (or should have been) by
+        // transactionReady (via the ForwardedReadyTransaction message).
+        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        if(cohortEntry == null) {
+            // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
+            // between canCommit and ready and the entry was expired from the cache.
+            IllegalStateException ex = new IllegalStateException(
+                    String.format("No cohort entry found for transaction %s", transactionID));
+            LOG.error(ex.getMessage());
+            sender.tell(new Status.Failure(ex), shard);
+            return;
+        }
+
+        cohortEntry.setCanCommitSender(sender);
+        cohortEntry.setShard(shard);
+
+        if(currentCohortEntry != null) {
+            // There's already a Tx commit in progress - attempt to queue this entry to be
+            // committed after the current Tx completes.
+            LOG.debug("Transaction {} is already in progress - queueing transaction {}",
+                    currentCohortEntry.getTransactionID(), transactionID);
+
+            if(queuedCohortEntries.size() < queueCapacity) {
+                queuedCohortEntries.offer(cohortEntry);
+            } else {
+                removeCohortEntry(transactionID);
+
+                RuntimeException ex = new RuntimeException(
+                        String.format("Could not enqueue transaction %s - the maximum commit queue"+
+                                      " capacity %d has been reached.",
+                                transactionID, queueCapacity));
+                LOG.error(ex.getMessage());
+                sender.tell(new Status.Failure(ex), shard);
+            }
+        } else {
+            // No Tx commit currently in progress - make this the current entry and proceed with
+            // canCommit.
+            cohortEntry.updateLastAccessTime();
+            currentCohortEntry = cohortEntry;
+
+            doCanCommit(cohortEntry);
+        }
+    }
+
+    private void doCanCommit(final CohortEntry cohortEntry) {
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            Boolean canCommit = cohortEntry.getCohort().canCommit().get();
+
+            cohortEntry.getCanCommitSender().tell(
+                    canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+
+            if(!canCommit) {
+                // Remove the entry from the cache now since the Tx will be aborted.
+                removeCohortEntry(cohortEntry.getTransactionID());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.debug("An exception occurred during canCommit", e);
+
+            // Remove the entry from the cache now since the Tx will be aborted.
+            removeCohortEntry(cohortEntry.getTransactionID());
+            cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
+        }
+    }
+
+    /**
+     * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
+     * matches the current entry.
+     *
+     * @param transactionID the ID of the transaction
+     * @return the current CohortEntry or null if the given transaction ID does not match the
+     *         current entry.
+     */
+    public CohortEntry getCohortEntryIfCurrent(String transactionID) {
+        if(isCurrentTransaction(transactionID)) {
+            return currentCohortEntry;
+        }
+
+        return null;
+    }
+
+    public CohortEntry getCurrentCohortEntry() {
+        return currentCohortEntry;
+    }
+
+    public CohortEntry getAndRemoveCohortEntry(String transactionID) {
+        CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        cohortCache.invalidate(transactionID);
+        return cohortEntry;
+    }
+
+    public void removeCohortEntry(String transactionID) {
+        cohortCache.invalidate(transactionID);
+    }
+
+    public boolean isCurrentTransaction(String transactionID) {
+        return currentCohortEntry != null &&
+                currentCohortEntry.getTransactionID().equals(transactionID);
+    }
+
+    /**
+     * This method is called when a transaction is complete, successful or not. If the given
+     * given transaction ID matches the current in-progress transaction, the next cohort entry,
+     * if any, is dequeued and processed.
+     *
+     * @param transactionID the ID of the completed transaction
+     * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
+     *        the cache.
+     */
+    public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+        if(removeCohortEntry) {
+            removeCohortEntry(transactionID);
+        }
+
+        if(isCurrentTransaction(transactionID)) {
+            // Dequeue the next cohort entry waiting in the queue.
+            currentCohortEntry = queuedCohortEntries.poll();
+            if(currentCohortEntry != null) {
+                doCanCommit(currentCohortEntry);
+            }
+        }
+    }
+
+    static class CohortEntry {
+        private final String transactionID;
+        private final DOMStoreThreePhaseCommitCohort cohort;
+        private final Modification modification;
+        private ActorRef canCommitSender;
+        private ActorRef shard;
+        private long lastAccessTime;
+
+        CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+                Modification modification) {
+            this.transactionID = transactionID;
+            this.cohort = cohort;
+            this.modification = modification;
+        }
+
+        void updateLastAccessTime() {
+            lastAccessTime = System.currentTimeMillis();
+        }
+
+        long getLastAccessTime() {
+            return lastAccessTime;
+        }
+
+        String getTransactionID() {
+            return transactionID;
+        }
+
+        DOMStoreThreePhaseCommitCohort getCohort() {
+            return cohort;
+        }
+
+        Modification getModification() {
+            return modification;
+        }
+
+        ActorRef getCanCommitSender() {
+            return canCommitSender;
+        }
+
+        void setCanCommitSender(ActorRef canCommitSender) {
+            this.canCommitSender = canCommitSender;
+        }
+
+        ActorRef getShard() {
+            return shard;
+        }
+
+        void setShard(ActorRef shard) {
+            this.shard = shard;
+        }
+    }
+}
index 0e9fd11..29f22b2 100644 (file)
@@ -27,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats) {
-        super(shardActor, schemaContext, shardStats);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
+        super(shardActor, schemaContext, shardStats, transactionID);
         this.transaction = transaction;
     }
 
index d04ec23..2e174eb 100644 (file)
@@ -14,25 +14,20 @@ import akka.actor.ActorRef;
 
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
  * Date: 8/6/14
  */
-public class ShardReadWriteTransaction extends ShardTransaction {
+public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats) {
-        super(shardActor, schemaContext, shardStats);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID);
         this.transaction = transaction;
     }
 
@@ -40,23 +35,10 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     public void handleReceive(Object message) throws Exception {
         if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             readData(transaction, ReadData.fromSerializable(message));
-        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
-        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
-        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            deleteData(transaction, DeleteData.fromSerializable(message));
-        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, new ReadyTransaction());
         } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
             dataExists(transaction, DataExists.fromSerializable(message));
         } else {
             super.handleReceive(message);
         }
     }
-
-    @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
-        return transaction;
-    }
 }
index b16ec0a..edaf935 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
-import akka.serialization.Serialization;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
@@ -23,26 +22,11 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -74,34 +58,43 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public abstract class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
-    protected final SchemaContext schemaContext;
+    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
-
-    private final MutableCompositeModification modification = new MutableCompositeModification();
+    private final String transactionID;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats) {
+            ShardStats shardStats, String transactionID) {
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
+        this.transactionID = transactionID;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
+            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
+            String transactionID) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardStats));
+           datastoreContext, shardStats, transactionID));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
 
+    protected ActorRef getShardActor() {
+        return shardActor;
+    }
+
+    protected String getTransactionID() {
+        return transactionID;
+    }
+
+    protected SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
+
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
             closeTransaction(true);
-        } else if (message instanceof GetCompositedModification) {
-            // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(
-                    new ImmutableCompositeModification(modification)), getSelf());
         } else if (message instanceof ReceiveTimeout) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
@@ -160,56 +153,6 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     }
 
-    protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
-        modification.addModification(
-                new WriteModification(message.getPath(), message.getData(),schemaContext));
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("writeData at path : " + message.getPath().toString());
-        }
-        try {
-            transaction.write(message.getPath(), message.getData());
-            getSender().tell(new WriteDataReply().toSerializable(), getSelf());
-        }catch(Exception e){
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
-        modification.addModification(
-                new MergeModification(message.getPath(), message.getData(), schemaContext));
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("mergeData at path : " + message.getPath().toString());
-        }
-        try {
-            transaction.merge(message.getPath(), message.getData());
-            getSender().tell(new MergeDataReply().toSerializable(), getSelf());
-        }catch(Exception e){
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("deleteData at path : " + message.getPath().toString());
-        }
-        modification.addModification(new DeleteModification(message.getPath()));
-        try {
-            transaction.delete(message.getPath());
-            getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
-        }catch(Exception e){
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
-        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
-        ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
-        getSender().tell(new ReadyTransactionReply(
-            Serialization.serializedActorPath(cohortActor)).toSerializable(), getSelf());
-
-    }
-
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
 
         private static final long serialVersionUID = 1L;
@@ -219,15 +162,17 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
         final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
+        final String transactionID;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats) {
+                ShardStats shardStats, String transactionID) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
+            this.transactionID = transactionID;
         }
 
         @Override
@@ -235,37 +180,17 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats);
+                        shardActor, schemaContext, shardStats, transactionID);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats);
+                        schemaContext, shardStats, transactionID);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats);
+                        shardActor, schemaContext, shardStats, transactionID);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
             return tx;
         }
     }
-
-    // These classes are in here for test purposes only
-
-    static class GetCompositedModification {
-    }
-
-
-    static class GetCompositeModificationReply {
-        private final CompositeModification modification;
-
-
-        GetCompositeModificationReply(CompositeModification modification) {
-            this.modification = modification;
-        }
-
-
-        public CompositeModification getModification() {
-            return modification;
-        }
-    }
 }
index 8fe94cf..943a82f 100644 (file)
@@ -56,23 +56,26 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         return getContext().parent();
     }
 
-    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,
-            String transactionId) {
+    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
+        String transactionName = "shard-" + createTransaction.getTransactionId();
         if(createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats), transactionId);
+                            schemaContext, datastoreContext, shardStats,
+                            createTransaction.getTransactionId()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats), transactionId);
+                            schemaContext, datastoreContext, shardStats,
+                            createTransaction.getTransactionId()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats), transactionId);
+                            schemaContext, datastoreContext, shardStats,
+                            createTransaction.getTransactionId()), transactionName);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
@@ -82,10 +85,9 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
 
-        ActorRef transactionActor = createTypedTransactionActor(createTransaction, "shard-" + createTransaction.getTransactionId());
-        getSender()
-            .tell(new CreateTransactionReply(transactionActor.path().toString(),createTransaction.getTransactionId()).toSerializable(),
-                getSelf());
+        ActorRef transactionActor = createTypedTransactionActor(createTransaction);
+        getSender().tell(new CreateTransactionReply(transactionActor.path().toString(),
+                createTransaction.getTransactionId()).toSerializable(), getSelf());
     }
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
index 396b27a..e993e4b 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -26,31 +36,103 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardWriteTransaction extends ShardTransaction {
+
+    private final MutableCompositeModification modification = new MutableCompositeModification();
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats) {
-        super(shardActor, schemaContext, shardStats);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
+        super(shardActor, schemaContext, shardStats, transactionID);
         this.transaction = transaction;
     }
 
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
+    }
+
     @Override
     public void handleReceive(Object message) throws Exception {
         if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+            writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()));
         } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+            mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()));
         } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             deleteData(transaction, DeleteData.fromSerializable(message));
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
             readyTransaction(transaction, new ReadyTransaction());
+        } else if (message instanceof GetCompositedModification) {
+            // This is here for testing only
+            getSender().tell(new GetCompositeModificationReply(
+                    new ImmutableCompositeModification(modification)), getSelf());
         } else {
             super.handleReceive(message);
         }
     }
 
-    @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
-        return transaction;
+    private void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
+        modification.addModification(
+                new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("writeData at path : " + message.getPath().toString());
+        }
+        try {
+            transaction.write(message.getPath(), message.getData());
+            getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
+    private void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
+        modification.addModification(
+                new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("mergeData at path : " + message.getPath().toString());
+        }
+        try {
+            transaction.merge(message.getPath(), message.getData());
+            getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
+    private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("deleteData at path : " + message.getPath().toString());
+        }
+        modification.addModification(new DeleteModification(message.getPath()));
+        try {
+            transaction.delete(message.getPath());
+            getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
+    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
+        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
+
+        getShardActor().forward(new ForwardedReadyTransaction(getTransactionID(), cohort, modification),
+                getContext());
+    }
+
+    // These classes are in here for test purposes only
+
+    static class GetCompositedModification {
+    }
+
+    static class GetCompositeModificationReply {
+        private final CompositeModification modification;
+
+
+        GetCompositeModificationReply(CompositeModification modification) {
+            this.modification = modification;
+        }
+
+        public CompositeModification getModification() {
+            return modification;
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
deleted file mode 100644 (file)
index df85bb1..0000000
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-
-public class ThreePhaseCommitCohort extends AbstractUntypedActor {
-    private final DOMStoreThreePhaseCommitCohort cohort;
-    private final ActorRef shardActor;
-    private final CompositeModification modification;
-    private final ShardStats shardStats;
-
-    public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
-        ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
-
-        this.cohort = cohort;
-        this.shardActor = shardActor;
-        this.modification = modification;
-        this.shardStats = shardStats;
-    }
-
-    private final LoggingAdapter log =
-        Logging.getLogger(getContext().system(), this);
-
-    public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
-            final ActorRef shardActor, final CompositeModification modification,
-            ShardStats shardStats) {
-        return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
-                shardStats));
-    }
-
-    @Override
-    public void handleReceive(Object message) throws Exception {
-        if (message.getClass()
-            .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            canCommit(new CanCommitTransaction());
-        } else if (message.getClass()
-            .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
-            preCommit(new PreCommitTransaction());
-        } else if (message.getClass()
-            .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            commit(new CommitTransaction());
-        } else if (message.getClass()
-            .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            abort(new AbortTransaction());
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    private void abort(AbortTransaction message) {
-        final ListenableFuture<Void> future = cohort.abort();
-        final ActorRef sender = getSender();
-        final ActorRef self = getSelf();
-
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(Void v) {
-                shardStats.incrementAbortTransactionsCount();
-                sender
-                    .tell(new AbortTransactionReply().toSerializable(),
-                    self);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error(t, "An exception happened during abort");
-                sender
-                    .tell(new akka.actor.Status.Failure(t), self);
-            }
-        });
-    }
-
-    private void commit(CommitTransaction message) {
-        // Forward the commit to the shard
-        if(log.isDebugEnabled()) {
-            log.debug("Forward commit transaction to Shard {} ", shardActor);
-        }
-        shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
-            getContext());
-
-        getContext().parent().tell(PoisonPill.getInstance(), getSelf());
-
-    }
-
-    private void preCommit(PreCommitTransaction message) {
-        final ListenableFuture<Void> future = cohort.preCommit();
-        final ActorRef sender = getSender();
-        final ActorRef self = getSelf();
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(Void v) {
-                sender
-                    .tell(new PreCommitTransactionReply().toSerializable(),
-                        self);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error(t, "An exception happened during pre-commit");
-                sender
-                    .tell(new akka.actor.Status.Failure(t), self);
-            }
-        });
-
-    }
-
-    private void canCommit(CanCommitTransaction message) {
-        final ListenableFuture<Boolean> future = cohort.canCommit();
-        final ActorRef sender = getSender();
-        final ActorRef self = getSelf();
-        Futures.addCallback(future, new FutureCallback<Boolean>() {
-            @Override
-            public void onSuccess(Boolean canCommit) {
-                sender.tell(new CanCommitTransactionReply(canCommit)
-                    .toSerializable(), self);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error(t, "An exception happened during canCommit");
-                sender
-                    .tell(new akka.actor.Status.Failure(t), self);
-            }
-        });
-    }
-
-    private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
-        final DOMStoreThreePhaseCommitCohort cohort;
-        final ActorRef shardActor;
-        final CompositeModification modification;
-        final ShardStats shardStats;
-
-        ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
-            ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
-            this.cohort = cohort;
-            this.shardActor = shardActor;
-            this.modification = modification;
-            this.shardStats = shardStats;
-        }
-
-        @Override
-        public ThreePhaseCommitCohort create() throws Exception {
-            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
-        }
-    }
-}
index 6e76696..6e5ba1c 100644 (file)
@@ -21,15 +21,12 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.runtime.AbstractFunction1;
-
 import java.util.Collections;
 import java.util.List;
 
@@ -40,6 +37,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
+    private static final ListenableFuture<Void> IMMEDIATE_SUCCESS =
+            com.google.common.util.concurrent.Futures.immediateFuture(null);
+
     private final ActorContext actorContext;
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
@@ -108,7 +108,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         // their canCommit processing. If any one fails then we'll fail canCommit.
 
         Future<Iterable<Object>> combinedFuture =
-                invokeCohorts(new CanCommitTransaction().toSerializable());
+                invokeCohorts(new CanCommitTransaction(transactionId).toSerializable());
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
@@ -160,8 +160,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> preCommit() {
-        return voidOperation("preCommit",  new PreCommitTransaction().toSerializable(),
-                PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
+        // We don't need to do anything here - preCommit is done atomically with the commit phase
+        // by the shard.
+        return IMMEDIATE_SUCCESS;
     }
 
     @Override
@@ -172,13 +173,13 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
-        return voidOperation("abort", new AbortTransaction().toSerializable(),
+        return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
     }
 
     @Override
     public ListenableFuture<Void> commit() {
-        return voidOperation("commit",  new CommitTransaction().toSerializable(),
+        return voidOperation("commit",  new CommitTransaction(transactionId).toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
index 77e8142..dd04afc 100644 (file)
@@ -13,14 +13,19 @@ import com.google.common.base.Preconditions;
 public class ShardTransactionIdentifier {
     private final String remoteTransactionId;
 
-    public ShardTransactionIdentifier(String remoteTransactionId) {
-        this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null");
+    private ShardTransactionIdentifier(String remoteTransactionId) {
+        this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
+                "remoteTransactionId should not be null");
     }
 
     public static Builder builder(){
         return new Builder();
     }
 
+    public String getRemoteTransactionId() {
+        return remoteTransactionId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -45,8 +50,7 @@ public class ShardTransactionIdentifier {
     }
 
     @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder();
+        final StringBuilder sb = new StringBuilder();
         sb.append("shard-").append(remoteTransactionId);
         return sb.toString();
     }
index c639064..d24e29c 100644 (file)
@@ -11,10 +11,27 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class AbortTransaction implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.AbortTransaction.class;
+    public static final Class<ThreePhaseCommitCohortMessages.AbortTransaction> SERIALIZABLE_CLASS =
+            ThreePhaseCommitCohortMessages.AbortTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build();
-  }
+    private final String transactionID;
+
+    public AbortTransaction(String transactionID) {
+        this.transactionID = transactionID;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().
+                setTransactionId(transactionID).build();
+    }
+
+    public static AbortTransaction fromSerializable(Object message) {
+        return new AbortTransaction(((ThreePhaseCommitCohortMessages.AbortTransaction)message).
+                getTransactionId());
+    }
 }
index 88e2640..79c6b03 100644 (file)
@@ -11,11 +11,11 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class AbortTransactionReply implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
+    public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
+            ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
 
-
-  @Override
-  public Object toSerializable() {
-    return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
-  }
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+    }
 }
index 08f81c1..565345a 100644 (file)
@@ -11,10 +11,26 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class CanCommitTransaction implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransaction.class;
+    public static final Class<?> SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return  ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build();
-  }
+    private final String transactionID;
+
+    public CanCommitTransaction(String transactionID) {
+        this.transactionID = transactionID;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().
+                setTransactionId(transactionID).build();
+    }
+
+    public static CanCommitTransaction fromSerializable(Object message) {
+        return new CanCommitTransaction(((ThreePhaseCommitCohortMessages.CanCommitTransaction)message).
+                getTransactionId());
+    }
 }
index bbcd4de..9c8909c 100644 (file)
@@ -11,24 +11,26 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class CanCommitTransactionReply implements SerializableMessage {
-  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
-  private final Boolean canCommit;
+    public static Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
+            ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
 
-  public CanCommitTransactionReply(Boolean canCommit) {
-    this.canCommit = canCommit;
-  }
+    private final Boolean canCommit;
 
-  public Boolean getCanCommit() {
-    return canCommit;
-  }
+    public CanCommitTransactionReply(Boolean canCommit) {
+        this.canCommit = canCommit;
+    }
 
-  @Override
-  public Object toSerializable() {
-    return  ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
-  }
+    public Boolean getCanCommit() {
+        return canCommit;
+    }
 
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+    }
 
-  public static CanCommitTransactionReply fromSerializable(Object message) {
-    return  new CanCommitTransactionReply(((ThreePhaseCommitCohortMessages.CanCommitTransactionReply)message).getCanCommit());
-  }
+    public static CanCommitTransactionReply fromSerializable(Object message) {
+        return new CanCommitTransactionReply(
+                ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+    }
 }
index 92138a7..d607fe5 100644 (file)
@@ -11,10 +11,27 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class CommitTransaction implements SerializableMessage {
-  public static final  Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CommitTransaction.class;
+    public static final Class<ThreePhaseCommitCohortMessages.CommitTransaction> SERIALIZABLE_CLASS =
+            ThreePhaseCommitCohortMessages.CommitTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return  ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build();
-  }
+    private final String transactionID;
+
+    public CommitTransaction(String transactionID) {
+        this.transactionID = transactionID;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().setTransactionId(
+                transactionID).build();
+    }
+
+    public static CommitTransaction fromSerializable(Object message) {
+        return new CommitTransaction(((ThreePhaseCommitCohortMessages.
+                CommitTransaction)message).getTransactionId());
+    }
 }
index 5751b71..3d4a168 100644 (file)
@@ -11,11 +11,11 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 
 public class CommitTransactionReply implements SerializableMessage {
+    public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
+            ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
 
-  public static final Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
-
-  @Override
-  public Object toSerializable() {
-    return  ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
-  }
+    @Override
+    public Object toSerializable() {
+        return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java
deleted file mode 100644 (file)
index 8dd04e5..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
-
-public class CreateTransactionChain implements SerializableMessage{
-  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CreateTransactionChain.class;
-
-  @Override
-  public Object toSerializable() {
-    return  ShardTransactionChainMessages.CreateTransactionChain.newBuilder().build();
-  }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java
deleted file mode 100644 (file)
index 4a49762..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorSystem;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
-
-public class CreateTransactionChainReply implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CreateTransactionChainReply.class;
-  private final ActorPath transactionChainPath;
-
-  public CreateTransactionChainReply(ActorPath transactionChainPath) {
-    this.transactionChainPath = transactionChainPath;
-  }
-
-  public ActorPath getTransactionChainPath() {
-    return transactionChainPath;
-  }
-
-  @Override
-  public ShardTransactionChainMessages.CreateTransactionChainReply toSerializable() {
-    return ShardTransactionChainMessages.CreateTransactionChainReply.newBuilder()
-        .setTransactionChainPath(transactionChainPath.toString()).build();
-  }
-
-  public static CreateTransactionChainReply fromSerializable(ActorSystem actorSystem,Object serializable){
-    ShardTransactionChainMessages.CreateTransactionChainReply o = (ShardTransactionChainMessages.CreateTransactionChainReply) serializable;
-    return new CreateTransactionChainReply(
-        actorSystem.actorFor(o.getTransactionChainPath()).path());
-  }
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java
deleted file mode 100644 (file)
index 0104993..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-
-public class ForwardedCommitTransaction {
-  private final DOMStoreThreePhaseCommitCohort cohort;
-  private final Modification modification;
-
-  public ForwardedCommitTransaction(DOMStoreThreePhaseCommitCohort cohort, Modification modification){
-    this.cohort = cohort;
-    this.modification = modification;
-  }
-
-  public DOMStoreThreePhaseCommitCohort getCohort() {
-    return cohort;
-  }
-
-  public Modification getModification() {
-    return modification;
-  }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
new file mode 100644 (file)
index 0000000..4f8ea51
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class ForwardedReadyTransaction {
+    private final String transactionID;
+    private final DOMStoreThreePhaseCommitCohort cohort;
+    private final Modification modification;
+
+    public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+            Modification modification) {
+        this.transactionID = transactionID;
+        this.cohort = cohort;
+        this.modification = modification;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    public DOMStoreThreePhaseCommitCohort getCohort() {
+        return cohort;
+    }
+
+    public Modification getModification() {
+        return modification;
+    }
+}
index 59dd6db..eee4891 100644 (file)
@@ -11,29 +11,30 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class ReadyTransactionReply implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class;
-  private final String cohortPath;
+    public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.ReadyTransactionReply.class;
 
-  public ReadyTransactionReply(String cohortPath) {
+    private final String cohortPath;
 
-    this.cohortPath = cohortPath;
-  }
+    public ReadyTransactionReply(String cohortPath) {
 
-  public String getCohortPath() {
-    return cohortPath;
-  }
+        this.cohortPath = cohortPath;
+    }
 
-  @Override
-  public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-    return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
-        .setActorPath(cohortPath).build();
-  }
+    public String getCohortPath() {
+        return cohortPath;
+    }
 
-  public static ReadyTransactionReply fromSerializable(Object serializable) {
-      ShardTransactionMessages.ReadyTransactionReply o =
-          (ShardTransactionMessages.ReadyTransactionReply) serializable;
+    @Override
+    public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
+        return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
+                setActorPath(cohortPath).build();
+    }
 
-      return new ReadyTransactionReply(o.getActorPath());
+    public static ReadyTransactionReply fromSerializable(Object serializable) {
+        ShardTransactionMessages.ReadyTransactionReply o =
+                (ShardTransactionMessages.ReadyTransactionReply) serializable;
 
-  }
+        return new ReadyTransactionReply(o.getActorPath());
+    }
 }
index 84614bd..de33f55 100644 (file)
@@ -40,18 +40,25 @@ public class DistributedConfigDataStoreProviderModule extends
             props = new ConfigProperties();
         }
 
-        DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
-                InMemoryDOMDataStoreConfigProperties.create(
+        DatastoreContext datastoreContext = DatastoreContext.newBuilder()
+                .dataStoreMXBeanType("DistributedConfigDatastore")
+                .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
-                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue(),
-                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
-                props.getShardSnapshotBatchCount().getValue().intValue(),
-                props.getShardHearbeatIntervalInMillis().getValue());
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
+                .shardTransactionIdleTimeout(Duration.create(
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+                .operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
+                .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
+                        getValue().intValue())
+                .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
+                .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
+                .shardTransactionCommitTimeoutInSeconds(
+                        props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
+                .shardTransactionCommitQueueCapacity(
+                        props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+                .build();
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
index 3183527..ee1859d 100644 (file)
@@ -40,18 +40,25 @@ public class DistributedOperationalDataStoreProviderModule extends
             props = new OperationalProperties();
         }
 
-        DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
-                InMemoryDOMDataStoreConfigProperties.create(
+        DatastoreContext datastoreContext = DatastoreContext.newBuilder()
+                .dataStoreMXBeanType("DistributedOperationalDatastore")
+                .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue().intValue(),
-                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()),
-                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        TimeUnit.MINUTES),
-                props.getOperationTimeoutInSeconds().getValue(),
-                props.getShardJournalRecoveryLogBatchSize().getValue().intValue(),
-                props.getShardSnapshotBatchCount().getValue().intValue(),
-                props.getShardHearbeatIntervalInMillis().getValue());
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue()))
+                .shardTransactionIdleTimeout(Duration.create(
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(), TimeUnit.MINUTES))
+                .operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
+                .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
+                        getValue().intValue())
+                .shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
+                .shardHeartbeatIntervalInMillis(props.getShardHearbeatIntervalInMillis().getValue())
+                .shardTransactionCommitTimeoutInSeconds(
+                        props.getShardTransactionCommitTimeoutInSeconds().getValue().intValue())
+                .shardTransactionCommitQueueCapacity(
+                        props.getShardTransactionCommitQueueCapacity().getValue().intValue())
+                .build();
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
index af43f95..167d530 100644 (file)
@@ -70,7 +70,7 @@ module distributed-datastore-provider {
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
             type non-zero-uint32-type;
-            description "The maximum queue size for each shard's data store data change listeners.";
+            description "The maximum queue size for each shard's data store data change listener.";
          }
 
          leaf max-shard-data-store-executor-queue-size {
@@ -109,6 +109,18 @@ module distributed-datastore-provider {
             description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
          }
 
+         leaf shard-transaction-commit-timeout-in-seconds {
+            default 30;
+            type non-zero-uint32-type;
+            description "The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction";
+         }
+
+         leaf shard-transaction-commit-queue-capacity {
+            default 20000;
+            type non-zero-uint32-type;
+            description "The maximum allowed capacity for each shard's transaction commit queue.";
+         }
+
          leaf enable-metric-capture {
             default false;
             type boolean;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
deleted file mode 100644 (file)
index a718ca7..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.testkit.JavaTestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.Collections;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
-public class BasicIntegrationTest extends AbstractActorTest {
-
-    @Test
-    public void integrationTest() throws Exception{
-        // System.setProperty("shard.persistent", "true");
-        // This test will
-        // - create a Shard
-        // - initiate a transaction
-        // - write something
-        // - read the transaction for commit
-        // - commit the transaction
-
-
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final SchemaContext schemaContext = TestModel.createTestContext();
-            DatastoreContext datastoreContext = new DatastoreContext();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext());
-            final ActorRef shard = getSystem().actorOf(props);
-
-            new Within(duration("10 seconds")) {
-                @Override
-                protected void run() {
-                    shard.tell(new UpdateSchemaContext(schemaContext), getRef());
-
-
-                    // Wait for a specific log message to show up
-                    final boolean result =
-                        new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                        ) {
-                            @Override
-                            protected Boolean run() {
-                                return true;
-                            }
-                        }.from(shard.path().toString())
-                            .message("Switching from state Candidate to Leader")
-                            .occurrences(1).exec();
-
-                    assertEquals(true, result);
-
-                    // Create a transaction on the shard
-                    shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
-
-                    final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
-                            @Override
-                            protected ActorSelection match(Object in) {
-                                if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
-                                    CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
-                                    return getSystem()
-                                        .actorSelection(reply
-                                            .getTransactionPath());
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertNotNull(transaction);
-
-                    System.out.println("Successfully created transaction");
-
-                    // 3. Write some data
-                    transaction.tell(new WriteData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
-                        getRef());
-
-                    Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
-                        @Override
-                        protected Boolean match(Object in) {
-                            if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
-                                return true;
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertTrue(writeDone);
-
-                    System.out.println("Successfully wrote data");
-
-                    // 4. Ready the transaction for commit
-
-                    transaction.tell(new ReadyTransaction().toSerializable(), getRef());
-
-                    final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
-                            @Override
-                            protected ActorSelection match(Object in) {
-                                if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                                    String cohortPath =
-                                        ReadyTransactionReply.fromSerializable(in)
-                                            .getCohortPath();
-                                    return getSystem().actorSelection(cohortPath);
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertNotNull(cohort);
-
-                    System.out.println("Successfully readied the transaction");
-
-                    // 5. PreCommit the transaction
-
-                    cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
-
-                    Boolean preCommitDone =
-                        new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
-                            @Override
-                            protected Boolean match(Object in) {
-                                if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
-                                    return true;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertTrue(preCommitDone);
-
-                    System.out.println("Successfully pre-committed the transaction");
-
-                    // 6. Commit the transaction
-                    cohort.tell(new CommitTransaction().toSerializable(), getRef());
-
-                    // FIXME : Add assertions that the commit worked and that the cohort and transaction actors were terminated
-
-                    System.out.println("TODO : Check Successfully committed the transaction");
-                }
-
-
-            };
-        }
-
-            private ActorRef watchActor(ActorSelection actor) {
-                Future<ActorRef> future = actor
-                    .resolveOne(FiniteDuration.apply(100, "milliseconds"));
-
-                try {
-                    ActorRef actorRef = Await.result(future,
-                        FiniteDuration.apply(100, "milliseconds"));
-
-                    watch(actorRef);
-
-                    return actorRef;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-
-            }
-        };
-
-
-    }
-}
index ec8aee2..395021d 100644 (file)
@@ -1,17 +1,12 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.event.Logging;
-import akka.testkit.JavaTestKit;
-
+import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
+import com.google.common.util.concurrent.Uninterruptibles;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -19,292 +14,268 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
-public class DistributedDataStoreIntegrationTest {
+    @Test
+    public void testWriteTransactionWithSingleShard() throws Exception{
+        System.setProperty("shard.persistent", "true");
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("transactionIntegrationTest", "test-1");
 
-    private static ActorSystem system;
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-    @Before
-    public void setUp() throws IOException {
-        File journal = new File("journal");
+            testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-        if(journal.exists()) {
-            FileUtils.deleteDirectory(journal);
-        }
+            cleanup(dataStore);
+        }};
+    }
 
+    @Test
+    public void testWriteTransactionWithMultipleShards() throws Exception{
+        System.setProperty("shard.persistent", "true");
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
 
-        System.setProperty("shard.persistent", "false");
-        system = ActorSystem.create("test");
-    }
+            DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
-    @After
-    public void tearDown() {
-        JavaTestKit.shutdownActorSystem(system);
-        system = null;
-    }
+            YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
+            NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
+            writeTx.write(nodePath1, nodeToWrite1);
 
-    protected ActorSystem getSystem() {
-        return system;
-    }
+            YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
+            NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
+            writeTx.write(nodePath2, nodeToWrite2);
 
-    @Test
-    public void integrationTest() throws Exception {
-        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
-        ShardStrategyFactory.setConfiguration(configuration);
+            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
 
+            // 5. Verify the data in the store
 
-        new JavaTestKit(getSystem()) {
-            {
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-                new Within(duration("10 seconds")) {
-                    @Override
-                    protected void run() {
-                        try {
-                            final DistributedDataStore distributedDataStore =
-                                new DistributedDataStore(getSystem(), "config",
-                                        new MockClusterWrapper(), configuration,
-                                        new DatastoreContext());
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", nodeToWrite1, optional.get());
 
-                            distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+            optional = readTx.read(nodePath2).get();
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", nodeToWrite2, optional.get());
 
-                            // Wait for a specific log message to show up
-                            final boolean result =
-                                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                                    ) {
-                                    @Override
-                                    protected Boolean run() {
-                                        return true;
-                                    }
-                                }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
-                                    .message("Switching from state Candidate to Leader")
-                                    .occurrences(1).exec();
+            cleanup(dataStore);
+        }};
+    }
 
-                            assertEquals(true, result);
+    @Test
+    public void testReadWriteTransaction() throws Exception{
+        System.setProperty("shard.persistent", "true");
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testReadWriteTransaction", "test-1");
 
-                            DOMStoreReadWriteTransaction transaction =
-                                distributedDataStore.newReadWriteTransaction();
+         // 1. Create a read-write Tx
 
-                            transaction
-                                .write(TestModel.TEST_PATH, ImmutableNodes
-                                    .containerNode(TestModel.TEST_QNAME));
+            DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
+            assertNotNull("newReadWriteTransaction returned null", readWriteTx);
 
-                            ListenableFuture<Optional<NormalizedNode<?, ?>>>
-                                future =
-                                transaction.read(TestModel.TEST_PATH);
+            // 2. Write some data
 
-                            Optional<NormalizedNode<?, ?>> optional =
-                                future.get();
+            YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            readWriteTx.write(nodePath, nodeToWrite );
 
-                            Assert.assertTrue("Node not found", optional.isPresent());
+            // 3. Read the data from Tx
 
-                            NormalizedNode<?, ?> normalizedNode =
-                                optional.get();
+            Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
+            assertEquals("exists", true, exists);
 
-                            assertEquals(TestModel.TEST_QNAME,
-                                normalizedNode.getNodeType());
+            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", nodeToWrite, optional.get());
 
-                            DOMStoreThreePhaseCommitCohort ready =
-                                transaction.ready();
+            // 4. Ready the Tx for commit
 
-                            ListenableFuture<Boolean> canCommit =
-                                ready.canCommit();
+            DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
 
-                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+            // 5. Commit the Tx
 
-                            ListenableFuture<Void> preCommit =
-                                ready.preCommit();
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
 
-                            preCommit.get(5, TimeUnit.SECONDS);
+            // 6. Verify the data in the store
 
-                            ListenableFuture<Void> commit = ready.commit();
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-                            commit.get(5, TimeUnit.SECONDS);
-                        } catch (ExecutionException | TimeoutException | InterruptedException e){
-                            fail(e.getMessage());
-                        }
-                    }
-                };
-            }
-        };
+            optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", nodeToWrite, optional.get());
 
+            cleanup(dataStore);
+        }};
     }
 
     @Test
-    public void transactionChainIntegrationTest() throws Exception {
-        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
-        ShardStrategyFactory.setConfiguration(configuration);
+    public void testTransactionAbort() throws Exception{
+        System.setProperty("shard.persistent", "true");
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
 
+            DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
+            writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        new JavaTestKit(getSystem()) {
-            {
+            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-                new Within(duration("10 seconds")) {
-                    @Override
-                    protected void run() {
-                        try {
-                            final DistributedDataStore distributedDataStore =
-                                new DistributedDataStore(getSystem(), "config",
-                                    new MockClusterWrapper(), configuration,
-                                    new DatastoreContext());
+            cohort.canCommit().get(5, TimeUnit.SECONDS);
 
-                            distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+            cohort.abort().get(5, TimeUnit.SECONDS);
 
-                            // Wait for a specific log message to show up
-                            final boolean result =
-                                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                                ) {
-                                    @Override
-                                    protected Boolean run() {
-                                        return true;
-                                    }
-                                }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
-                                    .message("Switching from state Candidate to Leader")
-                                    .occurrences(1).exec();
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-                            assertEquals(true, result);
-
-                            DOMStoreTransactionChain transactionChain =
-                                distributedDataStore.createTransactionChain();
-
-                            DOMStoreReadWriteTransaction transaction =
-                                transactionChain.newReadWriteTransaction();
+            cleanup(dataStore);
+        }};
+    }
 
-                            transaction
-                                .write(TestModel.TEST_PATH, ImmutableNodes
-                                    .containerNode(TestModel.TEST_QNAME));
+    @Test
+    public void testTransactionChain() throws Exception{
+        System.setProperty("shard.persistent", "true");
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
 
-                            ListenableFuture<Optional<NormalizedNode<?, ?>>>
-                                future =
-                                transaction.read(TestModel.TEST_PATH);
+            // 1. Create a Tx chain and write-only Tx
 
-                            Optional<NormalizedNode<?, ?>> optional =
-                                future.get();
+            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
 
-                            Assert.assertTrue("Node not found", optional.isPresent());
+            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
-                            NormalizedNode<?, ?> normalizedNode =
-                                optional.get();
+            // 2. Write some data
 
-                            assertEquals(TestModel.TEST_QNAME,
-                                normalizedNode.getNodeType());
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            writeTx.write(TestModel.TEST_PATH, containerNode);
 
-                            DOMStoreThreePhaseCommitCohort ready =
-                                transaction.ready();
+            // 3. Ready the Tx for commit
 
-                            ListenableFuture<Boolean> canCommit =
-                                ready.canCommit();
+            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+            // 4. Commit the Tx
 
-                            ListenableFuture<Void> preCommit =
-                                ready.preCommit();
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
 
-                            preCommit.get(5, TimeUnit.SECONDS);
+            // 5. Verify the data in the store
 
-                            ListenableFuture<Void> commit = ready.commit();
+            DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
 
-                            commit.get(5, TimeUnit.SECONDS);
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", containerNode, optional.get());
 
-                            transactionChain.close();
-                        } catch (ExecutionException | TimeoutException | InterruptedException e){
-                            fail(e.getMessage());
-                        }
-                    }
-                };
-            }
-        };
+            txChain.close();
 
+            cleanup(dataStore);
+        }};
     }
 
+    class IntegrationTestKit extends ShardTestKit {
 
-    //FIXME : Disabling test because it's flaky
-    //@Test
-    public void integrationTestWithMultiShardConfiguration()
-        throws ExecutionException, InterruptedException, TimeoutException {
-        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
-
-        ShardStrategyFactory.setConfiguration(configuration);
-
-        new JavaTestKit(getSystem()) {
-            {
+        IntegrationTestKit(ActorSystem actorSystem) {
+            super(actorSystem);
+        }
 
-                new Within(duration("10 seconds")) {
-                    @Override
-                    protected void run() {
-                        try {
-                            final DistributedDataStore distributedDataStore =
-                                new DistributedDataStore(getSystem(), "config",
-                                    new MockClusterWrapper(), configuration, null);
+        DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+            MockClusterWrapper cluster = new MockClusterWrapper();
+            Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
+            ShardStrategyFactory.setConfiguration(config);
+
+            DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+                    config, datastoreContext);
+
+            SchemaContext schemaContext = SchemaContextHelper.full();
+            dataStore.onGlobalContextUpdated(schemaContext);
+
+            for(String shardName: shardNames) {
+                ActorRef shard = null;
+                for(int i = 0; i < 20 * 5 && shard == null; i++) {
+                    Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                    Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
+                    if(shardReply.isPresent()) {
+                        shard = shardReply.get();
+                    }
+                }
 
-                            distributedDataStore.onGlobalContextUpdated(
-                                SchemaContextHelper.full());
+                assertNotNull("Shard was not created", shard);
 
-                            // Wait for a specific log message to show up
-                            final boolean result =
-                                new JavaTestKit.EventFilter<Boolean>(
-                                    Logging.Info.class
-                                ) {
-                                    @Override
-                                    protected Boolean run() {
-                                        return true;
-                                    }
-                                }.from(
-                                    "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
-                                    .message(
-                                        "Switching from state Candidate to Leader")
-                                    .occurrences(1)
-                                    .exec();
+                System.out.println("!!!!!!shard: "+shard.path().toString());
+                waitUntilLeader(shard);
+            }
 
-                            Thread.sleep(1000);
+            return dataStore;
+        }
 
+        void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
+                NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
-                            DOMStoreReadWriteTransaction transaction =
-                                distributedDataStore.newReadWriteTransaction();
+            // 1. Create a write-only Tx
 
-                            transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-                            transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+            DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
-                            DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+            // 2. Write some data
 
-                            ListenableFuture<Boolean> canCommit = ready.canCommit();
+            writeTx.write(nodePath, nodeToWrite);
 
-                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+            // 3. Ready the Tx for commit
 
-                            ListenableFuture<Void> preCommit = ready.preCommit();
+            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-                            preCommit.get(5, TimeUnit.SECONDS);
+            // 4. Commit the Tx
 
-                            ListenableFuture<Void> commit = ready.commit();
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
 
-                            commit.get(5, TimeUnit.SECONDS);
+            // 5. Verify the data in the store
 
-                            assertEquals(true, result);
-                        } catch(ExecutionException | TimeoutException | InterruptedException e){
-                            fail(e.getMessage());
-                        }
-                    }
-                };
-            }
-        };
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", nodeToWrite, optional.get());
+        }
 
+        void cleanup(DistributedDataStore dataStore) {
+            dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
+        }
     }
 
 }
index d57a5ee..00243ea 100644 (file)
@@ -34,9 +34,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertNull;
 import static org.junit.Assert.assertNotNull;
@@ -88,7 +86,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
         new DistributedDataStore(actorSystem, "config",
             mock(ClusterWrapper.class), mock(Configuration.class),
-            new DatastoreContext());
+            DatastoreContext.newBuilder().build());
 
         verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
     }
index ed7b686..dcfa35c 100644 (file)
@@ -97,7 +97,7 @@ public class ShardManagerTest {
             {
                 final Props props = ShardManager
                         .props("config", new MockClusterWrapper(),
-                                new MockConfiguration(), new DatastoreContext());
+                                new MockConfiguration(), DatastoreContext.newBuilder().build());
 
                 final ActorRef subject = getSystem().actorOf(props);
 
@@ -114,7 +114,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -134,7 +134,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -163,7 +163,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", mockClusterWrapper,
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -194,7 +194,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -225,7 +225,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -253,7 +253,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -271,7 +271,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
             final TestActorRef<ShardManager> subject =
                     TestActorRef.create(system, props);
 
@@ -289,7 +289,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
             final TestActorRef<ShardManager> subject =
                     TestActorRef.create(system, props);
 
@@ -335,7 +335,7 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                     .props("config", new MockClusterWrapper(),
-                            new MockConfiguration(), new DatastoreContext());
+                            new MockConfiguration(), DatastoreContext.newBuilder().build());
             final TestActorRef<ShardManager> subject =
                     TestActorRef.create(system, props);
 
index 2051c9d..9b4f77b 100644 (file)
@@ -1,27 +1,38 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.event.Logging;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -61,7 +72,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -69,24 +82,33 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
 
 public class ShardTest extends AbstractActorTest {
 
-    private static final DatastoreContext DATA_STORE_CONTEXT =
-            new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
-
     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
 
     private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
+    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+
+    private static String shardName() {
+        return "shard" + NEXT_SHARD_NUM.getAndIncrement();
+    }
+
+    private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
+
     @Before
     public void setUp() {
         System.setProperty("shard.persistent", "false");
@@ -103,7 +125,7 @@ public class ShardTest extends AbstractActorTest {
 
     private Props newShardProps() {
         return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
-                DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+                dataStoreContext, SCHEMA_CONTEXT);
     }
 
     @Test
@@ -178,7 +200,7 @@ public class ShardTest extends AbstractActorTest {
 
             Props props = Shard.props(identifier,
                     Collections.<ShardIdentifier, String>singletonMap(identifier, null),
-                    DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
+                    dataStoreContext, SCHEMA_CONTEXT);
             final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
 
             new Within(duration("3 seconds")) {
@@ -202,11 +224,10 @@ public class ShardTest extends AbstractActorTest {
         NormalizedNodeToNodeCodec codec =
             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
-                TestModel.TEST_QNAME));
+        writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
+        NormalizedNode<?,?> expected = readStore(ref, root);
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
@@ -216,7 +237,7 @@ public class ShardTest extends AbstractActorTest {
 
         ref.underlyingActor().onReceiveCommand(applySnapshot);
 
-        NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
+        NormalizedNode<?,?> actual = readStore(ref, root);
 
         assertEquals(expected, actual);
     }
@@ -236,7 +257,7 @@ public class ShardTest extends AbstractActorTest {
 
         shard.underlyingActor().onReceiveCommand(applyState);
 
-        NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
         assertEquals("Applied state", node, actual);
     }
 
@@ -295,7 +316,7 @@ public class ShardTest extends AbstractActorTest {
             @Override
             public Shard create() throws Exception {
                 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
-                        DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
+                        dataStoreContext, SCHEMA_CONTEXT) {
                     @Override
                     protected void onRecoveryComplete() {
                         try {
@@ -315,7 +336,7 @@ public class ShardTest extends AbstractActorTest {
 
         // Verify data in the data store.
 
-        NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
+        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
                 outerList.getValue() instanceof Iterable);
@@ -353,49 +374,682 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
-    @SuppressWarnings("unchecked")
+    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
+            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
+            MutableCompositeModification modification) {
+        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
+    }
+
+    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
+            InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
+            MutableCompositeModification modification,
+            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+
+        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
+        tx.write(path, data);
+        final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
+        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
+
+        doAnswer(new Answer<ListenableFuture<Boolean>>() {
+            @Override
+            public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
+                return realCohort.canCommit();
+            }
+        }).when(cohort).canCommit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+                if(preCommit != null) {
+                    return preCommit.apply(realCohort);
+                } else {
+                    return realCohort.preCommit();
+                }
+            }
+        }).when(cohort).preCommit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+                return realCohort.commit();
+            }
+        }).when(cohort).commit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+                return realCohort.abort();
+            }
+        }).when(cohort).abort();
+
+        modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
+
+        return cohort;
+    }
+
+    @SuppressWarnings({ "unchecked" })
     @Test
-    public void testForwardedCommitTransactionWithPersistence() throws IOException {
+    public void testConcurrentThreePhaseCommits() throws Throwable {
         System.setProperty("shard.persistent", "true");
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
+
+            long timeoutSec = 5;
+            final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
+            final Timeout timeout = new Timeout(duration);
+
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
+            // processed after the first Tx completes.
+
+            Future<Object> canCommitFuture1 = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+            Future<Object> canCommitFuture2 = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+
+            // Send the CommitTransaction message for the first Tx. After it completes, it should
+            // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Wait for the next 2 Tx's to complete.
+
+            final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+            final CountDownLatch commitLatch = new CountDownLatch(2);
+
+            class OnFutureComplete extends OnComplete<Object> {
+                private final Class<?> expRespType;
+
+                OnFutureComplete(Class<?> expRespType) {
+                    this.expRespType = expRespType;
+                }
+
+                @Override
+                public void onComplete(Throwable error, Object resp) {
+                    if(error != null) {
+                        System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
+                        caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
+                    } else {
+                        try {
+                            assertEquals("Commit response type", expRespType, resp.getClass());
+                            onSuccess(resp);
+                        } catch (Exception e) {
+                            caughtEx.set(e);
+                        }
+                    }
+                }
+
+                void onSuccess(Object resp) throws Exception {
+                }
+            }
+
+            class OnCommitFutureComplete extends OnFutureComplete {
+                OnCommitFutureComplete() {
+                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
+                }
+
+                @Override
+                public void onComplete(Throwable error, Object resp) {
+                    super.onComplete(error, resp);
+                    commitLatch.countDown();
+                }
+            }
+
+            class OnCanCommitFutureComplete extends OnFutureComplete {
+                private final String transactionID;
 
+                OnCanCommitFutureComplete(String transactionID) {
+                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+                    this.transactionID = transactionID;
+                }
+
+                @Override
+                void onSuccess(Object resp) throws Exception {
+                    CanCommitTransactionReply canCommitReply =
+                            CanCommitTransactionReply.fromSerializable(resp);
+                    assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                    Future<Object> commitFuture = Patterns.ask(shard,
+                            new CommitTransaction(transactionID).toSerializable(), timeout);
+                    commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
+                }
+            }
+
+            canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
+                    getSystem().dispatcher());
+
+            canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
+                    getSystem().dispatcher());
+
+            boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
+
+            if(caughtEx.get() != null) {
+                throw caughtEx.get();
+            }
+
+            assertEquals("Commits complete", true, done);
+
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
+
+            // Verify data in the data store.
+
+            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                    outerList.getValue() instanceof Iterable);
+            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                       entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+            assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
+        }};
+    }
+
+    @Test
+    public void testCommitPhaseFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
-            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
 
             waitUntilLeader(shard);
 
-            NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+            FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
-            doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+            // processed after the first Tx completes.
+
+            Future<Object> canCommitFuture = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+            // Send the CommitTransaction message for the first Tx. This should send back an error
+            // and trigger the 2nd Tx to proceed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            // Wait for the 2nd Tx to complete the canCommit phase.
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            canCommitFuture.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable t, Object resp) {
+                    latch.countDown();
+                }
+            }, getSystem().dispatcher());
+
+            assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+            InOrder inOrder = inOrder(cohort1, cohort2);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+        }};
+    }
+
+    @Test
+    public void testPreCommitPhaseFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
-                    SCHEMA_CONTEXT));
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+
+            FiniteDuration duration = duration("5 seconds");
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
+            // Send the CanCommitTransaction message.
 
-            verify(cohort).commit();
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
+            // Send the CommitTransaction message. This should send back an error
+            // for preCommit failure.
+
+            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+        }};
+    }
+
+    @Test
+    public void testCanCommitPhaseFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            String transactionID = "tx1";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+        }};
+    }
+
+    @Test
+    public void testAbortBeforeFinishCommit() throws Throwable {
+        System.setProperty("shard.persistent", "true");
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            final String transactionID = "tx1";
+            final CountDownLatch abortComplete = new CountDownLatch(1);
+            Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
+                          new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
+                @Override
+                public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
+                    ListenableFuture<Void> preCommitFuture = cohort.preCommit();
+
+                    Future<Object> abortFuture = Patterns.ask(shard,
+                            new AbortTransaction(transactionID).toSerializable(), timeout);
+                    abortFuture.onComplete(new OnComplete<Object>() {
+                        @Override
+                        public void onComplete(Throwable e, Object resp) {
+                            abortComplete.countDown();
+                        }
+                    }, getSystem().dispatcher());
+
+                    return preCommitFuture;
+                }
+            };
+
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            Future<Object> commitFuture = Patterns.ask(shard,
+                    new CommitTransaction(transactionID).toSerializable(), timeout);
+
+            assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
+
+            Await.result(commitFuture, duration);
+
+            NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
+            assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+        }};
+    }
+
+    @Test
+    public void testTransactionCommitTimeout() throws Throwable {
+        dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            writeToStore(shard, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+            // Create 1st Tx - will timeout
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
+
+            // Create 2nd Tx
+
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // canCommit 1st Tx. We don't send the commit so it should timeout.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Commit the 2nd Tx.
+
+            shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = readStore(shard, listNodePath);
+            assertNotNull(listNodePath + " not found", node);
+        }};
+    }
+
+    @Test
+    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+        dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // canCommit 1st Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // canCommit the 2nd Tx - it should get queued.
+
+            shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
+
+            // canCommit the 3rd Tx - should exceed queue capacity and fail.
+
+            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
+        }};
+    }
+
+    @Test
+    public void testCanCommitBeforeReadyFailure() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
+            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+        }};
+    }
+
+    @Test
+    public void testAbortTransaction() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+
+            waitUntilLeader(shard);
+
+            // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
+            doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+
+            FiniteDuration duration = duration("5 seconds");
+            final Timeout timeout = new Timeout(duration);
+
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
+            // processed after the first Tx completes.
+
+            Future<Object> canCommitFuture = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+            // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
+            // Tx to proceed.
+
+            shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            // Wait for the 2nd Tx to complete the canCommit phase.
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            canCommitFuture.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(Throwable t, Object resp) {
+                    latch.countDown();
+                }
+            }, getSystem().dispatcher());
+
+            assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
+
+            InOrder inOrder = inOrder(cohort1, cohort2);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort2).canCommit();
         }};
     }
 
     @Test
     public void testCreateSnapshot() throws IOException, InterruptedException {
         new ShardTestKit(getSystem()) {{
-            final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
+            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+                            dataStoreContext, SCHEMA_CONTEXT) {
+                        @Override
+                        public void saveSnapshot(Object snapshot) {
+                            super.saveSnapshot(snapshot);
+                            latch.get().countDown();
+                        }
+                    };
+                }
+            };
 
-            waitUntilLeader(subject);
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
 
-            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            waitUntilLeader(shard);
+
+            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
-            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
-            subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            latch.set(new CountDownLatch(1));
+            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
 
-            waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+            assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
         }};
     }
 
@@ -467,45 +1121,42 @@ public class ShardTest extends AbstractActorTest {
         };
     }
 
-    private static final class DelegatingShardCreator implements Creator<Shard> {
-        private final Creator<Shard> delegate;
+    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
 
-        DelegatingShardCreator(Creator<Shard> delegate) {
-            this.delegate = delegate;
-        }
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+            transaction.read(id);
 
-        @Override
-        public Shard create() throws Exception {
-            return delegate.create();
-        }
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
+
+        transaction.close();
+
+        return node;
     }
 
-    private static class ShardTestKit extends JavaTestKit {
+    private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
+        throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
 
-        private ShardTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
-        }
+        transaction.write(id, node);
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(logLevel
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message(logMessage)
-                    .occurrences(1).exec();
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
 
-            Assert.assertEquals(true, result);
+    private static final class DelegatingShardCreator implements Creator<Shard> {
+        private final Creator<Shard> delegate;
 
+        DelegatingShardCreator(Creator<Shard> delegate) {
+            this.delegate = delegate;
         }
 
-        protected void waitUntilLeader(ActorRef subject) {
-            waitForLogMessage(Logging.Info.class, subject,
-                    "Switching from state Candidate to Leader");
+        @Override
+        public Shard create() throws Exception {
+            return delegate.create();
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
new file mode 100644 (file)
index 0000000..9a0e8f9
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import com.google.common.util.concurrent.Uninterruptibles;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+
+class ShardTestKit extends JavaTestKit {
+
+    ShardTestKit(ActorSystem actorSystem) {
+        super(actorSystem);
+    }
+
+    protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
+        // Wait for a specific log message to show up
+        final boolean result =
+            new JavaTestKit.EventFilter<Boolean>(logLevel
+            ) {
+                @Override
+                protected Boolean run() {
+                    return true;
+                }
+            }.from(subject.path().toString())
+                .message(logMessage)
+                .occurrences(1).exec();
+
+        Assert.assertEquals(true, result);
+
+    }
+
+    protected void waitUntilLeader(ActorRef shard) {
+        for(int i = 0; i < 20 * 5; i++) {
+            Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(5, TimeUnit.SECONDS));
+            try {
+                FindLeaderReply resp = (FindLeaderReply)Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                if(resp.getLeaderActor() != null) {
+                    return;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Leader not found for shard " + shard.path());
+    }
+}
\ No newline at end of file
index 869f475..17731de 100644 (file)
@@ -13,10 +13,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
-
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -30,11 +28,9 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -58,7 +54,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("operational").build();
 
-    private final DatastoreContext datastoreContext = new DatastoreContext();
+    private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
@@ -68,7 +64,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     }
 
     private ActorRef createShard(){
-        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext,
+                TestModel.createTestContext()));
     }
 
     @Test(expected = ReadFailedException.class)
@@ -77,7 +74,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -106,7 +103,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -135,7 +132,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -164,7 +161,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -196,7 +193,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -233,7 +230,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -265,7 +262,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index 3f31591..8ce8f4d 100644 (file)
@@ -5,10 +5,8 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
@@ -35,16 +33,12 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.duration.Duration;
-
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -61,7 +55,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
-    private DatastoreContext datastoreContext = new DatastoreContext();
+    private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
@@ -72,7 +66,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-            Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
+            Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
     }
 
     @Test
@@ -80,7 +74,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
@@ -122,7 +116,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
@@ -165,7 +159,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
 
             new Within(duration("1 seconds")) {
@@ -207,7 +201,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
 
             new Within(duration("1 seconds")) {
@@ -247,20 +241,20 @@ public class ShardTransactionTest extends AbstractActorTest {
     private void assertModification(final ActorRef subject,
         final Class<? extends Modification> modificationType) {
         new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
                 @Override
                 protected void run() {
                     subject
-                        .tell(new ShardTransaction.GetCompositedModification(),
+                        .tell(new ShardWriteTransaction.GetCompositedModification(),
                             getRef());
 
                     final CompositeModification compositeModification =
-                        new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
+                        new ExpectMsg<CompositeModification>(duration("3 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
                             @Override
                             protected CompositeModification match(Object in) {
-                                if (in instanceof ShardTransaction.GetCompositeModificationReply) {
-                                    return ((ShardTransaction.GetCompositeModificationReply) in)
+                                if (in instanceof ShardWriteTransaction.GetCompositeModificationReply) {
+                                    return ((ShardWriteTransaction.GetCompositeModificationReply) in)
                                         .getModification();
                                 } else {
                                     throw noMatch();
@@ -284,7 +278,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testWriteData");
 
@@ -324,7 +318,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testMergeData");
 
@@ -365,7 +359,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testDeleteData");
 
@@ -404,7 +398,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testReadyTransaction");
 
@@ -442,7 +436,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testCloseTransaction");
 
@@ -492,7 +486,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats);
+                testSchemaContext, datastoreContext, shardStats, "txn");
         final TestActorRef subject = TestActorRef.apply(props,getSystem());
 
         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -501,14 +495,13 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testShardTransactionInactivity() {
 
-        datastoreContext = new DatastoreContext("Test",
-                InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS), 5, 1000, 1000, 500);
+        datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
+                Duration.create(500, TimeUnit.MILLISECONDS)).build();
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats);
+                    testSchemaContext, datastoreContext, shardStats, "txn");
             final ActorRef subject =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
deleted file mode 100644 (file)
index 4e4c34b..0000000
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- *
- *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- *  This program and the accompanying materials are made available under the
- *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
- *  and is available at http://www.eclipse.org/legal/epl-v10.html
- *
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-
-public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
-
-    private static ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
-    private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", storeExecutor,
-            MoreExecutors.sameThreadExecutor());
-
-    private static final SchemaContext testSchemaContext =
-        TestModel.createTestContext();
-
-    private static final ShardIdentifier SHARD_IDENTIFIER =
-        ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config").build();
-
-    private final DatastoreContext datastoreContext = new DatastoreContext();
-
-    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
-
-    @BeforeClass
-    public static void staticSetup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
-
-    private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
-
-    private ActorRef createShard(){
-        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
-    }
-
-    @Test(expected = TestException.class)
-    public void testNegativeAbortResultsInException() throws Exception {
-
-        final ActorRef shard = createShard();
-        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
-            .mock(DOMStoreThreePhaseCommitCohort.class);
-        final CompositeModification mockComposite =
-            Mockito.mock(CompositeModification.class);
-        final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
-
-        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
-            .create(getSystem(), props,
-                "testNegativeAbortResultsInException");
-
-        when(mockCohort.abort()).thenReturn(
-            Futures.<Void>immediateFailedFuture(new TestException()));
-
-        Future<Object> future =
-            akka.pattern.Patterns.ask(subject,
-                ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
-                    .build(), 3000);
-        assertTrue(future.isCompleted());
-
-        Await.result(future, ASK_RESULT_DURATION);
-    }
-
-
-    @Test(expected = OptimisticLockFailedException.class)
-    public void testNegativeCanCommitResultsInException() throws Exception {
-
-        final ActorRef shard = createShard();
-        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
-            .mock(DOMStoreThreePhaseCommitCohort.class);
-        final CompositeModification mockComposite =
-            Mockito.mock(CompositeModification.class);
-        final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
-
-        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
-            .create(getSystem(), props,
-                "testNegativeCanCommitResultsInException");
-
-        when(mockCohort.canCommit()).thenReturn(
-            Futures
-                .<Boolean>immediateFailedFuture(
-                    new OptimisticLockFailedException("some exception")));
-
-        Future<Object> future =
-            akka.pattern.Patterns.ask(subject,
-                ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
-                    .build(), 3000);
-
-
-        Await.result(future, ASK_RESULT_DURATION);
-
-    }
-
-
-    @Test(expected = TestException.class)
-    public void testNegativePreCommitResultsInException() throws Exception {
-
-        final ActorRef shard = createShard();
-        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
-            .mock(DOMStoreThreePhaseCommitCohort.class);
-        final CompositeModification mockComposite =
-            Mockito.mock(CompositeModification.class);
-        final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
-
-        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
-            .create(getSystem(), props,
-                "testNegativePreCommitResultsInException");
-
-        when(mockCohort.preCommit()).thenReturn(
-            Futures
-                .<Void>immediateFailedFuture(
-                    new TestException()));
-
-        Future<Object> future =
-            akka.pattern.Patterns.ask(subject,
-                ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
-                    .build(), 3000);
-
-        Await.result(future, ASK_RESULT_DURATION);
-
-    }
-
-    @Test(expected = TestException.class)
-    public void testNegativeCommitResultsInException() throws Exception {
-
-        final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
-                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
-                "testNegativeCommitResultsInException");
-
-        final ActorRef shardTransaction =
-            getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                    testSchemaContext, datastoreContext, shardStats));
-
-        ShardTransactionMessages.WriteData writeData =
-            ShardTransactionMessages.WriteData.newBuilder()
-                .setInstanceIdentifierPathArguments(
-                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
-                        .build()).setNormalizedNode(
-                NormalizedNodeMessages.Node.newBuilder().build()
-
-            ).build();
-
-        Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
-
-        //This is done so that Modification list is updated which is used during commit
-        Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
-
-        //ready transaction creates the cohort so that we get into the
-        //block where in commmit is done
-        ShardTransactionMessages.ReadyTransaction readyTransaction =
-            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
-
-        future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
-
-        //but when the message is sent it will have the MockCommit object
-        //so that we can simulate throwing of exception
-        ForwardedCommitTransaction mockForwardCommitTransaction =
-            Mockito.mock(ForwardedCommitTransaction.class);
-        DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
-            Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
-        when(mockForwardCommitTransaction.getCohort())
-            .thenReturn(mockThreePhaseCommitTransaction);
-        when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
-            .<Void>immediateFailedFuture(
-                new TestException()));
-        Modification mockModification = Mockito.mock(
-            Modification.class);
-        when(mockForwardCommitTransaction.getModification())
-            .thenReturn(mockModification);
-
-        when(mockModification.toSerializable()).thenReturn(
-            PersistentMessages.CompositeModification.newBuilder().build());
-
-        future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
-        Await.result(future, ASK_RESULT_DURATION);
-    }
-
-    private class TestException extends Exception {
-    }
-}
index b7ac371..46060dd 100644 (file)
@@ -194,24 +194,13 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @Test
     public void testPreCommit() throws Exception {
+        // Precommit is currently a no-op
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
         proxy.preCommit().get(5, TimeUnit.SECONDS);
-
-        verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
-    }
-
-    @Test(expected = ExecutionException.class)
-    public void testPreCommitWithFailure() throws Exception {
-        ThreePhaseCommitCohortProxy proxy = setupProxy(2);
-
-        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
-                new PreCommitTransactionReply(), new RuntimeException("mock"));
-
-        proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -313,7 +302,6 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
-        verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 }