Merge "BUG-1521 Netconf-netty-util missing unit tests"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 11 Sep 2014 11:57:24 +0000 (11:57 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 11 Sep 2014 11:57:24 +0000 (11:57 +0000)
22 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransactionChain.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java

index 8135d837d3ad86563ad0246022941bfc6134b59d..c8cbcca6e8609834500af3c5511273b49a8ca329 100644 (file)
@@ -144,7 +144,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
-            replicatedLog.append((ReplicatedLogEntry) message);
+            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+            // Apply State immediately
+            replicatedLog.append(logEntry);
+            applyState(null, "recovery", logEntry.getData());
+            context.setLastApplied(logEntry.getIndex());
+            context.setCommitIndex(logEntry.getIndex());
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
         } else if (message instanceof UpdateElectionTerm) {
@@ -152,7 +158,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "RecoveryCompleted - Switching actor to Follower - " +
-                    "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
                     "journal-size={}",
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
index 63dd5e7081603aed89168980a5f6ad6bb27de28e..d956bb174be4e159bc3fe0b415126ea39a4ba140 100644 (file)
@@ -10,6 +10,21 @@ public final class ShardTransactionChainMessages {
   }
   public interface CloseTransactionChainOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // optional string transactionChainId = 1;
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    boolean hasTransactionChainId();
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    java.lang.String getTransactionChainId();
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getTransactionChainIdBytes();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CloseTransactionChain}
@@ -44,6 +59,7 @@ public final class ShardTransactionChainMessages {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -61,6 +77,11 @@ public final class ShardTransactionChainMessages {
               }
               break;
             }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              transactionChainId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -100,7 +121,52 @@ public final class ShardTransactionChainMessages {
       return PARSER;
     }
 
