Merge "Fix missing dependencies in opendaylight-startup archetype"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 26 Jan 2015 19:42:51 +0000 (19:42 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 26 Jan 2015 19:42:51 +0000 (19:42 +0000)
73 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/RpcInvocationStrategy.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ValueTypes.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializerTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java
opendaylight/md-sal/sal-clustering-commons/src/test/resources/odl-datastore-test.yang
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index 684c3ac30ecb5cbcb441f90bf430e63e0b5267a7..8f416b3abc45145e2f95307332052b66cdb4b5a1 100644 (file)
@@ -75,9 +75,8 @@ public class ExampleActor extends RaftActor {
 
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
-                String followers = "";
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
-                    followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+                    final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
                         getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
index 653520c2e47db4be19de53c9e3a42099904298d8..d1c3fefee8309208a6df6fe9b539a10ee000ddef 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -19,13 +20,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     // We define this as ArrayList so we can use ensureCapacity.
     protected ArrayList<ReplicatedLogEntry> journal;
 
-    protected long snapshotIndex = -1;
-    protected long snapshotTerm = -1;
+    private long snapshotIndex = -1;
+    private long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected long previousSnapshotIndex = -1;
-    protected long previousSnapshotTerm = -1;
+    private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    private long previousSnapshotIndex = -1;
+    private long previousSnapshotTerm = -1;
     protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
@@ -36,7 +37,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     public AbstractReplicatedLogImpl() {
-        this.journal = new ArrayList<>();
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
     }
 
     protected int adjustedIndex(long logEntryIndex) {
@@ -116,19 +117,18 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
-        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
             int maxIndex = adjustedIndex + max;
             if(maxIndex > size){
                 maxIndex = size;
             }
-            entries.addAll(journal.subList(adjustedIndex, maxIndex));
+            return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+        } else {
+            return Collections.emptyList();
         }
-        return entries;
     }
 
-
     @Override
     public long size() {
        return journal.size();
index 4d2bad5c2effe1f55a7abb3e74786e30390abaab..6d0c14e733a8c81bb2e29a663915eb5776422a6d 100644 (file)
@@ -5,11 +5,8 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The state of the followers log as known by the Leader
  */
@@ -19,13 +16,13 @@ public interface FollowerLogInformation {
      * Increment the value of the nextIndex
      * @return
      */
-    public long incrNextIndex();
+    long incrNextIndex();
 
     /**
      * Decrement the value of the nextIndex
      * @return
      */
-    public long decrNextIndex();
+    long decrNextIndex();
 
     /**
      *
@@ -37,45 +34,43 @@ public interface FollowerLogInformation {
      * Increment the value of the matchIndex
      * @return
      */
-    public long incrMatchIndex();
+    long incrMatchIndex();
 
-    public void setMatchIndex(long matchIndex);
+    void setMatchIndex(long matchIndex);
 
     /**
      * The identifier of the follower
      * This could simply be the url of the remote actor
      */
-    public String getId();
+    String getId();
 
     /**
      * for each server, index of the next log entry
      * to send to that server (initialized to leader
      *    last log index + 1)
      */
-    public AtomicLong getNextIndex();
+    long getNextIndex();
 
     /**
      * for each server, index of highest log entry
      * known to be replicated on server
      *    (initialized to 0, increases monotonically)
      */
-    public AtomicLong getMatchIndex();
+    long getMatchIndex();
 
     /**
      * Checks if the follower is active by comparing the last updated with the duration
      * @return boolean
      */
-    public boolean isFollowerActive();
+    boolean isFollowerActive();
 
     /**
      * restarts the timeout clock of the follower
      */
-    public void markFollowerActive();
+    void markFollowerActive();
 
     /**
      * This will stop the timeout clock
      */
-    public void markFollowerInActive();
-
-
+    void markFollowerInActive();
 }
index 7df80af58a717e99c1a226c3bd6cbb17ccd92afc..7a690d3d18be84433f9e37c874a88b277b83f7cb 100644 (file)
@@ -9,61 +9,69 @@
 package org.opendaylight.controller.cluster.raft;
 
 import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+    private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
 
     private final String id;
 
-    private final AtomicLong nextIndex;
+    private final Stopwatch stopwatch = new Stopwatch();
 
-    private final AtomicLong matchIndex;
+    private final long followerTimeoutMillis;
 
-    private final Stopwatch stopwatch;
+    private volatile long nextIndex;
 
-    private final long followerTimeoutMillis;
+    private volatile long matchIndex;
 
-    public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
-        AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+    public FollowerLogInformationImpl(String id, long nextIndex,
+        long matchIndex, FiniteDuration followerTimeoutDuration) {
         this.id = id;
         this.nextIndex = nextIndex;
         this.matchIndex = matchIndex;
-        this.stopwatch = new Stopwatch();
         this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
     }
 
+    @Override
     public long incrNextIndex(){
-        return nextIndex.incrementAndGet();
+        return NEXT_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public long decrNextIndex() {
-        return nextIndex.decrementAndGet();
+    @Override
+    public long decrNextIndex() {
+        return NEXT_INDEX_UPDATER.decrementAndGet(this);
     }
 
-    @Override public void setNextIndex(long nextIndex) {
-        this.nextIndex.set(nextIndex);
+    @Override
+    public void setNextIndex(long nextIndex) {
+        this.nextIndex = nextIndex;
     }
 
+    @Override
     public long incrMatchIndex(){
-        return matchIndex.incrementAndGet();
+        return MATCH_INDEX_UPDATER.incrementAndGet(this);
     }
 
-    @Override public void setMatchIndex(long matchIndex) {
-        this.matchIndex.set(matchIndex);
+    @Override
+    public void setMatchIndex(long matchIndex) {
+        this.matchIndex = matchIndex;
     }
 
+    @Override
     public String getId() {
         return id;
     }
 
-    public AtomicLong getNextIndex() {
+    @Override
+    public long getNextIndex() {
         return nextIndex;
     }
 
-    public AtomicLong getMatchIndex() {
+    @Override
+    public long getMatchIndex() {
         return matchIndex;
     }
 
index 8e97c5877cba4cb90d195a6bb33cd42c449179dc..164c2cea561349cf178d63965eccd0a313e29b4e 100644 (file)
@@ -204,7 +204,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
                 replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -268,8 +268,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
-            replicatedLog.snapshotTerm, replicatedLog.size());
+            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+            replicatedLog.getSnapshotTerm(), replicatedLog.size());
 
         initializeBehavior();
     }
index e5c5dc752d3a257e9ce2f5852bf3350b006b8116..462c94ec8a40736cc005c994b74520d9111c3430 100644 (file)
@@ -14,16 +14,19 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -77,14 +80,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // This would be passed as the hash code of the last chunk when sending the first chunk
     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
 
-    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
-    protected final Set<String> followers;
+    private final Map<String, FollowerLogInformation> followerToLog;
+    private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private Cancellable heartbeatSchedule = null;
 
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+    private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
 
     protected final int minReplicationCount;
 
@@ -95,25 +96,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
-        for (String followerId : followers) {
+        final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+        for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1),
+                    context.getCommitIndex(), -1,
                     context.getConfigParams().getElectionTimeOutInterval());
 
-            followerToLog.put(followerId, followerLogInformation);
+            ftlBuilder.put(followerId, followerLogInformation);
         }
+        followerToLog = ftlBuilder.build();
 
         leaderId = context.getId();
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
+        LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
 
-        minReplicationCount = getMajorityVoteCount(followers.size());
+        minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
         // the isolated Leader peer count will be 1 less than the majority vote count.
         // this is because the vote count has the self vote counted in it
@@ -132,6 +130,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
     }
 
+    /**
+     * Return an immutable collection of follower identifiers.
+     *
+     * @return Collection of follower IDs
+     */
+    protected final Collection<String> getFollowerIds() {
+        return followerToLog.keySet();
+    }
+
     private Optional<ByteString> getSnapshot() {
         return snapshot;
     }
@@ -198,7 +205,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = 1;
 
             for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
+                if (info.getMatchIndex() >= N) {
                     replicatedCount++;
                 }
             }
@@ -222,16 +229,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
+        final Iterator<ClientRequestTracker> it = trackerList.iterator();
+        while (it.hasNext()) {
+            final ClientRequestTracker t = it.next();
+            if (t.getIndex() == logIndex) {
+                it.remove();
+                return t;
+            }
         }
 
-        return toRemove;
+        return null;
     }
 
+    @Override
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         for (ClientRequestTracker tracker : trackerList) {
             if (tracker.getIndex() == logIndex) {
@@ -324,8 +336,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
+                        LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+                            followerToLog.get(followerId).getNextIndex());
                     }
 
                     if (mapFollowerToSnapshot.isEmpty()) {
@@ -376,7 +388,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
-        if (followers.size() == 0) {
+        if (followerToLog.isEmpty()) {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
@@ -386,14 +398,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followers) {
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
 
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
                     // if install snapshot is in process , then sent next chunk if possible
@@ -408,6 +420,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 } else {
                     long leaderLastIndex = context.getReplicatedLog().lastIndex();
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                    final List<ReplicatedLogEntry> entries;
 
                     if (isFollowerActive &&
                         context.getReplicatedLog().isPresent(followerNextIndex)) {
@@ -473,23 +486,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
+                    LOG.info("{} follower needs a snapshot install", e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
+                        sendSnapshotChunk(followerActor, e.getKey());
 
                     } else {
                         initiateCaptureSnapshot();
@@ -528,16 +537,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
 
     private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+            ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
+            if (followerActor != null) {
+                long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
         }
@@ -588,7 +596,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void sendHeartBeat() {
-        if (followers.size() > 0) {
+        if (!followerToLog.isEmpty()) {
             sendAppendEntries();
         }
     }
@@ -600,7 +608,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (followerToLog.isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -759,17 +767,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+        final StringBuilder sb = new StringBuilder();
 
+        sb.append('[');
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            sb.append('{');
+            sb.append(followerLogInformation.getId());
+            sb.append(" state:");
+            sb.append(followerLogInformation.isFollowerActive());
+            sb.append("},");
         }
-        return "[" + sb.toString() + "]";
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    @VisibleForTesting
+    public FollowerLogInformation getFollower(String followerId) {
+        return followerToLog.get(followerId);
+    }
+
+    @VisibleForTesting
+    protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+        mapFollowerToSnapshot.put(followerId, snapshot);
+    }
+
+    @VisibleForTesting
+    public int followerSnapshotSize() {
+        return mapFollowerToSnapshot.size();
     }
 
     @VisibleForTesting
-    void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+    public int followerLogSize() {
+        return followerToLog.size();
     }
 }
index 97ecef370f08f00c7a2041b1897498a10d42fd60..ee3cc65dddb90815aecd50771c834d3ab461ecd4 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
@@ -74,7 +73,7 @@ public class Leader extends AbstractLeader {
     }
 
     protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
-        if(followers.size() == 0){
+        if (getFollowerIds().isEmpty()) {
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -103,7 +102,8 @@ public class Leader extends AbstractLeader {
             context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stopInstallSnapshotSchedule();
         stopIsolatedLeaderCheckSchedule();
         super.close();
@@ -111,11 +111,11 @@ public class Leader extends AbstractLeader {
 
     @VisibleForTesting
     void markFollowerActive(String followerId) {
-        followerToLog.get(followerId).markFollowerActive();
+        getFollower(followerId).markFollowerActive();
     }
 
     @VisibleForTesting
     void markFollowerInActive(String followerId) {
-        followerToLog.get(followerId).markFollowerInActive();
+        getFollower(followerId).markFollowerInActive();
     }
 }
index d95c9d502712159334171301868ee9af4cfa99d7..d53ccf25002dbbf407d2e4dc5dc35c0ec231c590 100644 (file)
@@ -154,16 +154,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public void setSnapshotIndex(final long snapshotIndex) {
-            this.snapshotIndex = snapshotIndex;
-        }
-
-        @Override
-        public void setSnapshotTerm(final long snapshotTerm) {
-            this.snapshotTerm = snapshotTerm;
-        }
-
         @Override
         public int dataSize() {
             return -1;
index 7df9f3713ffc302dcf100c45a93b8fd034b0fffa..a092c46533d251414ee7f22cf843c4fc3765f691 100644 (file)
@@ -7,14 +7,12 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -29,7 +27,7 @@ public class FollowerLogInformationImplTest {
 
         FollowerLogInformation followerLogInformation =
             new FollowerLogInformationImpl(
-                "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+                "follower1", 10, 9, timeoutDuration);
 
 
 
index 895fe35bff7588526fac71e996e9120968234af2..151015e97ec785277beaf51b0bdb4fe0681f6582 100644 (file)
@@ -67,12 +67,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -92,6 +92,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -117,6 +118,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -145,6 +147,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -169,6 +172,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -195,6 +199,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -482,6 +487,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             final String out =
                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                     // do not put code outside this method, will run afterwards
+                    @Override
                     protected String match(Object in) {
                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
                             InstallSnapshot is = (InstallSnapshot)
@@ -562,13 +568,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             assertTrue(raftBehavior instanceof Leader);
 
-            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-            assertEquals(leader.followerToLog.size(), 1);
-            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
 
@@ -779,6 +785,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
@@ -1180,8 +1187,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
-
+            setFollowerSnapshot(followerId, fts);
         }
     }
 
index f03d07eb99c0fc2ae212cc358402ba67c17a4a74..2a6de4af0a0d2ed5eb6f1887ad7e1a26aa166f4c 100644 (file)
@@ -8,11 +8,16 @@
 
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.concurrent.Future;
-
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -26,12 +31,6 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /*
  * RPC's can have both input, output, one or the other, or neither.
  *
@@ -44,6 +43,20 @@ import com.google.common.util.concurrent.ListenableFuture;
  *
  */
 public class RpcInvocationStrategy {
+    private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+        @SuppressWarnings("rawtypes")
+        @Override
+        public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+            final Object output;
+            if (getOutputClass() != null && result.getResult() != null) {
+                output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+            } else {
+                output = null;
+            }
+
+            return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+        }
+    };
 
     private final BindingIndependentMappingService mappingService;
     private final RpcProvisionRegistry biRpcRegistry;
@@ -61,26 +74,24 @@ public class RpcInvocationStrategy {
                                  final Method targetMethod,
                                  final BindingIndependentMappingService mappingService,
                                  final RpcProvisionRegistry biRpcRegistry ) {
-
+        this.mappingService = mappingService;
+        this.biRpcRegistry = biRpcRegistry;
         this.targetMethod = targetMethod;
         this.rpc = rpc;
 
-        Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
-        Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
-        if ( outputClassOption != null && outputClassOption.isPresent() ) {
-            this.outputClass = new WeakReference(outputClassOption.get() ) ;
+        final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+        if (outputClassOption.isPresent()) {
+            this.outputClass = new WeakReference(outputClassOption.get());
         } else {
-            this.outputClass = null ;
+            this.outputClass = null;
         }
-        if ( inputClassOption != null && inputClassOption.isPresent() ) {
-            this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+        final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+        if (inputClassOption.isPresent() ) {
+            this.inputClass = new WeakReference(inputClassOption.get());
         } else {
-            this.inputClass = null ;
+            this.inputClass = null;
         }
-
-        this.mappingService = mappingService;
-        this.biRpcRegistry = biRpcRegistry;
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -98,25 +109,6 @@ public class RpcInvocationStrategy {
             inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
         }
 
-        Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
-                                       new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
-            @SuppressWarnings("rawtypes")
-            @Override
-            public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
-                Object output = null;
-
-                if( getOutputClass() != null ) {
-                    if (result.getResult() != null) {
-                        output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
-                                                                    result.getResult());
-                    }
-                }
-
-                return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
-            }
-        };
-
         return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
     }
 
@@ -153,7 +145,8 @@ public class RpcInvocationStrategy {
     }
 
     @SuppressWarnings("rawtypes")
-    public WeakReference<Class> getOutputClass() {
+    @VisibleForTesting
+    WeakReference<Class> getOutputClass() {
         return outputClass;
     }
 }
index 9201a94de326111856ad7d4d5246264cb9edfe46..2aa0027e55756b8d81cfe65f29b12044f7bf5c91 100644 (file)
@@ -12,6 +12,18 @@ package org.opendaylight.controller.cluster.datastore.node.utils.stream;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.Node;
@@ -29,18 +41,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNo
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * NormalizedNodeInputStreamReader reads the byte stream and constructs the normalized node including its children nodes.
@@ -67,6 +67,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
     private NormalizedNodeAttrBuilder<NodeWithValue, Object,
                                       LeafSetEntryNode<Object>> leafSetEntryBuilder;
 
+    private final StringBuilder reusableStringBuilder = new StringBuilder(50);
+
     public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         input = new DataInputStream(stream);
@@ -147,7 +149,6 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
             case NodeTypes.ANY_XML_NODE :
                 LOG.debug("Read xml node");
-                Node<?> value = (Node<?>) readObject();
                 return Builders.anyXmlBuilder().withValue((Node<?>) readObject()).build();
 
             case NodeTypes.MAP_NODE :
@@ -196,14 +197,16 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
         String namespace = readCodedString();
         String revision = readCodedString();
 
-        // Not using stringbuilder as compiler optimizes string concatenation of +
         String qName;
         if(!Strings.isNullOrEmpty(revision)) {
-            qName = "(" + namespace + REVISION_ARG + revision + ")" +localName;
+            qName = reusableStringBuilder.append('(').append(namespace).append(REVISION_ARG).
+                        append(revision).append(')').append(localName).toString();
         } else {
-            qName = "(" + namespace + ")" + localName;
+            qName = reusableStringBuilder.append('(').append(namespace).append(')').
+                        append(localName).toString();
         }
 
+        reusableStringBuilder.delete(0, reusableStringBuilder.length());
         return QNameFactory.create(qName);
     }
 
@@ -213,7 +216,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
         if(valueType == NormalizedNodeOutputStreamWriter.IS_CODE_VALUE) {
             return codedStringMap.get(input.readInt());
         } else if(valueType == NormalizedNodeOutputStreamWriter.IS_STRING_VALUE) {
-            String value = input.readUTF();
+            String value = input.readUTF().intern();
             codedStringMap.put(Integer.valueOf(codedStringMap.size()), value);
             return value;
         }
@@ -249,22 +252,22 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return readObjSet();
 
             case ValueTypes.BOOL_TYPE :
-                return input.readBoolean();
+                return Boolean.valueOf(input.readBoolean());
 
             case ValueTypes.BYTE_TYPE :
-                return input.readByte();
+                return Byte.valueOf(input.readByte());
 
             case ValueTypes.INT_TYPE :
-                return input.readInt();
+                return Integer.valueOf(input.readInt());
 
             case ValueTypes.LONG_TYPE :
-                return input.readLong();
+                return Long.valueOf(input.readLong());
 
             case ValueTypes.QNAME_TYPE :
                 return readQName();
 
             case ValueTypes.SHORT_TYPE :
-                return input.readShort();
+                return Short.valueOf(input.readShort());
 
             case ValueTypes.STRING_TYPE :
                 return input.readUTF();
@@ -275,6 +278,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
             case ValueTypes.BIG_INTEGER_TYPE :
                 return new BigInteger(input.readUTF());
 
+            case ValueTypes.BINARY_TYPE :
+                byte[] bytes = new byte[input.readInt()];
+                input.readFully(bytes);
+                return bytes;
+
             case ValueTypes.YANG_IDENTIFIER_TYPE :
             return readYangInstanceIdentifier();
 
@@ -313,13 +321,13 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
 
             case PathArgumentTypes.NODE_IDENTIFIER :
-            return new NodeIdentifier(readQName());
+                return new NodeIdentifier(readQName());
 
             case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES :
-            return new NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
+                return new NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
 
             case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
-            return new NodeWithValue(readQName(), readObject());
+                return new NodeWithValue(readQName(), readObject());
 
             default :
                 return null;
index 46768d5112425effba48a8a8513f288f20aaf71a..ddbc4f5d48e21d829c06e736482092c1fc4f5ef6 100644 (file)
@@ -12,11 +12,6 @@ package org.opendaylight.controller.cluster.datastore.node.utils.stream;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -24,6 +19,11 @@ import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
@@ -310,7 +310,6 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         }
     }
 
-    @SuppressWarnings("rawtypes")
     private void writeObject(Object value) throws IOException {
 
         byte type = ValueTypes.getSerializableType(value);
@@ -339,6 +338,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
             case ValueTypes.BITS_TYPE:
                 writeObjSet((Set<?>) value);
                 break;
+            case ValueTypes.BINARY_TYPE:
+                byte[] bytes = (byte[]) value;
+                output.writeInt(bytes.length);
+                output.write(bytes);
+                break;
             case ValueTypes.YANG_IDENTIFIER_TYPE:
                 writeYangInstanceIdentifier((YangInstanceIdentifier) value);
                 break;
index 80fa527b4613e54d942aa14cc121c3cf234dd410..e75a454d394294795c633a1d57d83ccc5661ce98 100644 (file)
@@ -9,14 +9,13 @@
 package org.opendaylight.controller.cluster.datastore.node.utils.stream;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class ValueTypes {
     public static final byte SHORT_TYPE = 1;
@@ -30,6 +29,7 @@ public class ValueTypes {
     public static final byte STRING_TYPE = 9;
     public static final byte BIG_INTEGER_TYPE = 10;
     public static final byte BIG_DECIMAL_TYPE = 11;
+    public static final byte BINARY_TYPE = 12;
 
     private static Map<Class<?>, Byte> types = new HashMap<>();
 
@@ -45,6 +45,7 @@ public class ValueTypes {
         types.put(Short.class, Byte.valueOf(SHORT_TYPE));
         types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
         types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+        types.put(byte[].class, Byte.valueOf(BINARY_TYPE));
     }
 
     public static final byte getSerializableType(Object node){
index 3a1cfaa443a8e735965ed3b8d92f9037ee7ead36..8be6ad14dd7d92649ab210e7078ee1f20e5520fe 100644 (file)
@@ -4823,14 +4823,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -4970,18 +4982,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -5309,12 +5333,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5325,6 +5357,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5341,6 +5377,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -5355,6 +5395,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5374,6 +5418,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -5387,6 +5435,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -5395,6 +5447,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -5405,6 +5461,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
@@ -5863,14 +5923,26 @@ public final class ShardTransactionMessages {
     // required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     boolean hasInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
 
@@ -6010,18 +6082,30 @@ public final class ShardTransactionMessages {
     private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public boolean hasInstanceIdentifierPathArguments() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
       return instanceIdentifierPathArguments_;
     }
     /**
      * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+     *
+     * <pre>
+     * base Helium version
+     * </pre>
      */
     public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
       return instanceIdentifierPathArguments_;
@@ -6349,12 +6433,20 @@ public final class ShardTransactionMessages {
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public boolean hasInstanceIdentifierPathArguments() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6365,6 +6457,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6381,6 +6477,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder setInstanceIdentifierPathArguments(
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
@@ -6395,6 +6495,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6414,6 +6518,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public Builder clearInstanceIdentifierPathArguments() {
         if (instanceIdentifierPathArgumentsBuilder_ == null) {
@@ -6427,6 +6535,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
         bitField0_ |= 0x00000001;
@@ -6435,6 +6547,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
         if (instanceIdentifierPathArgumentsBuilder_ != null) {
@@ -6445,6 +6561,10 @@ public final class ShardTransactionMessages {
       }
       /**
        * <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+       *
+       * <pre>
+       * base Helium version
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
index c5e4ee45c037df8d3f03c6c3b2e13015c8317ef6..ac9cb2203318d2b9e287e87714227a1d2a60e87d 100644 (file)
@@ -49,9 +49,9 @@ message ReadDataReply{
 }
 
 message WriteData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message WriteDataReply{
@@ -59,9 +59,9 @@ message WriteDataReply{
 }
 
 message MergeData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+  // base Helium version
+  required InstanceIdentifier instanceIdentifierPathArguments = 1;
+  required Node normalizedNode = 2;
 }
 
 message MergeDataReply{
index 816442f885ff33197166971e05b4fce70dbacce7..694b467623615f4ffe595eea7cdd73a940ccc774 100644 (file)
@@ -60,7 +60,7 @@ public class NormalizedNodeSerializerTest {
         }
 
         ContainerNode node1 = TestModel.createBaseTestContainerBuilder()
-                .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATE_QNAME, binaryData))
+                .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATA_QNAME, binaryData))
                 .build();
 
         NormalizedNodeMessages.Node serializedNode1 = NormalizedNodeSerializer
@@ -73,7 +73,7 @@ public class NormalizedNodeSerializerTest {
         // FIXME: This will not work due to BUG 2326. Once that is fixed we can uncomment this assertion
         // assertEquals(node1, node2);
 
-        Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATE_QNAME));
+        Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATA_QNAME));
 
         Object value = child.get().getValue();
 
index 8854fc73b5ebffb656f07ea83b4a7443ba69d81e..a67e887a15cf7c9727ca7e9771acb231fdd4a6b1 100644 (file)
@@ -17,13 +17,17 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class NormalizedNodeStreamReaderWriterTest {
@@ -31,7 +35,7 @@ public class NormalizedNodeStreamReaderWriterTest {
     @Test
     public void testNormalizedNodeStreamReaderWriter() throws IOException {
 
-        testNormalizedNodeStreamReaderWriter(TestModel.createTestContainer());
+        testNormalizedNodeStreamReaderWriter(createTestContainer());
 
         QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
         QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
@@ -44,6 +48,29 @@ public class NormalizedNodeStreamReaderWriterTest {
                 withChild(toasterNode).build());
     }
 
+    private NormalizedNode<?, ?> createTestContainer() {
+        byte[] bytes1 = {1,2,3};
+        LeafSetEntryNode<Object> entry1 = ImmutableLeafSetEntryNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeWithValue(TestModel.BINARY_LEAF_LIST_QNAME, bytes1)).
+                withValue(bytes1).build();
+
+        byte[] bytes2 = {};
+        LeafSetEntryNode<Object> entry2 = ImmutableLeafSetEntryNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeWithValue(TestModel.BINARY_LEAF_LIST_QNAME, bytes2)).
+                withValue(bytes2).build();
+
+        return TestModel.createBaseTestContainerBuilder().
+                withChild(ImmutableLeafSetNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.BINARY_LEAF_LIST_QNAME)).
+                        withChild(entry1).withChild(entry2).build()).
+                withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATA_QNAME, new byte[]{1,2,3,4})).
+                withChild(Builders.orderedMapBuilder().
+                      withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.ORDERED_LIST_QNAME)).
+                      withChild(ImmutableNodes.mapEntry(TestModel.ORDERED_LIST_ENTRY_QNAME,
+                              TestModel.ID_QNAME, 11)).build()).
+                build();
+    }
+
     private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
 
         byte[] byteData = null;
index d3abd38153968ca3c66430e6887b5b80c7220afb..aa1cfc6cd835bd32fc82f031668100129241ce52 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.util;
 
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntryBuilder;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapNodeBuilder;
 import com.google.common.collect.ImmutableSet;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
@@ -36,18 +48,6 @@ import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntryBuilder;
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapNodeBuilder;
-
 public class TestModel {
 
   public static final QName TEST_QNAME = QName.create(
@@ -65,7 +65,8 @@ public class TestModel {
 
   public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
   public static final QName POINTER_QNAME = QName.create(TEST_QNAME, "pointer");
-  public static final QName SOME_BINARY_DATE_QNAME = QName.create(TEST_QNAME, "some-binary-data");
+  public static final QName SOME_BINARY_DATA_QNAME = QName.create(TEST_QNAME, "some-binary-data");
+  public static final QName BINARY_LEAF_LIST_QNAME = QName.create(TEST_QNAME, "binary_leaf_list");
   public static final QName SOME_REF_QNAME = QName.create(TEST_QNAME,
       "some-ref");
   public static final QName MYIDENTITY_QNAME = QName.create(TEST_QNAME,
@@ -73,8 +74,8 @@ public class TestModel {
   public static final QName SWITCH_FEATURES_QNAME = QName.create(TEST_QNAME,
       "switch-features");
 
-  public static final QName AUGMENTED_LIST_QNAME = QName.create(TEST_QNAME,
-      "augmented-list");
+  public static final QName AUGMENTED_LIST_QNAME = QName.create(TEST_QNAME, "augmented-list");
+  public static final QName AUGMENTED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "augmented-list-entry");
 
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME,
       "outer-list");
@@ -85,6 +86,16 @@ public class TestModel {
   public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
   public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
   public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+  public static final QName BOOLEAN_LEAF_QNAME = QName.create(TEST_QNAME, "boolean-leaf");
+  public static final QName SHORT_LEAF_QNAME = QName.create(TEST_QNAME, "short-leaf");
+  public static final QName BYTE_LEAF_QNAME = QName.create(TEST_QNAME, "byte-leaf");
+  public static final QName BIGINTEGER_LEAF_QNAME = QName.create(TEST_QNAME, "biginteger-leaf");
+  public static final QName BIGDECIMAL_LEAF_QNAME = QName.create(TEST_QNAME, "bigdecimal-leaf");
+  public static final QName ORDERED_LIST_QNAME = QName.create(TEST_QNAME, "ordered-list");
+  public static final QName ORDERED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "ordered-list-entry");
+  public static final QName UNKEYED_LIST_QNAME = QName.create(TEST_QNAME, "unkeyed-list");
+  public static final QName UNKEYED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "unkeyed-list-entry");
+  public static final QName CHOICE_QNAME = QName.create(TEST_QNAME, "choice");
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
   private static final String DATASTORE_AUG_YANG =
       "/odl-datastore-augmentation.yang";
@@ -107,7 +118,7 @@ public class TestModel {
   private static final String TWO_TWO_NAME = "two";
   private static final String DESC = "Hello there";
   private static final Long LONG_ID = 1L;
-  private static final Boolean ENABLED = false;
+  private static final Boolean ENABLED = true;
   private static final Short SHORT_ID = 1;
   private static final Byte BYTE_ID = 1;
   // Family specific constants
@@ -149,83 +160,6 @@ public class TestModel {
   private static final String SECOND_GRAND_CHILD_NAME = "second grand child";
 
 
-  // first child
-  private static final YangInstanceIdentifier CHILDREN_1_PATH =
-      YangInstanceIdentifier.builder(CHILDREN_PATH)
-          .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, FIRST_CHILD_ID) //
-          .build();
-  private static final YangInstanceIdentifier CHILDREN_1_NAME_PATH =
-      YangInstanceIdentifier.builder(CHILDREN_PATH)
-          .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, FIRST_CHILD_NAME) //
-          .build();
-
-  private static final YangInstanceIdentifier CHILDREN_2_PATH =
-      YangInstanceIdentifier.builder(CHILDREN_PATH)
-          .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, SECOND_CHILD_ID) //
-          .build();
-  private static final YangInstanceIdentifier CHILDREN_2_NAME_PATH =
-      YangInstanceIdentifier.builder(CHILDREN_PATH)
-          .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, SECOND_CHILD_NAME) //
-          .build();
-
-
-  private static final YangInstanceIdentifier GRAND_CHILD_1_PATH =
-      YangInstanceIdentifier
-          .builder(CHILDREN_1_PATH)
-          .node(GRAND_CHILDREN_QNAME)
-          //
-          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
-              FIRST_GRAND_CHILD_ID) //
-          .build();
-
-  private static final YangInstanceIdentifier GRAND_CHILD_1_NAME_PATH =
-      YangInstanceIdentifier
-          .builder(CHILDREN_1_PATH)
-          .node(GRAND_CHILDREN_QNAME)
-          //
-          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
-              FIRST_GRAND_CHILD_NAME) //
-          .build();
-
-  private static final YangInstanceIdentifier GRAND_CHILD_2_PATH =
-      YangInstanceIdentifier
-          .builder(CHILDREN_2_PATH)
-          .node(GRAND_CHILDREN_QNAME)
-          //
-          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
-              SECOND_GRAND_CHILD_ID) //
-          .build();
-
-  private static final YangInstanceIdentifier GRAND_CHILD_2_NAME_PATH =
-      YangInstanceIdentifier
-          .builder(CHILDREN_2_PATH)
-          .node(GRAND_CHILDREN_QNAME)
-          //
-          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
-              SECOND_GRAND_CHILD_NAME) //
-          .build();
-
-  private static final YangInstanceIdentifier DESC_PATH_ID =
-      YangInstanceIdentifier.builder(DESC_PATH).build();
-  private static final YangInstanceIdentifier OUTER_LIST_1_PATH =
-      YangInstanceIdentifier.builder(OUTER_LIST_PATH)
-          .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, ONE_ID) //
-          .build();
-
-  private static final YangInstanceIdentifier OUTER_LIST_2_PATH =
-      YangInstanceIdentifier.builder(OUTER_LIST_PATH)
-          .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
-          .build();
-
-  private static final YangInstanceIdentifier TWO_TWO_PATH =
-      YangInstanceIdentifier.builder(OUTER_LIST_2_PATH).node(INNER_LIST_QNAME) //
-          .nodeWithKey(INNER_LIST_QNAME, NAME_QNAME, TWO_TWO_NAME) //
-          .build();
-
-  private static final YangInstanceIdentifier TWO_TWO_VALUE_PATH =
-      YangInstanceIdentifier.builder(TWO_TWO_PATH).node(VALUE_QNAME) //
-          .build();
-
   private static final MapEntryNode BAR_NODE = mapEntryBuilder(
       OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
       .withChild(mapNodeBuilder(INNER_LIST_QNAME) //
@@ -362,7 +296,7 @@ public class TestModel {
 
 
       // Create augmentations
-      MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
+      MapEntryNode augMapEntry = createAugmentedListEntry(1, "First Test");
 
       // Create a bits leaf
       NormalizedNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>>
@@ -371,10 +305,19 @@ public class TestModel {
                       QName.create(TEST_QNAME, "my-bits"))).withValue(
               ImmutableSet.of("foo", "bar"));
 
-      // Create unkey list entry
-      UnkeyedListEntryNode binaryDataKey =
-          Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).
-              withChild(ImmutableNodes.leafNode(SOME_BINARY_DATE_QNAME, DESC)).build();
+      // Create unkeyed list entry
+      UnkeyedListEntryNode unkeyedListEntry =
+          Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(UNKEYED_LIST_ENTRY_QNAME)).
+              withChild(ImmutableNodes.leafNode(NAME_QNAME, "unkeyed-entry-name")).build();
+
+      // Create YangInstanceIdentifier with all path arg types.
+      YangInstanceIdentifier instanceID = YangInstanceIdentifier.create(
+              new YangInstanceIdentifier.NodeIdentifier(QName.create(TEST_QNAME, "qname")),
+              new YangInstanceIdentifier.NodeIdentifierWithPredicates(QName.create(TEST_QNAME, "list-entry"),
+                      QName.create(TEST_QNAME, "key"), 10),
+              new YangInstanceIdentifier.AugmentationIdentifier(ImmutableSet.of(
+                      QName.create(TEST_QNAME, "aug1"), QName.create(TEST_QNAME, "aug2"))),
+              new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME, "leaf-list-entry"), "foo"));
 
       Map<QName, Object> keyValues = new HashMap<>();
       keyValues.put(CHILDREN_QNAME, FIRST_CHILD_NAME);
@@ -383,32 +326,30 @@ public class TestModel {
       // Create the document
       return ImmutableContainerNodeBuilder
               .create()
-              .withNodeIdentifier(
-                  new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+              .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
               .withChild(myBits.build())
               .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
-              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, ENABLED))
-              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, SHORT_ID))
-              .withChild(ImmutableNodes.leafNode(POINTER_QNAME, BYTE_ID))
-              .withChild(
-                  ImmutableNodes.leafNode(SOME_REF_QNAME, GRAND_CHILD_1_PATH))
+              .withChild(ImmutableNodes.leafNode(BOOLEAN_LEAF_QNAME, ENABLED))
+              .withChild(ImmutableNodes.leafNode(SHORT_LEAF_QNAME, SHORT_ID))
+              .withChild(ImmutableNodes.leafNode(BYTE_LEAF_QNAME, BYTE_ID))
+              .withChild(ImmutableNodes.leafNode(TestModel.BIGINTEGER_LEAF_QNAME, BigInteger.valueOf(100)))
+              .withChild(ImmutableNodes.leafNode(TestModel.BIGDECIMAL_LEAF_QNAME, BigDecimal.valueOf(1.2)))
+              .withChild(ImmutableNodes.leafNode(SOME_REF_QNAME, instanceID))
               .withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
-               .withChild(Builders.unkeyedListBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(OUTER_LIST_QNAME))
-                   .withChild(binaryDataKey).build())
-               .withChild(Builders.orderedMapBuilder()
-                   .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).withChild(mapEntry).build())
-               .withChild(Builders.choiceBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+              .withChild(Builders.unkeyedListBuilder()
+                   .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(UNKEYED_LIST_QNAME))
+                   .withChild(unkeyedListEntry).build())
+              .withChild(Builders.choiceBuilder()
+                   .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CHOICE_QNAME))
                    .withChild(ImmutableNodes.leafNode(DESC_QNAME, LONG_ID)).build())
                       // .withChild(augmentationNode)
               .withChild(shoes)
               .withChild(numbers)
               .withChild(switchFeatures)
-              .withChild(
-                  mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
-              .withChild(
-                  mapNodeBuilder(OUTER_LIST_QNAME)
-                      .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
-                      .withChild(BAR_NODE).build()
+              .withChild(mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(augMapEntry).build())
+              .withChild(mapNodeBuilder(OUTER_LIST_QNAME)
+                   .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+                   .withChild(BAR_NODE).build()
               );
   }
 
@@ -441,7 +382,7 @@ public class TestModel {
         .create()
         .withNodeIdentifier(
             new YangInstanceIdentifier.NodeIdentifierWithPredicates(
-                AUGMENTED_LIST_QNAME, ID_QNAME, id))
+                    AUGMENTED_LIST_ENTRY_QNAME, ID_QNAME, id))
         .withChild(ImmutableNodes.leafNode(ID_QNAME, id))
         .withChild(augmentationNode).build();
   }
index a1fbc1fdad1f897d265bbd6813d7a36962b70a3f..c720d5e0cdcc099026b545357c1edc8edb92de41 100644 (file)
@@ -74,6 +74,10 @@ module odl-datastore-test {
             type uint8;
         }
 
+        leaf-list binary_leaf_list {
+            type binary;
+        }
+
         leaf pointer {
             type leafref {
                 path "/network-topology/topology/node/termination-point/tp-id";
index 89cd033df3ac41a5e5b105dbca4f8074eb47e586..96ed7f183893e3900d7ad31a2f977e067c1aed04 100644 (file)
@@ -97,7 +97,7 @@
     </configuration>
     <required-capabilities>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&amp;revision=2014-11-24</capability>
-        <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-privider&amp;revision=2014-06-12</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&amp;revision=2014-06-12</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&amp;revision=2014-06-17</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&amp;revision=2014-06-17</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&amp;revision=2013-10-28</capability>
index 4edd60a33af436fff36e77627657d2cd7844a6c3..d7bfae1b42b88a36be7a9e29124af53231111757 100644 (file)
@@ -32,9 +32,8 @@ public class ClusterWrapperImpl implements ClusterWrapper {
                 "member-3 here would be the name of the member"
         );
 
-        currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
+        currentMemberName = cluster.getSelfRoles().iterator().next();
         selfAddress = cluster.selfAddress();
-
     }
 
     public void subscribeToMemberEvents(ActorRef actorRef){
index 6f14af304f403e8340ea904bbbf22a0a1d40673d..1bc835f1e3c6883196d4a9a6b1f334bb16f9dca7 100644 (file)
@@ -71,7 +71,7 @@ public class DataChangeListener extends AbstractUntypedActor {
         // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
         // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(new DataChangedReply(), getSelf());
+            getSender().tell(DataChangedReply.INSTANCE, getSelf());
         }
     }
 
index 1a0ee8c2fa6866dfdab97cfeab7087ce37d52ca6..afec1a07d484d0852abe626d1a387e607014b8c6 100644 (file)
@@ -16,22 +16,21 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * DataChangeListenerProxy represents a single remote DataChangeListener
  */
 public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
     private final ActorSelection dataChangeListenerActor;
-    private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
-        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
-        this.schemaContext = schemaContext;
+    public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
+                "dataChangeListenerActor should not be null");
     }
 
-    @Override public void onDataChanged(
+    @Override
+    public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
+        dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataStoreVersions.java
new file mode 100644 (file)
index 0000000..1f22222
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Defines version numbers.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DataStoreVersions {
+    short BASE_HELIUM_VERSION = 0;
+    short HELIUM_1_VERSION = 1;
+    short HELIUM_2_VERSION = 2;
+    short LITHIUM_VERSION = 3;
+    short CURRENT_VERSION = LITHIUM_VERSION;
+}
index 004faf2de1a42b833877a16efdac1f35ef9c086f..5d63c92e885824c701c4cd8d6f1702dbae84da5f 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
@@ -17,15 +16,11 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 public class DistributedDataStoreFactory {
+    private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+    private static final String CONFIGURATION_NAME = "odl-cluster-data";
 
-    public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
-
-    public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
-    private static AtomicReference<ActorSystem> persistentActorSystem = new AtomicReference<>();
+    private static volatile ActorSystem persistentActorSystem = null;
 
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
                                                       DatastoreContext datastoreContext, BundleContext bundleContext) {
@@ -41,26 +36,25 @@ public class DistributedDataStoreFactory {
         return dataStore;
     }
 
-    synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
-
-        AtomicReference<ActorSystem> actorSystemReference = persistentActorSystem;
-        String configurationName = CONFIGURATION_NAME;
-        String actorSystemName = ACTOR_SYSTEM_NAME;
-
-        if (actorSystemReference.get() != null){
-            return actorSystemReference.get();
+    private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
+        ActorSystem ret = persistentActorSystem;
+        if (ret == null) {
+            synchronized (DistributedDataStoreFactory.class) {
+                ret = persistentActorSystem;
+                if (ret == null) {
+                    // Create an OSGi bundle classloader for actor system
+                    BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                        Thread.currentThread().getContextClassLoader());
+
+                    ret = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                        ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
+                    ret.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+                    persistentActorSystem = ret;
+                }
+            }
         }
 
-        // Create an OSGi bundle classloader for actor system
-        BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-                Thread.currentThread().getContextClassLoader());
-
-        ActorSystem system = ActorSystem.create(actorSystemName,
-                ConfigFactory.load(configurationReader.read()).getConfig(configurationName), classLoader);
-        system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
-        actorSystemReference.set(system);
-        return system;
+        return ret;
     }
-
 }
index 7ef6e040a9f3f0d94bf2fdc47790377d505184b3..a3ef0339b7571172dfb0d5b2338f52911a86cf9c 100644 (file)
@@ -95,11 +95,10 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
-    public static final String DEFAULT_NAME = "default";
+    @VisibleForTesting
+    static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
     private final InMemoryDOMDataStore store;
@@ -354,7 +353,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 commitWithNewTransaction(cohortEntry.getModification());
-                sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+                sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
                 // took so long to complete such that the cohort entry was expired from the cache.
@@ -376,7 +375,7 @@ public class Shard extends RaftActor {
             // currently uses a same thread executor anyway.
             cohortEntry.getCohort().commit().get();
 
-            sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
@@ -412,7 +411,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -447,7 +446,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
-                        sender.tell(new AbortTransactionReply().toSerializable(), self);
+                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
                     }
                 }
 
@@ -480,7 +479,7 @@ public class Shard extends RaftActor {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
-        self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+        self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
                 self());
 
         createSnapshotTransaction = null;
@@ -500,7 +499,8 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
-            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+            ShardTransactionIdentifier transactionId, String transactionChainId,
+            short clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -568,7 +568,7 @@ public class Shard extends RaftActor {
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
-            String transactionChainId, int clientVersion) {
+            String transactionChainId, short clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -659,7 +659,7 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
-                new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+                new DataChangeListenerProxy(dataChangeListenerPath);
 
         LOG.debug("Registering for path {}", registerChangeListener.getPath());
 
@@ -818,7 +818,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index f3b4e416403b0594a22da9ff47de2f68a1d284cc..19fa26682e2a4cea7b637fda85064a3aea0226e5 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
@@ -17,10 +21,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -31,12 +31,6 @@ public class ShardCommitCoordinator {
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
 
-    private static final Object CAN_COMMIT_REPLY_TRUE =
-            new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
-    private static final Object CAN_COMMIT_REPLY_FALSE =
-            new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
@@ -138,7 +132,8 @@ public class ShardCommitCoordinator {
             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
 
             cohortEntry.getCanCommitSender().tell(
-                    canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+                    canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+                        CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
 
             if(!canCommit) {
                 // Remove the entry from the cache now since the Tx will be aborted.
index 88f818f0faedf76f0349ce1f7294dee37b9d79d1..10876045ae272c436e54143c8a0da8bf1c2e41e7 100644 (file)
@@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
@@ -45,10 +46,10 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -91,7 +92,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final DatastoreContext datastoreContext;
 
-    private final Collection<String> knownModules = new HashSet<>(128);
+    private Collection<String> knownModules = Collections.emptySet();
 
     private final DataPersistenceProvider dataPersistenceProvider;
 
@@ -182,8 +183,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         if(dataPersistenceProvider.isRecoveryApplicable()) {
             if (message instanceof SchemaContextModules) {
                 SchemaContextModules msg = (SchemaContextModules) message;
-                knownModules.clear();
-                knownModules.addAll(msg.getModules());
+                knownModules = ImmutableSet.copyOf(msg.getModules());
             } else if (message instanceof RecoveryFailure) {
                 RecoveryFailure failure = (RecoveryFailure) message;
                 LOG.error(failure.cause(), "Recovery failed");
@@ -277,8 +277,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
 
-            knownModules.clear();
-            knownModules.addAll(newModules);
+            knownModules = ImmutableSet.copyOf(newModules);
 
             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
 
index 9d8f57252a2fc43bd316f213c9f8b68f4f4c453f..be9c4d80e311484d0e6edea2050d85a0ebf28d1f 100644 (file)
@@ -27,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index e558677ebbaba5598d1ca84875a6402764887f34..b394da88e853f4387fb2cbc70ff3b93ab5436999 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -27,8 +26,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index 59bb4bfd77892e1a915563b3d435150de27c4ca1..678b7815693382179328a8c13adb567d80d61b13 100644 (file)
@@ -63,21 +63,21 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    private final int txnClientVersion;
+    private final short clientTxVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, int txnClientVersion) {
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
-        this.txnClientVersion = txnClientVersion;
+        this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, int txnClientVersion) {
+            String transactionID, short txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
@@ -96,8 +96,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
-    protected int getTxnClientVersion() {
-        return txnClientVersion;
+    protected short getClientTxVersion() {
+        return clientTxVersion;
     }
 
     @Override
@@ -118,28 +118,28 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getDOMStoreTransaction().close();
 
         if(sendReply) {
-            getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
+    protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+            final boolean returnSerialized) {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         final YangInstanceIdentifier path = message.getPath();
         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
                 transaction.read(path);
 
-
         future.addListener(new Runnable() {
             @Override
             public void run() {
                 try {
                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-                    ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+                    ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
 
-                    sender.tell((returnSerialized ? readDataReply.toSerializable():
+                    sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
                         readDataReply), self);
 
                 } catch (Exception e) {
@@ -176,11 +176,11 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
-        final int txnClientVersion;
+        final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, int txnClientVersion) {
+                ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
index 78c6a558f4f291bca277c2793676f01fc6686cc7..8ba613958a9a5dfc1e0e2a9b45b921c3b6243b4e 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -46,7 +45,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             createTransaction(createTransaction);
         } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
             chain.close();
-            getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+            getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
         }else{
             unknownMessage(message);
         }
index 44f2c7bd0a2794715d09cb29bb7a5ce42b352344..2e43219523e0d34b5b015b0ef0fc39370dbf89b8 100644 (file)
@@ -43,8 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            int txnClientVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+            short clientTxVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
@@ -66,19 +66,19 @@ public class ShardWriteTransaction extends ShardTransaction {
             deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
 
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+            readyTransaction(transaction, !SERIALIZED_REPLY);
 
-        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(WriteData.isSerializedType(message)) {
+            writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+        } else if(MergeData.isSerializedType(message)) {
+            mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+            readyTransaction(transaction, SERIALIZED_REPLY);
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
@@ -97,9 +97,9 @@ public class ShardWriteTransaction extends ShardTransaction {
                 new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
         try {
             transaction.write(message.getPath(), message.getData());
-            WriteDataReply writeDataReply = new WriteDataReply();
-            getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
-                getSelf());
+            WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
+                writeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -114,9 +114,9 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         try {
             transaction.merge(message.getPath(), message.getData());
-            MergeDataReply mergeDataReply = new MergeDataReply();
-            getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
-                getSelf());
+            MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
+                mergeDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -137,15 +137,14 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
-            boolean returnSerialized) {
+    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
                 cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
index 7703f484c73e687391ff5dd9ffe1739afd201c0f..9f48ef96cf517e265970b744b87af014dc51cbca 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,6 +44,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -157,8 +159,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
                 }
             }
         }
@@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
+    private final Semaphore operationLimiter;
+    private final OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+        this.operationCompleter = new OperationCompleter(operationLimiter);
+
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
 
@@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} read {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
             @Override
@@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} exists {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
             @Override
@@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 "Transaction is sealed - further modifications are not allowed");
     }
 
+    private void throttleOperation() {
+        throttleOperation(1);
+    }
+
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+            }
+        } catch (InterruptedException e) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+            }
+        }
+    }
+
+
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} write {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} merge {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} delete {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
+        throttleOperation(txFutureCallbackMap.size());
+
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
@@ -617,10 +655,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-
-
-
-
         /**
          * Performs a CreateTransaction try async.
          */
@@ -672,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                     localTransactionContext = createValidTransactionContext(
                             CreateTransactionReply.fromSerializable(response));
@@ -680,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
@@ -717,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
         }
     }
 
@@ -759,37 +793,49 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
         private final ActorContext actorContext;
-        private final SchemaContext schemaContext;
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
-        private final int remoteTransactionVersion;
+        private final short remoteTransactionVersion;
+        private final OperationCompleter operationCompleter;
+
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, int remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
-            this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
             this.remoteTransactionVersion = remoteTransactionVersion;
+            this.operationCompleter = operationCompleter;
+        }
+
+        private Future<Object> completeOperation(Future<Object> operationFuture){
+            operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+            return operationFuture;
         }
 
+
         private ActorSelection getActor() {
             return actor;
         }
 
         private Future<Object> executeOperationAsync(SerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+        }
+
+        private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                    msg.toSerializable(remoteTransactionVersion)));
         }
 
         @Override
         public void closeTransaction() {
             LOG.debug("Tx {} closeTransaction called", identifier);
 
-            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
         }
 
         @Override
@@ -799,7 +845,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+            final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -846,7 +892,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                         // At some point in the future when upgrades from Helium are not supported
                         // we could remove this code to resolvePath and just use the cohortPath as the
                         // resolved cohortPath
-                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                        if(TransactionContextImpl.this.remoteTransactionVersion <
+                                DataStoreVersions.HELIUM_1_VERSION) {
                             cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
                         }
 
@@ -872,14 +919,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
 
-            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+            recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
         }
 
         @Override
@@ -950,8 +997,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                             ReadDataReply reply = (ReadDataReply) readResponse;
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
-                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                        } else if (ReadDataReply.isSerializedType(readResponse)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
 
                         } else {
@@ -1055,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Throwable failure;
+        private final Semaphore operationLimiter;
 
-        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
             super(identifier);
             this.failure = failure;
+            this.operationLimiter = operationLimiter;
         }
 
         @Override
@@ -1069,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public Future<ActorSelection> readyTransaction() {
             LOG.debug("Tx {} readyTransaction called", identifier);
+            operationLimiter.release();
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} readData called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -1099,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
+
+    private static class OperationCompleter extends OnComplete<Object> {
+        private final Semaphore operationLimiter;
+        OperationCompleter(Semaphore operationLimiter){
+            this.operationLimiter = operationLimiter;
+        }
+
+        @Override
+        public void onComplete(Throwable throwable, Object o){
+            this.operationLimiter.release();
+        }
+    }
 }
index 79c6b036fa997c94fef08517335d4315cae0629f..3680aec4f36035abbdb36b4b84f7fda93375fc2d 100644 (file)
@@ -14,8 +14,13 @@ public class AbortTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+
+    public static final AbortTransactionReply INSTANCE = new AbortTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index 4d121bae0adbc1aa0dee6d2888b0982293c70810..7db4846ef4ff69c4c318ab459a07cc09b7943a47 100644 (file)
@@ -14,23 +14,30 @@ public class CanCommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
 
-    private final Boolean canCommit;
+    public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true);
+    public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false);
 
-    public CanCommitTransactionReply(final Boolean canCommit) {
+    private final boolean canCommit;
+    private final Object serializedMessage;
+
+    private CanCommitTransactionReply(final boolean canCommit) {
         this.canCommit = canCommit;
+        this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().
+                setCanCommit(canCommit).build();
     }
 
-    public Boolean getCanCommit() {
+    public boolean getCanCommit() {
         return canCommit;
     }
 
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+        return serializedMessage;
     }
 
     public static CanCommitTransactionReply fromSerializable(final Object message) {
-        return new CanCommitTransactionReply(
-                ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+        ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized =
+                (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message;
+        return serialized.getCanCommit() ? YES : NO;
     }
 }
index c73111f2dbde517989214c7bd5a17145df63bc2c..ef1aac8d4b81ae4b6f1f4bee125b7dfb130d14e6 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransaction.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransaction.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransaction.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransaction.newBuilder().build();
+
+    public static final CloseTransaction INSTANCE = new CloseTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index c001ae185a6dc9641f08f2440d7825624936cb1d..b4673e8a08042668105091223b7723844037edd7 100644 (file)
@@ -11,11 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
 public class CloseTransactionChainReply implements SerializableMessage {
-  public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
-          ShardTransactionChainMessages.CloseTransactionChainReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
+            ShardTransactionChainMessages.CloseTransactionChainReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+
+    public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 124eeb2235f04d64e30dd33c581166a1edb347fc..1c47a1827f11f86f26ee13c2d81237adcdd55f6b 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CloseTransactionReply implements SerializableMessage {
-  public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.CloseTransactionReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CloseTransactionReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+
+    public static final CloseTransactionReply INSTANCE = new CloseTransactionReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 3d4a168450bbf4ef07b23a1071e7a6678fba3c13..47adea5ea0305678de89c8b11c4c87a6ea91bedc 100644 (file)
@@ -14,8 +14,13 @@ public class CommitTransactionReply implements SerializableMessage {
     public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
             ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
 
+    private static final Object SERIALIZED_INSTANCE =
+            ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+
+    public static final CommitTransactionReply INSTANCE = new CommitTransactionReply();
+
     @Override
     public Object toSerializable() {
-        return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+        return SERIALIZED_INSTANCE;
     }
 }
index bf82e660369c8d4f89e5cf48b22f6555f89bb65e..ea3caef093320ab1b9c4aa7d27e74df2f144b102 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 
@@ -16,24 +17,21 @@ public class CreateTransaction implements SerializableMessage {
     public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.CreateTransaction.class;
 
-    public static final int HELIUM_1_VERSION = 1;
-    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
-
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
-    private final int version;
+    private final short version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
-        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+        this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
     }
 
     private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
-            int version) {
+            short version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
@@ -48,7 +46,7 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -66,7 +64,7 @@ public class CreateTransaction implements SerializableMessage {
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
             createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
-            createTransaction.getMessageVersion());
+            (short)createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 83e68c9cb41882c2b7c377f96aaecb5f5a07f328..3fde6cc7fc2477eba6c6f466cd376d6ff132d1e1 100644 (file)
@@ -8,23 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class CreateTransactionReply implements SerializableMessage {
 
-    public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.CreateTransactionReply.class;
+    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
-    private final int version;
+    private final short version;
 
-    public CreateTransactionReply(final String transactionPath,
-        final String transactionId) {
-        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    public CreateTransactionReply(String transactionPath, String transactionId) {
+        this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
     }
 
     public CreateTransactionReply(final String transactionPath,
-                                  final String transactionId, final int version) {
+                                  final String transactionId, final short version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
         this.version = version;
@@ -39,7 +38,7 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
-    public int getVersion() {
+    public short getVersion() {
         return version;
     }
 
@@ -52,9 +51,10 @@ public class CreateTransactionReply implements SerializableMessage {
             .build();
     }
 
-    public static CreateTransactionReply fromSerializable(final Object serializable){
+    public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+                (short)o.getMessageVersion());
     }
 
 }
index 5b5f076d43713b48e6245a35a6d21684ecf909cb..fe81e27e3dcdec117efc637f8dd392c37e0c8c13 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
-    public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
-        DataChangeListenerMessages.DataChanged.class;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 
-    final private SchemaContext schemaContext;
-    private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        change;
+public class DataChanged implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
+    private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
 
+    public DataChanged() {
+    }
 
-    public DataChanged(SchemaContext schemaContext,
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+    public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         this.change = change;
-        this.schemaContext = schemaContext;
     }
 
-
     public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
         return change;
     }
 
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // Read the version
 
-    private NormalizedNodeMessages.Node convertToNodeTree(
-        NormalizedNode<?, ?> normalizedNode) {
+        NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
 
-        return new NormalizedNodeToNodeCodec(schemaContext)
-            .encode(normalizedNode)
-            .getNormalizedNode();
+        // Note: the scope passed to builder is not actually used.
+        Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
 
-    }
+        // Read created data
 
-    private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
-        Set<YangInstanceIdentifier> removedPaths) {
-        final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
-        for (YangInstanceIdentifier id : removedPaths) {
-            removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+        int size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addCreated(path, node);
         }
-        return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
-            @Override
-            public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
-                return removedPathInstanceIds.iterator();
-            }
-        };
 
-    }
+        // Read updated data
 
-    private NormalizedNodeMessages.NodeMap convertToNodeMap(
-        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
-        NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
-            new NormalizedNodeToNodeCodec(schemaContext);
-        NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
-            NormalizedNodeMessages.NodeMap.newBuilder();
-        NormalizedNodeMessages.NodeMapEntry.Builder builder =
-            NormalizedNodeMessages.NodeMapEntry.newBuilder();
-        for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
-            .entrySet()) {
-
-
-            NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
-                InstanceIdentifierUtils.toSerializable(entry.getKey());
-
-            builder.setInstanceIdentifierPath(instanceIdentifier)
-                .setNormalizedNode(normalizedNodeToNodeCodec
-                    .encode(entry.getValue())
-                    .getNormalizedNode());
-            nodeMapBuilder.addMapEntries(builder.build());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+            NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+            builder.addUpdated(path, before, after);
         }
-        return nodeMapBuilder.build();
-    }
-
 
-    @Override
-    public Object toSerializable() {
-        return DataChangeListenerMessages.DataChanged.newBuilder()
-            .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
-            .setCreatedData(convertToNodeMap(change.getCreatedData()))
-            .setOriginalData(convertToNodeMap(change.getOriginalData()))
-            .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
-            .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
-            .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
-            .build();
-    }
+        // Read removed data
 
-    public static DataChanged fromSerialize(SchemaContext sc, Object message,
-        YangInstanceIdentifier pathId) {
-        DataChangeListenerMessages.DataChanged dataChanged =
-            (DataChangeListenerMessages.DataChanged) message;
-        DataChangedEvent event = new DataChangedEvent(sc);
-        if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
-            .isInitialized()) {
-            event.setCreatedData(dataChanged.getCreatedData());
-        }
-        if (dataChanged.getOriginalData() != null && dataChanged
-            .getOriginalData().isInitialized()) {
-            event.setOriginalData(dataChanged.getOriginalData());
+        size = in.readInt();
+        for(int i = 0; i < size; i++) {
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            builder.addRemoved(path, node);
         }
 
-        if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
-            .isInitialized()) {
-            event.setUpdateData(dataChanged.getUpdatedData());
-        }
+        // Read original subtree
 
-        if (dataChanged.getOriginalSubTree() != null && dataChanged
-            .getOriginalSubTree().isInitialized()) {
-            event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+        boolean present = in.readBoolean();
+        if(present) {
+            builder.setBefore(streamReader.readNormalizedNode());
         }
 
-        if (dataChanged.getUpdatedSubTree() != null && dataChanged
-            .getUpdatedSubTree().isInitialized()) {
-            event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
-        }
+        // Read updated subtree
 
-        if (dataChanged.getRemovedPathsList() != null && !dataChanged
-            .getRemovedPathsList().isEmpty()) {
-            event.setRemovedPaths(dataChanged.getRemovedPathsList());
+        present = in.readBoolean();
+        if(present) {
+            builder.setAfter(streamReader.readNormalizedNode());
         }
 
-        return new DataChanged(sc, event);
-
+        change = builder.build();
     }
 
-    static class DataChangedEvent implements
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
-        private final NormalizedNodeToNodeCodec nodeCodec;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
-        private NormalizedNode<?, ?> originalSubTree;
-        private NormalizedNode<?, ?> updatedSubTree;
-        private Set<YangInstanceIdentifier> removedPathIds;
-
-        DataChangedEvent(SchemaContext schemaContext) {
-            nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
-        }
-
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            if(createdData == null){
-                return Collections.emptyMap();
-            }
-            return createdData;
-        }
-
-        DataChangedEvent setCreatedData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.createdData = convertNodeMapToMap(nodeMap);
-            return this;
-        }
-
-        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
-                new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
-            for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
-                .getMapEntriesList()) {
-                YangInstanceIdentifier id = InstanceIdentifierUtils
-                    .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
-                mapEntries.put(id,
-                    nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
-            }
-            return mapEntries;
-        }
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
 
+        NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+        NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            if(updatedData == null){
-                return Collections.emptyMap();
-            }
-            return updatedData;
-        }
+        // Write created data
 
-        DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
-            this.updatedData = convertNodeMapToMap(nodeMap);
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+        out.writeInt(createdData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Set<YangInstanceIdentifier> getRemovedPaths() {
-            if (removedPathIds == null) {
-                return Collections.emptySet();
-            }
-            return removedPathIds;
-        }
+        // Write updated data
 
-        public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
-            Set<YangInstanceIdentifier> removedIds = new HashSet<>();
-            for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
-                removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
-            }
-            this.removedPathIds = removedIds;
-            return this;
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+        out.writeInt(updatedData.size());
+        for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+            streamWriter.writeYangInstanceIdentifier(e.getKey());
+            nodeWriter.write(originalData.get(e.getKey()));
+            nodeWriter.write(e.getValue());
         }
 