+    private int bitField0_;
+    // optional string transactionChainId = 1;
+    public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 1;
+    private java.lang.Object transactionChainId_;
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    public boolean hasTransactionChainId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    public java.lang.String getTransactionChainId() {
+      java.lang.Object ref = transactionChainId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          transactionChainId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string transactionChainId = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTransactionChainIdBytes() {
+      java.lang.Object ref = transactionChainId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        transactionChainId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
+      transactionChainId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -114,6 +180,9 @@ public final class ShardTransactionChainMessages {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getTransactionChainIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -123,6 +192,10 @@ public final class ShardTransactionChainMessages {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getTransactionChainIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -239,6 +312,8 @@ public final class ShardTransactionChainMessages {
 
       public Builder clear() {
         super.clear();
+        transactionChainId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -265,6 +340,13 @@ public final class ShardTransactionChainMessages {
 
       public org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain buildPartial() {
         org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain result = new org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.transactionChainId_ = transactionChainId_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -280,6 +362,11 @@ public final class ShardTransactionChainMessages {
 
       public Builder mergeFrom(org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain other) {
         if (other == org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages.CloseTransactionChain.getDefaultInstance()) return this;
+        if (other.hasTransactionChainId()) {
+          bitField0_ |= 0x00000001;
+          transactionChainId_ = other.transactionChainId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -305,6 +392,81 @@ public final class ShardTransactionChainMessages {
         }
         return this;
       }
+      private int bitField0_;
+
+      // optional string transactionChainId = 1;
+      private java.lang.Object transactionChainId_ = "";
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public boolean hasTransactionChainId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public java.lang.String getTransactionChainId() {
+        java.lang.Object ref = transactionChainId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          transactionChainId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTransactionChainIdBytes() {
+        java.lang.Object ref = transactionChainId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          transactionChainId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public Builder setTransactionChainId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionChainId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public Builder clearTransactionChainId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        transactionChainId_ = getDefaultInstance().getTransactionChainId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string transactionChainId = 1;</code>
+       */
+      public Builder setTransactionChainIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        transactionChainId_ = value;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CloseTransactionChain)
     }
@@ -1444,13 +1606,14 @@ public final class ShardTransactionChainMessages {
   static {
     java.lang.String[] descriptorData = {
       "\n\033ShardTransactionChain.proto\022!org.opend" +
-      "aylight.controller.mdsal\"\027\n\025CloseTransac" +
-      "tionChain\"\034\n\032CloseTransactionChainReply\"" +
-      "\030\n\026CreateTransactionChain\";\n\033CreateTrans" +
-      "actionChainReply\022\034\n\024transactionChainPath" +
-      "\030\001 \002(\tB[\n:org.opendaylight.controller.pr" +
-      "otobuff.messages.transactionB\035ShardTrans" +
-      "actionChainMessages"
+      "aylight.controller.mdsal\"3\n\025CloseTransac" +
+      "tionChain\022\032\n\022transactionChainId\030\001 \001(\t\"\034\n" +
+      "\032CloseTransactionChainReply\"\030\n\026CreateTra" +
+      "nsactionChain\";\n\033CreateTransactionChainR" +
+      "eply\022\034\n\024transactionChainPath\030\001 \002(\tB[\n:or" +
+      "g.opendaylight.controller.protobuff.mess" +
+      "ages.transactionB\035ShardTransactionChainM" +
+      "essages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1462,7 +1625,7 @@ public final class ShardTransactionChainMessages {
           internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CloseTransactionChain_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "TransactionChainId", });
           internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_org_opendaylight_controller_mdsal_CloseTransactionChainReply_fieldAccessorTable = new
index ded80713fb3feba577065da17410d3408786046e..96a39bddd376a9a777be0c056693cae8a892925d 100644 (file)
@@ -653,6 +653,21 @@ public final class ShardTransactionMessages {
      * <code>required int32 transactionType = 2;</code>
      */
     int getTransactionType();
+
+    // optional string transactionChainId = 3;
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    boolean hasTransactionChainId();
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    java.lang.String getTransactionChainId();
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    com.google.protobuf.ByteString
+        getTransactionChainIdBytes();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
@@ -715,6 +730,11 @@ public final class ShardTransactionMessages {
               transactionType_ = input.readInt32();
               break;
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              transactionChainId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -814,9 +834,53 @@ public final class ShardTransactionMessages {
       return transactionType_;
     }
 
+    // optional string transactionChainId = 3;
+    public static final int TRANSACTIONCHAINID_FIELD_NUMBER = 3;
+    private java.lang.Object transactionChainId_;
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    public boolean hasTransactionChainId() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    public java.lang.String getTransactionChainId() {
+      java.lang.Object ref = transactionChainId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          transactionChainId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string transactionChainId = 3;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTransactionChainIdBytes() {
+      java.lang.Object ref = transactionChainId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        transactionChainId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       transactionId_ = "";
       transactionType_ = 0;
+      transactionChainId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -844,6 +908,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeInt32(2, transactionType_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getTransactionChainIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -861,6 +928,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(2, transactionType_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getTransactionChainIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -981,6 +1052,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000001);
         transactionType_ = 0;
         bitField0_ = (bitField0_ & ~0x00000002);
+        transactionChainId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1017,6 +1090,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000002;
         }
         result.transactionType_ = transactionType_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.transactionChainId_ = transactionChainId_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1041,6 +1118,11 @@ public final class ShardTransactionMessages {
         if (other.hasTransactionType()) {
           setTransactionType(other.getTransactionType());
         }
+        if (other.hasTransactionChainId()) {
+          bitField0_ |= 0x00000004;
+          transactionChainId_ = other.transactionChainId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1183,6 +1265,80 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional string transactionChainId = 3;
+      private java.lang.Object transactionChainId_ = "";
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public boolean hasTransactionChainId() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public java.lang.String getTransactionChainId() {
+        java.lang.Object ref = transactionChainId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          transactionChainId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTransactionChainIdBytes() {
+        java.lang.Object ref = transactionChainId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b =
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          transactionChainId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public Builder setTransactionChainId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        transactionChainId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public Builder clearTransactionChainId() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        transactionChainId_ = getDefaultInstance().getTransactionChainId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string transactionChainId = 3;</code>
+       */
+      public Builder setTransactionChainIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        transactionChainId_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
     }
 
@@ -7597,36 +7753,37 @@ public final class ShardTransactionMessages {
     java.lang.String[] descriptorData = {
       "\n\026ShardTransaction.proto\022!org.opendaylig" +
       "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
-      "seTransaction\"\027\n\025CloseTransactionReply\"C" +
+      "seTransaction\"\027\n\025CloseTransactionReply\"_" +
       "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
-      "(\t\022\027\n\017transactionType\030\002 \002(\005\"M\n\026CreateTra" +
-      "nsactionReply\022\034\n\024transactionActorPath\030\001 " +
-      "\002(\t\022\025\n\rtransactionId\030\002 \002(\t\"\022\n\020ReadyTrans" +
-      "action\"*\n\025ReadyTransactionReply\022\021\n\tactor" +
-      "Path\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIden" +
-      "tifierPathArguments\030\001 \002(\01325.org.opendayl",
-      "ight.controller.mdsal.InstanceIdentifier" +
-      "\"\021\n\017DeleteDataReply\"j\n\010ReadData\022^\n\037insta" +
-      "nceIdentifierPathArguments\030\001 \002(\01325.org.o" +
-      "pendaylight.controller.mdsal.InstanceIde" +
-      "ntifier\"P\n\rReadDataReply\022?\n\016normalizedNo" +
-      "de\030\001 \001(\0132\'.org.opendaylight.controller.m" +
-      "dsal.Node\"\254\001\n\tWriteData\022^\n\037instanceIdent" +
-      "ifierPathArguments\030\001 \002(\01325.org.opendayli" +
-      "ght.controller.mdsal.InstanceIdentifier\022" +
-      "?\n\016normalizedNode\030\002 \002(\0132\'.org.opendaylig",
-      "ht.controller.mdsal.Node\"\020\n\016WriteDataRep" +
-      "ly\"\254\001\n\tMergeData\022^\n\037instanceIdentifierPa" +
-      "thArguments\030\001 \002(\01325.org.opendaylight.con" +
-      "troller.mdsal.InstanceIdentifier\022?\n\016norm" +
-      "alizedNode\030\002 \002(\0132\'.org.opendaylight.cont" +
-      "roller.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nD" +
-      "ataExists\022^\n\037instanceIdentifierPathArgum" +
-      "ents\030\001 \002(\01325.org.opendaylight.controller" +
-      ".mdsal.InstanceIdentifier\"!\n\017DataExistsR" +
-      "eply\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight",
-      ".controller.protobuff.messages.transacti" +
-      "onB\030ShardTransactionMessages"
+      "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
+      "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
+      "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
+      "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
+      "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
+      "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
+      "guments\030\001 \002(\01325.org.opendaylight.control" +
+      "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
+      "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
+      "rPathArguments\030\001 \002(\01325.org.opendaylight." +
+      "controller.mdsal.InstanceIdentifier\"P\n\rR" +
+      "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
+      "rg.opendaylight.controller.mdsal.Node\"\254\001" +
+      "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
+      "uments\030\001 \002(\01325.org.opendaylight.controll" +
+      "er.mdsal.InstanceIdentifier\022?\n\016normalize",
+      "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
+      "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
+      "Data\022^\n\037instanceIdentifierPathArguments\030" +
+      "\001 \002(\01325.org.opendaylight.controller.mdsa" +
+      "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
+      " \002(\0132\'.org.opendaylight.controller.mdsal" +
+      ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
+      "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
+      ".org.opendaylight.controller.mdsal.Insta" +
+      "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
+      "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
+      "protobuff.messages.transactionB\030ShardTra" +
+      "nsactionMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7650,7 +7807,7 @@ public final class ShardTransactionMessages {
           internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
-              new java.lang.String[] { "TransactionId", "TransactionType", });
+              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
index 63b75ac430f9bbbaca2ad8d1ecb7c63f0cff37d2..26581478d97a1449ff88a07383e9a88c554aa9a5 100644 (file)
@@ -15,6 +15,7 @@ message CloseTransactionReply{
 message CreateTransaction{
   required string transactionId = 1;
   required int32  transactionType =2;
+  optional string transactionChainId = 3;
 }
 
 message CreateTransactionReply{
index 42f87cbda6dbd661b395efccd5ae9e729420759f..5dc67aa98fda691d0d014599104181508d94af08 100644 (file)
@@ -4,20 +4,10 @@ option java_package = "org.opendaylight.controller.protobuff.messages.transactio
 option java_outer_classname = "ShardTransactionChainMessages";
 
 message CloseTransactionChain {
-
+    optional string transactionChainId = 1;
 }
 
 message CloseTransactionChainReply{
 
-
-}
-
-message CreateTransactionChain {
-
-}
-
-message CreateTransactionChainReply{
-required string transactionChainPath = 1;
-
 }
 
index 34239070a3dfafe15c8ba7e71c630eaf56136e3b..b2646060539332fc45d58776aaf0a7efe2b0d3ff 100644 (file)
@@ -13,6 +13,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface Configuration {
 
@@ -52,4 +53,10 @@ public interface Configuration {
      * @return
      */
     List<String> getMembersFromShardName(String shardName);
+
+    /**
+     *
+     * @return
+     */
+    Set<String> getAllShardNames();
 }
index 37b565d2131debe797ca2ab975baf82550740a68..1a0a5dd6591c9a653ae2079b938d6448e59c036f 100644 (file)
@@ -23,8 +23,10 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ConfigurationImpl implements Configuration {
 
@@ -161,6 +163,16 @@ public class ConfigurationImpl implements Configuration {
         return Collections.EMPTY_LIST;
     }
 
+    @Override public Set<String> getAllShardNames() {
+        Set<String> shardNames = new LinkedHashSet<>();
+        for(ModuleShard ms : moduleShards){
+            for(Shard s : ms.getShards()) {
+                shardNames.add(s.getName());
+            }
+        }
+        return shardNames;
+    }
+
 
 
     private void readModules(Config modulesConfig) {
index a1858f5f91c82fe20f56461138ba19daa4fa6681..0737d2020bcde75125eafebbfe963fc19f2258f7 100644 (file)
@@ -32,9 +32,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
@@ -61,6 +61,7 @@ import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessa
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -109,11 +110,12 @@ public class Shard extends RaftActor {
 
     private final DatastoreContext datastoreContext;
 
-
     private SchemaContext schemaContext;
 
     private ActorRef createSnapshotTransaction;
 
+    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+
     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
@@ -183,22 +185,19 @@ public class Shard extends RaftActor {
         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
             getSender());
 
-        if (message.getClass()
-            .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
-            if (isLeader()) {
-                createTransactionChain();
-            } else if (getLeader() != null) {
-                getLeader().forward(message, getContext());
-            }
-        } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+        if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
             // This must be for install snapshot. Don't want to open this up and trigger
             // deSerialization
-            self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
+            self()
+                .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+                    self());
 
             // Send a PoisonPill instead of sending close transaction because we do not really need
             // a response
             getSender().tell(PoisonPill.getInstance(), self());
 
+        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
+            closeTransactionChain(CloseTransactionChain.fromSerializable(message));
         } else if (message instanceof RegisterChangeListener) {
             registerChangeListener((RegisterChangeListener) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -211,6 +210,9 @@ public class Shard extends RaftActor {
                 createTransaction(CreateTransaction.fromSerializable(message));
             } else if (getLeader() != null) {
                 getLeader().forward(message, getContext());
+            } else {
+                getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+                    "Could not find leader so transaction cannot be created")), getSelf());
             }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
@@ -221,9 +223,30 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+        DOMStoreTransactionChain chain =
+            transactionChains.remove(closeTransactionChain.getTransactionChainId());
+
+        if(chain != null) {
+            chain.close();
+        }
+    }
+
     private ActorRef createTypedTransactionActor(
         int transactionType,
-        ShardTransactionIdentifier transactionId) {
+        ShardTransactionIdentifier transactionId,
+        String transactionChainId ) {
+
+        DOMStoreTransactionFactory factory = store;
+
+        if(!transactionChainId.isEmpty()) {
+            factory = transactionChains.get(transactionChainId);
+            if(factory == null){
+                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+                transactionChains.put(transactionChainId, transactionChain);
+                factory = transactionChain;
+            }
+        }
 
         if(this.schemaContext == null){
             throw new NullPointerException("schemaContext should not be null");
@@ -235,7 +258,7 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
+                ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
 
         } else if (transactionType
@@ -244,7 +267,7 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
+                ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
 
 
@@ -254,7 +277,7 @@ public class Shard extends RaftActor {
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
+                ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
@@ -265,10 +288,10 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId());
+            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -276,7 +299,7 @@ public class Shard extends RaftActor {
                 .build();
         LOG.debug("Creating transaction : {} ", transactionId);
         ActorRef transactionActor =
-            createTypedTransactionActor(transactionType, transactionId);
+            createTypedTransactionActor(transactionType, transactionId, transactionChainId);
 
         getSender()
             .tell(new CreateTransactionReply(
@@ -306,6 +329,9 @@ public class Shard extends RaftActor {
                 modification);
             DOMStoreWriteTransaction transaction =
                 store.newWriteOnlyTransaction();
+
+            LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+
             modification.apply(transaction);
             try {
                 syncCommitTransaction(transaction);
@@ -319,6 +345,12 @@ public class Shard extends RaftActor {
             return;
         }
 
+
+        if(sender == null){
+            LOG.error("Commit failed. Sender cannot be null");
+            return;
+        }
+
         final ListenableFuture<Void> future = cohort.commit();
         final ActorRef self = getSelf();
 
@@ -458,7 +490,7 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot");
+                "createSnapshot", "");
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
@@ -499,6 +531,16 @@ public class Shard extends RaftActor {
 
         shardMBean.setRaftState(getRaftState().name());
         shardMBean.setCurrentTerm(getCurrentTerm());
+
+        // If this actor is no longer the leader close all the transaction chains
+        if(!isLeader()){
+            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+                LOG.debug("onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", entry.getKey(), getId());
+                entry.getValue().close();
+            }
+
+            transactionChains.clear();
+        }
     }
 
     @Override public String persistenceId() {
index 9b4610a99c4a6e5977115f560d12b551131e9be5..b74c89d727c2a1761a7c993272abcab1e4ad9999 100644 (file)
@@ -8,43 +8,72 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
+import akka.dispatch.Futures;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.Collections;
+import java.util.List;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
+    private final String transactionChainId;
+    private volatile List<Future<ActorPath>> cohortPathFutures = Collections.emptyList();
 
     public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
+        transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY);
+            TransactionProxy.TransactionType.READ_ONLY, this);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE);
+            TransactionProxy.TransactionType.READ_WRITE, this);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY);
+            TransactionProxy.TransactionType.WRITE_ONLY, this);
     }
 
     @Override
     public void close() {
-        // FIXME : The problem here is don't know which shard the transaction chain is to be created on ???
-        throw new UnsupportedOperationException("close - not sure what to do here?");
+        // Send a close transaction chain request to each and every shard
+        actorContext.broadcast(new CloseTransactionChain(transactionChainId));
+    }
+
+    public String getTransactionChainId() {
+        return transactionChainId;
+    }
+
+    public void onTransactionReady(List<Future<ActorPath>> cohortPathFutures){
+        this.cohortPathFutures = cohortPathFutures;
+    }
+
+    public void waitTillCurrentTransactionReady(){
+        try {
+            Await.result(Futures
+                .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()),
+                actorContext.getOperationDuration());
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
+        }
     }
 }
index a8b20c030e1dd34f274ce2e02b56669739824af3..97a9ff0bf379ef3b8f6568ed37603ed285ba35a5 100644 (file)
@@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong;
  * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+    private final TransactionChainProxy transactionChainProxy;
+
+
+
     public enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
@@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+        this(actorContext, transactionType, null);
+    }
+
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
-                "actorContext should not be null");
+            "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
-                "transactionType should not be null");
+            "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
-                "schemaContext should not be null");
+            "schemaContext should not be null");
+        this.transactionChainProxy = transactionChainProxy;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-                counter.getAndIncrement()).build();
+            counter.getAndIncrement()).build();
 
         if(transactionType == TransactionType.READ_ONLY) {
             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -201,23 +221,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             remoteTransactionActorsMB = new AtomicBoolean();
 
             TransactionProxyCleanupPhantomReference cleanup =
-                                              new TransactionProxyCleanupPhantomReference(this);
+                new TransactionProxyCleanupPhantomReference(this);
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
         LOG.debug("Created txn {} of type {}", identifier, transactionType);
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
-            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
-        }
-
-        return recordedOperationFutures;
-    }
-
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -308,6 +318,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
+        if(transactionChainProxy != null){
+            transactionChainProxy.onTransactionReady(cohortPathFutures);
+        }
+
         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
                 identifier.toString());
     }
@@ -340,20 +354,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
+    private void createTransactionIfMissing(ActorContext actorContext,
+        YangInstanceIdentifier path) {
+
+        if(transactionChainProxy != null){
+            transactionChainProxy.waitTillCurrentTransactionReady();
+        }
+
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         TransactionContext transactionContext =
             remoteTransactionPaths.get(shardName);
 
-        if(transactionContext != null){
+        if (transactionContext != null) {
             // A transaction already exists with that shard
             return;
         }
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
+                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -364,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
-                if(transactionType == TransactionType.READ_ONLY) {
+                if (transactionType == TransactionType.READ_ONLY) {
                     // Add the actor to the remoteTransactionActors list for access by the
                     // cleanup PhantonReference.
                     remoteTransactionActors.add(transactionActor);
@@ -375,19 +396,28 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
 
                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
-                        transactionActor, identifier, actorContext, schemaContext);
+                    transactionActor, identifier, actorContext, schemaContext);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
                 throw new IllegalArgumentException(String.format(
-                        "Invalid reply type {} for CreateTransaction", response.getClass()));
+                    "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
-        } catch(Exception e){
+        } catch (Exception e) {
             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
+            remoteTransactionPaths
+                .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
         }
     }
 
+    public String getTransactionChainId() {
+        if(transactionChainProxy == null){
+            return "";
+        }
+        return transactionChainProxy.getTransactionChainId();
+    }
+
+
     private interface TransactionContext {
         String getShardName();
 
index efa51fde2090c3762f8d32e5d75b2a4a50c77757..74de6c5aeacd3a39e777512980745d9190d7838c 100644 (file)
@@ -10,10 +10,29 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
-public class CloseTransactionChain implements SerializableMessage{
-  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CloseTransactionChain.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionChainMessages.CloseTransactionChain.newBuilder().build();
-  }
+public class CloseTransactionChain implements SerializableMessage {
+    public static final Class SERIALIZABLE_CLASS =
+        ShardTransactionChainMessages.CloseTransactionChain.class;
+    private final String transactionChainId;
+
+    public CloseTransactionChain(String transactionChainId){
+        this.transactionChainId = transactionChainId;
+    }
+
+    @Override
+    public Object toSerializable() {
+        return ShardTransactionChainMessages.CloseTransactionChain.newBuilder()
+            .setTransactionChainId(transactionChainId).build();
+    }
+
+    public static CloseTransactionChain fromSerializable(Object message){
+        ShardTransactionChainMessages.CloseTransactionChain closeTransactionChain
+            = (ShardTransactionChainMessages.CloseTransactionChain) message;
+
+        return new CloseTransactionChain(closeTransactionChain.getTransactionChainId());
+    }
+
+    public String getTransactionChainId() {
+        return transactionChainId;
+    }
 }
index d5c9e21611af20df37bb1999d00ead9a44495fda..361d406ac80dda74fe33950ace971ece600b3d7a 100644 (file)
@@ -13,30 +13,48 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 
 
 public class CreateTransaction implements SerializableMessage {
-  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
-  private final String transactionId;
-  private final int transactionType;
-
-  public CreateTransaction(String transactionId, int transactionType){
-
-    this.transactionId = transactionId;
-    this.transactionType = transactionType;
-  }
-
-  public String getTransactionId() {
-    return transactionId;
-  }
-
-  public int getTransactionType() { return transactionType;}
-
-  @Override
-  public Object toSerializable() {
-    return  ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId(transactionId).setTransactionType(transactionType).build();
-  }
-
-  public static CreateTransaction fromSerializable(Object message){
-    ShardTransactionMessages.CreateTransaction createTransaction = (ShardTransactionMessages.CreateTransaction)message;
-    return new CreateTransaction(createTransaction.getTransactionId(),createTransaction.getTransactionType() );
-  }
-
+    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+    private final String transactionId;
+    private final int transactionType;
+    private final String transactionChainId;
+
+    public CreateTransaction(String transactionId, int transactionType) {
+        this(transactionId, transactionType, "");
+    }
+
+    public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+
+        this.transactionId = transactionId;
+        this.transactionType = transactionType;
+        this.transactionChainId = transactionChainId;
+
+    }
+
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public int getTransactionType() {
+        return transactionType;
+    }
+
+    @Override
+    public Object toSerializable() {
+        return ShardTransactionMessages.CreateTransaction.newBuilder()
+            .setTransactionId(transactionId)
+            .setTransactionType(transactionType)
+            .setTransactionChainId(transactionChainId).build();
+    }
+
+    public static CreateTransaction fromSerializable(Object message) {
+        ShardTransactionMessages.CreateTransaction createTransaction =
+            (ShardTransactionMessages.CreateTransaction) message;
+        return new CreateTransaction(createTransaction.getTransactionId(),
+            createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+    }
+
+    public String getTransactionChainId() {
+        return transactionChainId;
+    }
 }
index b87dc4f608b191c21d5fa34b8bf4a9744864f96a..c989b275df3105480b035b7972e83c0822b7182d 100644 (file)
@@ -176,7 +176,8 @@ public class ActorContext {
      */
     public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
-        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+        LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
+            actor.toString());
 
         Future<Object> future = ask(actor, message, operationTimeout);
 
@@ -213,6 +214,13 @@ public class ActorContext {
         actor.tell(message, ActorRef.noSender());
     }
 
+    public void sendShardOperationAsync(String shardName, Object message) {
+        ActorSelection primary = findPrimary(shardName);
+
+        primary.tell(message, ActorRef.noSender());
+    }
+
+
     /**
      * Execute an operation on the primary for a given shard
      * <p>
@@ -295,4 +303,22 @@ public class ActorContext {
         return clusterWrapper.getCurrentMemberName();
     }
 
+    /**
+     * Send the message to each and every shard
+     *
+     * @param message
+     */
+    public void broadcast(Object message){
+        for(String shardName : configuration.getAllShardNames()){
+            try {
+                sendShardOperationAsync(shardName, message);
+            } catch(Exception e){
+                LOG.warn("broadcast failed to send message " +  message.getClass().getSimpleName() + " to shard " + shardName, e);
+            }
+        }
+    }
+
+    public FiniteDuration getOperationDuration() {
+        return operationDuration;
+    }
 }
index 50367e66ce3b759325d20228a63d0f7c2eeaab5d..7b826302f588ad76be85f54c9e17bcc1c8dff56a 100644 (file)
@@ -14,13 +14,10 @@ import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
@@ -32,7 +29,6 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -87,31 +83,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     assertEquals(true, result);
 
-                    // 1. Create a TransactionChain
-                    shard.tell(new CreateTransactionChain().toSerializable(), getRef());
-
-                    final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionChainReply") {
-                            @Override
-                            protected ActorSelection match(Object in) {
-                                if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
-                                    ActorPath transactionChainPath =
-                                        CreateTransactionChainReply.fromSerializable(getSystem(),in)
-                                            .getTransactionChainPath();
-                                    return getSystem()
-                                        .actorSelection(transactionChainPath);
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertNotNull(transactionChain);
-
-                    System.out.println("Successfully created transaction chain");
-
-                    // 2. Create a Transaction on the TransactionChain
-                    transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
+                    // Create a transaction on the shard
+                    shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
 
                     final ActorSelection transaction =
                         new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
index 17329611b00d6b010302eaa9d9ecf503972eb7e0..8c253596b8b752c264f81cced7090087fc02879b 100644 (file)
@@ -7,6 +7,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -83,4 +84,15 @@ public class ConfigurationImplTest {
         File f = new File("./module-shards.conf");
         ConfigFactory.parseFile(f);
     }
+
+    @Test
+    public void testGetAllShardNames(){
+        Set<String> allShardNames = configuration.getAllShardNames();
+
+        assertEquals(4, allShardNames.size());
+        assertTrue(allShardNames.contains("default"));
+        assertTrue(allShardNames.contains("people-1"));
+        assertTrue(allShardNames.contains("cars-1"));
+        assertTrue(allShardNames.contains("test-1"));
+    }
 }
index 8a7b50d20c9003682b0ec6a2626b1c342ea73a03..ec8aee2b09d4dfaf4ed4b5b732a9168783beb73c 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
@@ -144,6 +145,94 @@ public class DistributedDataStoreIntegrationTest {
 
     }
 
+    @Test
+    public void transactionChainIntegrationTest() throws Exception {
+        final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+        ShardStrategyFactory.setConfiguration(configuration);
+
+
+
+        new JavaTestKit(getSystem()) {
+            {
+
+                new Within(duration("10 seconds")) {
+                    @Override
+                    protected void run() {
+                        try {
+                            final DistributedDataStore distributedDataStore =
+                                new DistributedDataStore(getSystem(), "config",
+                                    new MockClusterWrapper(), configuration,
+                                    new DatastoreContext());
+
+                            distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
+
+                            // Wait for a specific log message to show up
+                            final boolean result =
+                                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                                ) {
+                                    @Override
+                                    protected Boolean run() {
+                                        return true;
+                                    }
+                                }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
+                                    .message("Switching from state Candidate to Leader")
+                                    .occurrences(1).exec();
+
+                            assertEquals(true, result);
+
+                            DOMStoreTransactionChain transactionChain =
+                                distributedDataStore.createTransactionChain();
+
+                            DOMStoreReadWriteTransaction transaction =
+                                transactionChain.newReadWriteTransaction();
+
+                            transaction
+                                .write(TestModel.TEST_PATH, ImmutableNodes
+                                    .containerNode(TestModel.TEST_QNAME));
+
+                            ListenableFuture<Optional<NormalizedNode<?, ?>>>
+                                future =
+                                transaction.read(TestModel.TEST_PATH);
+
+                            Optional<NormalizedNode<?, ?>> optional =
+                                future.get();
+
+                            Assert.assertTrue("Node not found", optional.isPresent());
+
+                            NormalizedNode<?, ?> normalizedNode =
+                                optional.get();
+
+                            assertEquals(TestModel.TEST_QNAME,
+                                normalizedNode.getNodeType());
+
+                            DOMStoreThreePhaseCommitCohort ready =
+                                transaction.ready();
+
+                            ListenableFuture<Boolean> canCommit =
+                                ready.canCommit();
+
+                            assertTrue(canCommit.get(5, TimeUnit.SECONDS));
+
+                            ListenableFuture<Void> preCommit =
+                                ready.preCommit();
+
+                            preCommit.get(5, TimeUnit.SECONDS);
+
+                            ListenableFuture<Void> commit = ready.commit();
+
+                            commit.get(5, TimeUnit.SECONDS);
+
+                            transactionChain.close();
+                        } catch (ExecutionException | TimeoutException | InterruptedException e){
+                            fail(e.getMessage());
+                        }
+                    }
+                };
+            }
+        };
+
+    }
+
 
     //FIXME : Disabling test because it's flaky
     //@Test
index 766dcb72681d3bc5ea945e0232fa25c32f1e76d5..06bcac8d786b943a0bf12087c31c0ba659c1f017 100644 (file)
@@ -14,8 +14,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -53,65 +51,6 @@ public class ShardTest extends AbstractActorTest {
 
     private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
 
-    @Test
-    public void testOnReceiveCreateTransactionChain() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ShardIdentifier identifier =
-                ShardIdentifier.builder().memberName("member-1")
-                    .shardName("inventory").type("config").build();
-
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
-            final ActorRef subject =
-                getSystem().actorOf(props, "testCreateTransactionChain");
-
-
-            // Wait for a specific log message to show up
-            final boolean result =
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
-                ) {
-                    @Override
-                    protected Boolean run() {
-                        return true;
-                    }
-                }.from(subject.path().toString())
-                    .message("Switching from state Candidate to Leader")
-                    .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
-
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
-
-                    subject.tell(new CreateTransactionChain().toSerializable(), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
-                                CreateTransactionChainReply reply =
-                                    CreateTransactionChainReply.fromSerializable(getSystem(),in);
-                                return reply.getTransactionChainPath()
-                                    .toString();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertEquals("Unexpected transaction path " + out,
-                        "akka://test/user/testCreateTransactionChain/$a",
-                        out);
-
-                    expectNoMsg();
-                }
-
-
-            };
-        }};
-    }
-
     @Test
     public void testOnReceiveRegisterListener() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -233,6 +172,65 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testCreateTransactionOnChain(){
+        new JavaTestKit(getSystem()) {{
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
+            final ActorRef subject =
+                getSystem().actorOf(props, "testCreateTransactionOnChain");
+
+            // Wait for a specific log message to show up
+            final boolean result =
+                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                ) {
+                    @Override
+                    protected Boolean run() {
+                        return true;
+                    }
+                }.from(subject.path().toString())
+                    .message("Switching from state Candidate to Leader")
+                    .occurrences(1).exec();
+
+            Assert.assertEquals(true, result);
+
+            new Within(duration("3 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(
+                        new UpdateSchemaContext(TestModel.createTestContext()),
+                        getRef());
+
+                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+                        getRef());
+
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
+                        // do not put code outside this method, will run afterwards
+                        @Override
+                        protected String match(Object in) {
+                            if (in instanceof CreateTransactionReply) {
+                                CreateTransactionReply reply =
+                                    (CreateTransactionReply) in;
+                                return reply.getTransactionActorPath()
+                                    .toString();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertTrue("Unexpected transaction path " + out,
+                        out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
     @Test
     public void testPeerAddressResolved(){
         new JavaTestKit(getSystem()) {{
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
deleted file mode 100644 (file)
index c5968c3..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
-import static org.junit.Assert.assertEquals;
-
-public class ShardTransactionChainTest extends AbstractActorTest {
-
-    private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
-    private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
-            MoreExecutors.sameThreadExecutor());
-
-    private static final SchemaContext testSchemaContext = TestModel.createTestContext();
-
-    private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
-
-    private static final String mockShardName = "mockShardName";
-
-    private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
-
-    @BeforeClass
-    public static void staticSetup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
-
-    @Test
-    public void testOnReceiveCreateTransaction() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT, shardStats);
-            final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
-
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
-                            }else{
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertEquals("Unexpected transaction path " + out,
-                            "akka://test/user/testCreateTransaction/shard-txn-1",
-                            out);
-
-                    // Will wait for the rest of the 3 seconds
-                    expectNoMsg();
-                }
-
-
-            };
-        }};
-    }
-
-    @Test
-    public void testOnReceiveCloseTransactionChain() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT, shardStats );
-            final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
-
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    subject.tell(new CloseTransactionChain().toSerializable(), getRef());
-
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected String match(Object in) {
-                            if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
-                                return "match";
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get(); // this extracts the received message
-
-                    assertEquals("match", out);
-                    // Will wait for the rest of the 3 seconds
-                    expectNoMsg();
-                }
-
-
-            };
-        }};
-    }
-}
index 93145bdd6d86360070dce5102dbd968c6ebc0629..4cca1bf9ad6698e9daa32409ced7055836cee351 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.mockito.Mockito.doReturn;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -23,9 +20,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class TransactionChainProxyTest {
-    ActorContext actorContext = Mockito.mock(ActorContext.class);
-    SchemaContext schemaContext = Mockito.mock(SchemaContext.class);
+    ActorContext actorContext = mock(ActorContext.class);
+    SchemaContext schemaContext = mock(SchemaContext.class);
 
     @Before
     public void setUp() {
@@ -57,8 +60,12 @@ public class TransactionChainProxyTest {
 
     }
 
-    @Test(expected=UnsupportedOperationException.class)
+    @Test
     public void testClose() throws Exception {
-        new TransactionChainProxy(actorContext).close();
+        ActorContext context = mock(ActorContext.class);
+
+        new TransactionChainProxy(context).close();
+
+        verify(context, times(1)).broadcast(anyObject());
     }
 }
index 8d49c6fac32bb85213a5c7a5906352e62bc41e58..06c5767bd030c1b5872969025902566a7cc3e27c 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class MockConfiguration implements Configuration{
     @Override public List<String> getMemberShardNames(String memberName) {
@@ -46,4 +47,8 @@ public class MockConfiguration implements Configuration{
 
         return Collections.EMPTY_LIST;
     }
+
+    @Override public Set<String> getAllShardNames() {
+        return Collections.emptySet();
+    }
 }
index 0999efff0f6aed20bba510d2de207e316166c3df..2642116927cde25de425d8de95884cb5d0ae3b57 100644 (file)
@@ -119,6 +119,10 @@ public final class NetconfSessionCapabilities {
         return fromStrings(session.getServerCapabilities());
     }
 
+    private static final QName cachedQName(String namespace, String revision, String moduleName) {
+        return QName.cachedReference(QName.create(namespace, revision, moduleName));
+    }
+
     public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
         final Set<QName> moduleBasedCaps = new HashSet<>();
         final Set<String> nonModuleCaps = Sets.newHashSet(capabilities);
@@ -138,7 +142,7 @@ public final class NetconfSessionCapabilities {
 
             String revision = REVISION_PARAM.from(queryParams);
             if (revision != null) {
-                moduleBasedCaps.add(QName.create(namespace, revision, moduleName));
+                moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
                 nonModuleCaps.remove(capability);
                 continue;
             }
@@ -158,7 +162,7 @@ public final class NetconfSessionCapabilities {
             }
 
             // FIXME: do we really want to continue here?
-            moduleBasedCaps.add(QName.cachedReference(QName.create(namespace, revision, moduleName)));
+            moduleBasedCaps.add(cachedQName(namespace, revision, moduleName));
             nonModuleCaps.remove(capability);
         }
 
index 5d8c910afc31fa9d6420fc6d3a67466c34924317..a95a64b2c23d2011979726d24e95a3e382c06ce2 100644 (file)
@@ -17,6 +17,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.math.BigInteger;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -137,13 +138,24 @@ public class RestconfImpl implements RestconfService {
 
     private static final String SCOPE_PARAM_NAME = "scope";
 
+    private static final String NETCONF_BASE = "urn:ietf:params:xml:ns:netconf:base:1.0";
+
+    private static final String NETCONF_BASE_PAYLOAD_NAME = "data";
+
+    private static final QName NETCONF_BASE_QNAME;
+
     static {
         try {
             EVENT_SUBSCRIPTION_AUGMENT_REVISION = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
+            NETCONF_BASE_QNAME = QName.create(QNameModule.create(new URI(NETCONF_BASE), null), NETCONF_BASE_PAYLOAD_NAME );
         } catch (ParseException e) {
             throw new RestconfDocumentedException(
                     "It wasn't possible to convert revision date of sal-remote-augment to date", ErrorType.APPLICATION,
                     ErrorTag.OPERATION_FAILED);
+        } catch (URISyntaxException e) {
+            throw new RestconfDocumentedException(
+                    "It wasn't possible to create instance of URI class with "+NETCONF_BASE+" URI", ErrorType.APPLICATION,
+                    ErrorTag.OPERATION_FAILED);
         }
     }
 
@@ -705,11 +717,13 @@ public class RestconfImpl implements RestconfService {
         validateInput(iiWithData.getSchemaNode(), payload);
 
         DOMMountPoint mountPoint = iiWithData.getMountPoint();
+        validateTopLevelNodeName(payload, iiWithData.getInstanceIdentifier());
         final CompositeNode value = this.normalizeNode(payload, iiWithData.getSchemaNode(), mountPoint);
         validateListKeysEqualityInPayloadAndUri(iiWithData, value);
         final NormalizedNode<?, ?> datastoreNormalizedNode = compositeNodeToDatastoreNormalizedNode(value,
                 iiWithData.getSchemaNode());
 
+
         YangInstanceIdentifier normalizedII;
         if (mountPoint != null) {
             normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(
@@ -760,6 +774,29 @@ public class RestconfImpl implements RestconfService {
         return Response.status(Status.OK).build();
     }
 
+    private void validateTopLevelNodeName(final Node<?> node,
+            final YangInstanceIdentifier identifier) {
+        final String payloadName = getName(node);
+        final Iterator<PathArgument> pathArguments = identifier.getReversePathArguments().iterator();
+
+        //no arguments
+        if (!pathArguments.hasNext()) {
+            //no "data" payload
+            if (!node.getNodeType().equals(NETCONF_BASE_QNAME)) {
+                throw new RestconfDocumentedException("Instance identifier has to contain at least one path argument",
+                        ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
+            }
+        //any arguments
+        } else {
+            final String identifierName = pathArguments.next().getNodeType().getLocalName();
+            if (!payloadName.equals(identifierName)) {
+                throw new RestconfDocumentedException("Payload name (" + payloadName
+                        + ") is different from identifier name (" + identifierName + ")", ErrorType.PROTOCOL,
+                        ErrorTag.MALFORMED_MESSAGE);
+            }
+        }
+    }
+
     /**
      * Validates whether keys in {@code payload} are equal to values of keys in {@code iiWithData} for list schema node
      *