-        @Override
-        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-            if (originalData == null) {
-                Collections.emptyMap();
-            }
-            return originalData;
-        }
+        // Write removed data
 
-        DataChangedEvent setOriginalData(
-            NormalizedNodeMessages.NodeMap nodeMap) {
-            this.originalData = convertNodeMapToMap(nodeMap);
-            return this;
+        Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+        out.writeInt(removed.size());
+        for(YangInstanceIdentifier path: removed) {
+            streamWriter.writeYangInstanceIdentifier(path);
+            nodeWriter.write(originalData.get(path));
         }
 
-        @Override
-        public NormalizedNode<?, ?> getOriginalSubtree() {
-            return originalSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            originalSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+        out.writeBoolean(originalSubtree != null);
+        if(originalSubtree != null) {
+            nodeWriter.write(originalSubtree);
         }
 
-        @Override
-        public NormalizedNode<?, ?> getUpdatedSubtree() {
-            return updatedSubTree;
-        }
+        // Write original subtree
 
-        DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
-            YangInstanceIdentifier instanceIdentifierPath) {
-            updatedSubTree = nodeCodec.decode(node);
-            return this;
+        NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+        out.writeBoolean(updatedSubtree != null);
+        if(updatedSubtree != null) {
+            nodeWriter.write(updatedSubtree);
         }
-
-
     }
-
-
-
 }
index e10a40729220941761a925350b5ac25288b0e90c..2db03446d9145abb664ec1e1b379ba0d918205c1 100644 (file)
@@ -11,10 +11,16 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
 
 public class DataChangedReply implements SerializableMessage {
-  public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
-          DataChangeListenerMessages.DataChangedReply.class;
-  @Override
-  public Object toSerializable() {
-    return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
-  }
+    public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
+            DataChangeListenerMessages.DataChangedReply.class;
+
+    private static final Object SERIALIZED_INSTANCE =
+            DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+
+    public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
index 2e02664e1dcd41bc032fb2be2a2913baf0e000f5..eba9c39170b4eef45509eb0de9f488172c5f8bfb 100644 (file)
@@ -11,10 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class DeleteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.DeleteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
-  }
+    public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.DeleteDataReply.class;
+
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+
+    public static final DeleteDataReply INSTANCE = new DeleteDataReply();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyExternalizable.java
new file mode 100644 (file)
index 0000000..0b7b262
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyExternalizable implements Externalizable {
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/EmptyReply.java
new file mode 100644 (file)
index 0000000..284c6ef
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+
+/**
+ * A reply with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+
+    private final Object legacySerializedInstance;
+
+    protected EmptyReply(Object legacySerializedInstance) {
+        super();
+        this.legacySerializedInstance = legacySerializedInstance;
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
+    }
+}
index eb1f3495bdf2c04328ad5336eb44b248149bd242..9234385b3536bfbe8ccaf528075efcdedc1be792 100644 (file)
@@ -8,36 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class MergeData extends ModifyData{
+public class MergeData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.MergeData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.MergeData.class;
+    public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
 
-    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        super(path, data, context);
+    public MergeData() {
+    }
+
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.MergeData.newBuilder()
-            .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-            .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static MergeData fromSerializable(Object serializable){
+        if(serializable instanceof MergeData) {
+            return (MergeData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.MergeData;
     }
 }
index 92d6d728477aac50dfb68975db69c299249537dc..a4c514bdbf0751b7399ba5928c367089a8210e82 100644 (file)
@@ -10,12 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class MergeDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.MergeDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.MergeDataReply.class;
+public class MergeDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.MergeDataReply.newBuilder().build();
-  }
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.MergeDataReply.newBuilder().build();
+
+    public static final MergeDataReply INSTANCE = new MergeDataReply();
+
+    public MergeDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
index b5c39d1c3fa630a674efb1302d3441f8dd8403ed..69c41c2a5663f5021a7a22401aef48ca9b3986d2 100644 (file)
@@ -8,25 +8,28 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public abstract class ModifyData implements SerializableMessage {
-    protected final YangInstanceIdentifier path;
-    protected final NormalizedNode<?, ?> data;
-    protected final SchemaContext schemaContext;
+public abstract class ModifyData implements Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
-        SchemaContext context) {
-        Preconditions.checkNotNull(context,
-            "Cannot serialize an object which does not have a schema schemaContext");
+    private YangInstanceIdentifier path;
+    private NormalizedNode<?, ?> data;
+    private short version;
 
+    protected ModifyData() {
+    }
 
+    protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         this.path = path;
         this.data = data;
-        this.schemaContext = context;
     }
 
     public YangInstanceIdentifier getPath() {
@@ -37,4 +40,31 @@ public abstract class ModifyData implements SerializableMessage {
         return data;
     }
 
+    public short getVersion() {
+        return version;
+    }
+
+    protected void setVersion(short version) {
+        this.version = version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        SerializationUtils.deserializePathAndNode(in, this, APPLIER);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializePathAndNode(path, data, out);
+    }
+
+    private static final Applier<ModifyData> APPLIER = new Applier<ModifyData>() {
+        @Override
+        public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            instance.path = path;
+            instance.data = data;
+        }
+    };
 }
index 43dd81252c3a52f35da645e6993d716e2a0d7715..8ac6e1b1494a68fd3e7f2f04e0081a4d7f538a10 100644 (file)
@@ -9,23 +9,29 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class ReadDataReply implements SerializableMessage {
-    public static final Class<ShardTransactionMessages.ReadDataReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.ReadDataReply.class;
+public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    private final NormalizedNode<?, ?> normalizedNode;
-    private final SchemaContext schemaContext;
+    public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
 
-    public ReadDataReply(SchemaContext context,NormalizedNode<?, ?> normalizedNode){
+    private NormalizedNode<?, ?> normalizedNode;
+    private short version;
 
+    public ReadDataReply() {
+    }
+
+    public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
         this.normalizedNode = normalizedNode;
-        this.schemaContext = context;
     }
 
     public NormalizedNode<?, ?> getNormalizedNode() {
@@ -33,26 +39,62 @@ public class ReadDataReply implements SerializableMessage {
     }
 
     @Override
-    public Object toSerializable(){
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort();
+        normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializeNormalizedNode(normalizedNode, out);
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            version = toVersion;
+            return this;
+        } else {
+            return toSerializableReadDataReply(normalizedNode);
+        }
+    }
+
+    private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply(
+            NormalizedNode<?, ?> normalizedNode) {
         if(normalizedNode != null) {
             return ShardTransactionMessages.ReadDataReply.newBuilder()
-                    .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
-                        .encode(normalizedNode).getNormalizedNode()).build();
+                    .setNormalizedNode(new NormalizedNodeToNodeCodec(null)
+                    .encode(normalizedNode).getNormalizedNode()).build();
         } else {
             return ShardTransactionMessages.ReadDataReply.newBuilder().build();
 
         }
     }
 
-    public static ReadDataReply fromSerializable(SchemaContext schemaContext,
-            YangInstanceIdentifier id, Object serializable) {
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getNormalizedNode()));
+    public static ReadDataReply fromSerializable(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            return (ReadDataReply) serializable;
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+        }
+    }
+
+    public static ByteString fromSerializableAsByteString(Object serializable) {
+        if(serializable instanceof ReadDataReply) {
+            ReadDataReply r = (ReadDataReply)serializable;
+            return toSerializableReadDataReply(r.getNormalizedNode()).toByteString();
+        } else {
+            ShardTransactionMessages.ReadDataReply o =
+                    (ShardTransactionMessages.ReadDataReply) serializable;
+            return o.getNormalizedNode().toByteString();
+        }
     }
 
-    public static ByteString getNormalizedNodeByteString(Object serializable){
-        ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
-        return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.ReadDataReply;
     }
 }
index 581caefd04e9fb83263a06cba9b6014f3b92109d..09617abde9b370351a45207d8ea42f60c4c6d5fc 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 public class ReadyTransaction implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.ReadyTransaction.class;
+    public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.ReadyTransaction.class;
 
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
-  }
+    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
+    public static final ReadyTransaction INSTANCE = new ReadyTransaction();
+
+    @Override
+    public Object toSerializable() {
+        return SERIALIZED_INSTANCE;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedSerializableMessage.java
new file mode 100644 (file)
index 0000000..5c30b10
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Interface for a Serializable message with versioning.
+ *
+ * @author Thomas Pantelis
+ */
+public interface VersionedSerializableMessage {
+    Object toSerializable(short toVersion);
+}
index 8aa63ef262a1366b63de5844979ed0caafb27208..c5e3a6b05966c9c30f12c5f6f7dab93a78fbebf7 100644 (file)
@@ -8,35 +8,54 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class WriteData extends ModifyData {
+public class WriteData extends ModifyData implements VersionedSerializableMessage {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.WriteData> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.WriteData.class;
+    public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
 
-    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
-        super(path, data, schemaContext);
+    public WriteData() {
+    }
+
+    public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
-        return ShardTransactionMessages.WriteData.newBuilder()
-                .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
-                .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            setVersion(toVersion);
+            return this;
+        } else {
+            // To base or R1 Helium version
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+            return ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+        }
+    }
+
+    public static WriteData fromSerializable(Object serializable) {
+        if(serializable instanceof WriteData) {
+            return (WriteData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
+            Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+                    o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+            return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+        }
     }
 
-    public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
-        ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
-                o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
-        return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+               message instanceof ShardTransactionMessages.WriteData;
     }
 }
index 876105de18abf7c4beee7b1c7afca342fc4086b2..8255828819cd494a93fc61dacd32bc48ba55edba 100644 (file)
@@ -10,11 +10,15 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class WriteDataReply implements SerializableMessage{
-  public static final Class<ShardTransactionMessages.WriteDataReply> SERIALIZABLE_CLASS =
-          ShardTransactionMessages.WriteDataReply.class;
-  @Override
-  public Object toSerializable() {
-    return ShardTransactionMessages.WriteDataReply.newBuilder().build();
-  }
+public class WriteDataReply extends EmptyReply {
+    private static final long serialVersionUID = 1L;
+
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.WriteDataReply.newBuilder().build();
+
+    public static final WriteDataReply INSTANCE = new WriteDataReply();
+
+    public WriteDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
+    }
 }
index f217d05bb21a12e6f92add47da5536c6f6fe12d9..c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -21,6 +22,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -45,8 +47,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static akka.pattern.Patterns.ask;
-
 /**
  * The ActorContext class contains utility methods which could be used by
  * non-actors (like DistributedDataStore) to work with actors a little more
@@ -84,6 +84,7 @@ public class ActorContext {
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final int transactionOutstandingOperationLimit;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -110,6 +111,8 @@ public class ActorContext {
         } else {
             selfAddressHostPort = null;
         }
+
+        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -431,4 +434,16 @@ public class ActorContext {
 
         return builder.toString();
     }
+
+    /**
+     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+     * should begin throttling the operations
+     *
+     * Parking reading this configuration here because we need to get to the actor system settings
+     *
+     * @return
+     */
+    public int getTransactionOutstandingOperationLimit(){
+        return transactionOutstandingOperationLimit;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
new file mode 100644 (file)
index 0000000..8404a6e
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Provides various utility methods for serialization and de-serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public final class SerializationUtils {
+    public static interface Applier<T> {
+        void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
+    }
+
+    public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
+            DataOutput out) {
+        Preconditions.checkNotNull(path);
+        Preconditions.checkNotNull(node);
+        try {
+            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            streamWriter.writeYangInstanceIdentifier(path);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing path {} and Node {}",
+                    path, node), e);
+        }
+    }
+
+    public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
+        try {
+            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+            YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+            applier.apply(instance, path, node);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing path and Node", e);
+        }
+    }
+
+    public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+        try {
+            out.writeBoolean(node != null);
+            if(node != null) {
+                NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+                NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing NormalizedNode {}",
+                    node), e);
+        }
+    }
+
+    public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
+            try {
+                boolean present = in.readBoolean();
+                if(present) {
+                    NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+                    return streamReader.readNormalizedNode();
+                }
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+            }
+
+        return null;
+    }
+}
index 55250dd5e9cd3ea71c0356f5313c09d930700c00..5485c57fd6dabd6c8cc0e32dcf757dddfabaa8ec 100644 (file)
@@ -17,11 +17,9 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
 public class DataChangeListenerProxyTest extends AbstractActorTest {
 
   private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
@@ -73,7 +71,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
-                TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
+                getSystem().actorSelection(actorRef.path()));
 
         dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
index 25d47388fe49579fdb66271e4f33ca7effeef314..19f0f8c5514663224f1bdb83c083bf202a4a4c93 100644 (file)
@@ -29,7 +29,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             // Let the DataChangeListener know that notifications should be enabled
             subject.tell(new EnableNotification(true), getRef());
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             expectMsgClass(DataChangedReply.class);
@@ -48,7 +48,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+            subject.tell(new DataChanged(mockChangeEvent),
                     getRef());
 
             new Within(duration("1 seconds")) {
@@ -74,8 +74,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
 
-            subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
-                    ActorRef.noSender());
+            subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
 
             // Make sure no DataChangedReply is sent to DeadLetters.
             while(true) {
@@ -113,13 +112,13 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
             SchemaContext schemaContext = CompositeModel.createTestContext();
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+            subject.tell(new DataChanged(mockChangeEvent1),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+            subject.tell(new DataChanged(mockChangeEvent2),getRef());
             expectMsgClass(DataChangedReply.class);
 
-            subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+            subject.tell(new DataChanged(mockChangeEvent3),getRef());
             expectMsgClass(DataChangedReply.class);
 
             Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
index ed842b2021475b3fef53d95244a37e03ac053d6b..42f30437c9061b916169365e2cfa506a65e8de9a 100644 (file)
@@ -10,7 +10,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
index 9f57359429bf5938721f269604109ce5cb49981e..09a4532b53906ab1b803aae880c1e69660537f07 100644 (file)
@@ -21,7 +21,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -72,7 +71,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -102,7 +101,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -132,7 +131,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +161,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -195,7 +194,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -233,7 +232,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -266,7 +265,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index af07aeebcf0ce324014812d986391a438ffd2076..58cec67a2d6cce1f30e3cdf41fdaf7b7d23185a7 100644 (file)
@@ -7,6 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
 import java.util.Collections;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,12 +33,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
 
 /**
  * Tests backwards compatibility support from Helium-1 to Helium.
@@ -46,6 +46,7 @@ import akka.testkit.TestActorRef;
  */
 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testTransactionCommit() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -78,9 +79,10 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.BASE_HELIUM_VERSION), getRef());
 
-            expectMsgClass(duration, WriteDataReply.class);
+            expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
 
             // Ready the Tx
 
@@ -151,7 +153,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc
             // Write data to the Tx
 
             txActor.tell(new WriteData(TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
 
             expectMsgClass(duration, WriteDataReply.class);
 
index f5af93d584ce4ff62678f9f499cd629c96ccc80b..79480ce5926dbce0be66580eb602ed8592c30bc0 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -38,6 +37,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -76,13 +77,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -92,12 +93,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                 getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertNotNull(ReadDataReply.fromSerializable(
-                testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
-                .getNormalizedNode());
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
 
             // unserialized read
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
@@ -114,14 +113,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -131,11 +130,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             // serialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
 
-            ShardTransactionMessages.ReadDataReply replySerialized =
-                expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+            Object replySerialized =
+                    expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
 
-            assertTrue(ReadDataReply.fromSerializable(
-                testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+            assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
 
             // unserialized read
             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
@@ -146,19 +144,39 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testOnReceiveReadDataHeliumR1() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+
+            ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+
+            transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+                    getRef());
+
+            ShardTransactionMessages.ReadDataReply replySerialized =
+                    expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
+
+            assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+        }};
+    }
+
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -187,13 +205,13 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -234,39 +252,61 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
-            final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
+                    DataStoreVersions.CURRENT_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
 
             assertModification(transaction, WriteModification.class);
 
-            //unserialized write
+            // unserialized write
             transaction.tell(new WriteData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME),
-                TestModel.createTestContext()),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1WriteData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+
+            assertModification(transaction, WriteModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
-                getRef());
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+                            DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
 
@@ -274,20 +314,43 @@ public class ShardTransactionTest extends AbstractActorTest {
 
             //unserialized merge
             transaction.tell(new MergeData(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
                 getRef());
 
             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveHeliumR1MergeData() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = createShard();
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    DataStoreVersions.HELIUM_1_VERSION);
+            final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+            Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+                    .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+            transaction.tell(serialized, getRef());
+
+            expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+            assertModification(transaction, MergeModification.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -310,7 +373,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -328,7 +391,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -343,13 +406,14 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -366,7 +430,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 testSchemaContext, datastoreContext, shardStats, "txn",
-                CreateTransaction.CURRENT_VERSION);
+                DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -382,7 +446,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
                     testSchemaContext, datastoreContext, shardStats, "txn",
-                    CreateTransaction.CURRENT_VERSION);
+                    DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 46060dda2a9bdd1cf1ec94e98d24c53ec97ed5a3..75c93dd5d2fd2de9521d7540da7169fe656c71e9 100644 (file)
@@ -1,11 +1,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -24,18 +34,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import scala.concurrent.Future;
 
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @SuppressWarnings("serial")
@@ -112,14 +110,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(false));
+                CanCommitTransactionReply.NO);
 
         future = proxy.canCommit();
 
@@ -134,7 +132,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -149,8 +147,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
-                new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -289,7 +286,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
-                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+                CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
 
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
index ce0547c3883d19a12c01b8d9a9ce355770aead3b..dd37371a4510c622b4ee0fdb3b5ab5cf67b5bf7f 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -19,19 +18,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class TransactionChainProxyTest {
-    ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+    ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
     @Before
     public void setUp() {
-        doReturn(schemaContext).when(actorContext).getSchemaContext();
+        actorContext = new MockActorContext(getSystem());
+        actorContext.setSchemaContext(schemaContext);
     }
 
     @SuppressWarnings("resource")
index 7407897dfac504872e37d35c374397369a8c5576..79edd19bba3328034ea313baa28333ba398226af 100644 (file)
@@ -1,5 +1,21 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -11,12 +27,17 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -40,8 +61,10 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -50,24 +73,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest {
@@ -116,7 +121,7 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
@@ -124,6 +129,7 @@ public class TransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -137,9 +143,13 @@ public class TransactionProxyTest {
         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
             @Override
             public boolean matches(Object argument) {
-                CreateTransaction obj = CreateTransaction.fromSerializable(argument);
-                return obj.getTransactionId().startsWith(memberName) &&
-                       obj.getTransactionType() == type.ordinal();
+                if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+                    return obj.getTransactionId().startsWith(memberName) &&
+                            obj.getTransactionType() == type.ordinal();
+                }
+
+                return false;
             }
         };
 
@@ -195,16 +205,25 @@ public class TransactionProxyTest {
     }
 
     private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+                    WriteData obj = WriteData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                WriteData obj = WriteData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -228,16 +247,25 @@ public class TransactionProxyTest {
     }
 
     private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+        return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+            final int transactionVersion) {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
-                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
-                    return false;
+                if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+                        MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+                   (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+                           ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+                    MergeData obj = MergeData.fromSerializable(argument);
+                    return obj.getPath().equals(TestModel.TEST_PATH) &&
+                           obj.getData().equals(nodeToWrite);
                 }
 
-                MergeData obj = MergeData.fromSerializable(argument, schemaContext);
-                return obj.getPath().equals(TestModel.TEST_PATH) &&
-                       obj.getData().equals(nodeToWrite);
+                return false;
             }
         };
 
@@ -293,13 +321,17 @@ public class TransactionProxyTest {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
 
+    private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+            short transactionVersion) {
+        return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+    }
 
     private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+        return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data));
+        return Futures.successful(new ReadDataReply(data));
     }
 
     private Future<Object> dataExistsSerializedReply(boolean exists) {
@@ -310,16 +342,28 @@ public class TransactionProxyTest {
         return Futures.successful(new DataExistsReply(exists));
     }
 
+    private Future<Object> writeSerializedDataReply(short version) {
+        return Futures.successful(new WriteDataReply().toSerializable(version));
+    }
+
     private Future<Object> writeSerializedDataReply() {
-        return Futures.successful(new WriteDataReply().toSerializable());
+        return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<WriteDataReply> writeDataReply() {
         return Futures.successful(new WriteDataReply());
     }
 
+    private Future<Object> mergeSerializedDataReply(short version) {
+        return Futures.successful(new MergeDataReply().toSerializable(version));
+    }
+
     private Future<Object> mergeSerializedDataReply() {
-        return Futures.successful(new MergeDataReply().toSerializable());
+        return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+    }
+
+    private Future<Object> incompleteFuture(){
+        return mock(Future.class);
     }
 
     private Future<MergeDataReply> mergeDataReply() {
@@ -346,7 +390,8 @@ public class TransactionProxyTest {
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -360,11 +405,13 @@ public class TransactionProxyTest {
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
         return actorRef;
     }
 
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
-        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
 
 
@@ -718,7 +765,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -760,7 +807,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
     }
 
     @Test
@@ -836,22 +883,25 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                WriteDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
-    @Test
-    public void testReadyForwardCompatibility() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+    private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+                READ_WRITE, version);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+        doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+        doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+        doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
 
         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
@@ -859,12 +909,17 @@ public class TransactionProxyTest {
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
-        transactionProxy.read(TestModel.TEST_PATH);
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+                get(5, TimeUnit.SECONDS);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+        assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+        transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+        transactionProxy.merge(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -873,14 +928,29 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                WriteDataReply.SERIALIZABLE_CLASS);
+                ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        return actorRef;
+    }
+
+    @Test
+    public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
         verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
     }
 
+    @Test
+    public void testCompatibilityWithHeliumR1Version() throws Exception {
+        ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+        verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
@@ -914,7 +984,7 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, TestException.class);
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+                MergeDataReply.class, TestException.class);
     }
 
     @Test
@@ -942,7 +1012,7 @@ public class TransactionProxyTest {
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                MergeDataReply.SERIALIZABLE_CLASS);
+                MergeDataReply.class);
 
         verifyCohortFutures(proxy, TestException.class);
     }
@@ -1164,4 +1234,425 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
     }
+
+    private static interface TransactionProxyOperation {
+        void run(TransactionProxy transactionProxy);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation) {
+        throttleOperation(operation, 1, true);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took less time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+    }
+
+    private void completeOperation(TransactionProxyOperation operation){
+        completeOperation(operation, true);
+    }
+
+    private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took more time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+    }
+
+    public void testWriteThrottling(boolean shardFound){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, 1, shardFound);
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardFound(){
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardNotFound(){
+        // Confirm that there is no throttling when the Shard is not found
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, false);
+
+    }
+
+
+    @Test
+    public void testWriteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testMergeCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+
+    }
+
+    @Test
+    public void testDeleteThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+
+    @Test
+    public void testDeleteThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testDeleteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testReadCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testExistsCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadyThrottling(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.ready();
+            }
+        });
+    }
+
+    @Test
+    public void testReadyThrottlingWithTwoTransactionContexts(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+                NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(carsNode));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+                transactionProxy.ready();
+            }
+        }, 2, true);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedTest.java
new file mode 100644 (file)
index 0000000..2c1de7f
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for DataChanged.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangedTest {
+
+    @Test
+    public void testSerialization() {
+        DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE).
+                addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()).
+                addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                        ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                            new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                            withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
+.
+                addRemoved(TestModel.OUTER_LIST_PATH,
+                       ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()).
+                setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)).
+                setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                        new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                        withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).
+                        withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build();
+
+        DataChanged expected = new DataChanged(change);
+
+        DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
+
+        assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
+        assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
+        assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
+        assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
+        assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
+        assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
+    }
+}
index 8f3ca9c535a06a1ae5a67f80c16dec2a2c5c2e48..5b40afdff8288ff714cc4f9b4a398e2f2724369b 100644 (file)
@@ -1,21 +1,70 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MergeDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", MergeData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
+        MergeData actual = MergeData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, MergeData.isSerializedType(
+                ShardTransactionMessages.MergeData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true,
+                MergeData.isSerializedType(new MergeData()));
+        assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a MergeData message with the
+     * base and R1 Helium versions, which used the protobuff MergeData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MergeData expected = new MergeData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
+
+        MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReplyTest.java
new file mode 100644 (file)
index 0000000..8ce7329
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ReadDataReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataReplyTest {
+
+    @Test
+    public void testSerialization() {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(
+                ShardTransactionMessages.ReadDataReply.newBuilder().build()));
+        assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply()));
+        assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the
+     * base and R1 Helium versions, which used the protobuff ReadDataReply message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        ReadDataReply expected = new ReadDataReply(data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
+
+        ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+    }
+}
index 6a5d65f8daab3e3952393e8bbdb764560c355886..90a76f229e1ddd045c084e56b1b854a518b43a92 100644 (file)
@@ -7,11 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 /**
  * Unit tests for WriteData.
@@ -22,12 +30,52 @@ public class WriteDataTest {
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes
-            .containerNode(TestModel.TEST_QNAME), schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext);
-        Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
-        Assert.assertEquals("getData", expected.getData(), actual.getData());
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", WriteData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
+        WriteData actual = WriteData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(
+                ShardTransactionMessages.WriteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+                    .setNormalizedNode(Node.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData()));
+        assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a WriteData message with the
+     * base and R1 Helium versions, which used the protobuff WriteData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        WriteData expected = new WriteData(path, data);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
+
+        WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+        assertEquals("getData", expected.getData(), actual.getData());
     }
 }
index fcb0324bea77e1608dbd8b6d3d7f2077d2c27c4c..e4ab969f5c4351c0e5b3894d3d3115aa6322337a 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -21,10 +23,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class ActorContextTest extends AbstractActorTest{
 
     private static class MockShardManager extends UntypedActor {
@@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testResolvePathForRemoteActor() {
         ActorContext actorContext =
-                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
                         ClusterWrapper.class),
                         mock(Configuration.class));
 
index 85441eca0d290ca4968cea22e793b4897fc3ec36..67fa0960cbcb96cc2f617148aab2c7a7a7bab71d 100644 (file)
@@ -7,25 +7,26 @@
  */
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-
 public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
           "test");
+
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
   public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
   public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
   public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
   public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";