Bug 3020: Use leader version in LeaderStateChanged 48/22248/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 2 Jun 2015 01:30:59 +0000 (21:30 -0400)
committerMoiz Raja <moraja@cisco.com>
Wed, 10 Jun 2015 02:09:52 +0000 (02:09 +0000)
Modified the ShardManager to store the leader's version obtained from the
LeaderStateChanged message in the ShardInformation and propagate to the
RemotePrimaryShardFound message in response to FindPrimary.

ActorContext#findPrimaryShardAsync sets the leader's version in the
PrimaryShardInfo based on the FindPrimary response. If the response is
LocalPrimaryShardFound, it sets it to CURRENT_VERSION, otherwise it sets
it from the RemotePrimaryShardFound response.

RemoteTransactionContextSupport#setPrimaryShard checks the leader's
version to determine if it can utilize the direct tx modification
preparation on the shard actor.

Change-Id: I1defe03dea27dfb652cdc1e0a02fa70c6e454035
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 0c74ca17e0d64c4c2a7555470a5737ebef148890)

15 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java

index 4dff3915359aeda6cbaf933803c5f67279ad9468..976e613e8ea183b293d666bc56c8fd8cbc358123 100644 (file)
@@ -76,7 +76,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         } else {
             RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
                     parent, shardName);
-            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
         }
     }
 
index 984d650a32fcd52db6749e48aa820d7b47b6975e..afd748fd484c5303f27558497bb586ae137bc057 100644 (file)
@@ -85,19 +85,18 @@ final class RemoteTransactionContextSupport {
     /**
      * Sets the target primary shard and initiates a CreateTransaction try.
      */
-    void setPrimaryShard(ActorSelection primaryShard) {
+    void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
         this.primaryShard = primaryShard;
 
-        if (getTransactionType() == TransactionType.WRITE_ONLY &&
+        if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION &&
                 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
                 getIdentifier(), primaryShard);
 
             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
             // to avoid the overhead of creating a separate transaction actor.
-            // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
             transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
-                    this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+                    this.primaryShard.path().toString(), primaryVersion));
         } else {
             tryCreateTransaction();
         }
index c8f2b1b8d98a45e86a827bb7613fb94f850b4243..db04956f0005ba1f7a8e2b73b5ece7615e37aba3 100644 (file)
@@ -453,14 +453,15 @@ public class Shard extends RaftActor {
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
-                LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
                 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             if (leader != null) {
-                LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+                LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
+                message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
                 noLeaderError(message);
index 6de370e1afc1d39601d571d718093e950790d197..4f3c521f442ea10765d06dd4f0372d72968e712a 100644 (file)
@@ -203,6 +203,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+            shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
             }
@@ -516,7 +517,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     String primaryPath = info.getSerializedLeaderActor();
                     Object found = canReturnLocalShardState && info.isLeader() ?
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath);
+                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -666,6 +667,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
         private String leaderId;
+        private short leaderVersion;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> peerAddresses) {
@@ -820,13 +822,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return changed;
         }
 
-        public String getLeaderId() {
+        String getLeaderId() {
             return leaderId;
         }
 
-        public void setLeaderAvailable(boolean leaderAvailable) {
+        void setLeaderAvailable(boolean leaderAvailable) {
             this.leaderAvailable = leaderAvailable;
         }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index bbeb1aa84bbe728ffe073e9a3cc5402161fde676..cad0a4569ec8214927133fb13040f0b8fea13ffc 100644 (file)
@@ -20,10 +20,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
  */
 public class PrimaryShardInfo {
     private final ActorSelection primaryShardActor;
+    private final short primaryShardVersion;
     private final Optional<DataTree> localShardDataTree;
 
-    public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional<DataTree> localShardDataTree) {
+    public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, short primaryShardVersion,
+            @Nonnull Optional<DataTree> localShardDataTree) {
         this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
+        this.primaryShardVersion = primaryShardVersion;
         this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
     }
 
@@ -34,6 +37,13 @@ public class PrimaryShardInfo {
         return primaryShardActor;
     }
 
+    /**
+     * Returns the version of the primary shard.
+     */
+    public short getPrimaryShardVersion() {
+        return primaryShardVersion;
+    }
+
     /**
      * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
      * to the caller. Otherwise the Optional value is absent.
index f8cd18ced20deb245c5e0e1f56b0dcba1b891cb6..0cded39f4e3541616664105aaa97d7fc62268e36 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
@@ -22,6 +23,9 @@ public final class ReadyLocalTransaction {
     private final String transactionID;
     private final boolean doCommitOnReady;
 
+    // The version of the remote system used only when needing to convert to BatchedModifications.
+    private short remoteVersion = DataStoreVersions.CURRENT_VERSION;
+
     public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) {
         this.transactionID = Preconditions.checkNotNull(transactionID);
         this.modification = Preconditions.checkNotNull(modification);
@@ -39,4 +43,12 @@ public final class ReadyLocalTransaction {
     public boolean isDoCommitOnReady() {
         return doCommitOnReady;
     }
+
+    public short getRemoteVersion() {
+        return remoteVersion;
+    }
+
+    public void setRemoteVersion(short remoteVersion) {
+        this.remoteVersion = remoteVersion;
+    }
 }
index 1091aa50706338b9a45e9bb12723f979a905dc48..fc5a99fe9adf88d05a386429345e30ead5944f33 100644 (file)
@@ -14,7 +14,6 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -43,14 +42,14 @@ public final class ReadyLocalTransactionSerializer extends JSerializer {
     @Override
     public byte[] toBinary(final Object obj) {
         Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
-        final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj;
-        final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
-                DataStoreVersions.CURRENT_VERSION, "");
-        batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+        final ReadyLocalTransaction readyLocal = (ReadyLocalTransaction) obj;
+        final BatchedModifications batched = new BatchedModifications(readyLocal.getTransactionID(),
+                readyLocal.getRemoteVersion(), "");
+        batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
         batched.setTotalMessagesSent(1);
         batched.setReady(true);
 
-        msg.getModification().applyToCursor(new BatchedCursor(batched));
+        readyLocal.getModification().applyToCursor(new BatchedCursor(batched));
 
         return SerializationUtils.serialize(batched);
     }
index 662eefd9d1ec00d813a5727bda0c64e38d5ab061..d3e27dba95b847c4b6ad65487acb9e0bd1f28245 100644 (file)
@@ -17,19 +17,26 @@ public class RemotePrimaryShardFound implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String primaryPath;
+    private final short primaryVersion;
 
-    public RemotePrimaryShardFound(final String primaryPath) {
+    public RemotePrimaryShardFound(final String primaryPath, short primaryVersion) {
         this.primaryPath = primaryPath;
+        this.primaryVersion = primaryVersion;
     }
 
     public String getPrimaryPath() {
         return primaryPath;
     }
 
+    public short getPrimaryVersion() {
+        return primaryVersion;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]");
+        builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append(", primaryVersion=")
+                .append(primaryVersion).append("]");
         return builder.toString();
     }
 }
index 5c514cf77502587fc15939df20e61187d0dfe297..6640898dea4c651d058d549289eb3fe02c2baacd 100644 (file)
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -206,11 +207,13 @@ public class ActorContext {
             public PrimaryShardInfo checkedApply(Object response) throws Exception {
                 if(response instanceof RemotePrimaryShardFound) {
                     LOG.debug("findPrimaryShardAsync received: {}", response);
-                    return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+                    RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
                 } else if(response instanceof LocalPrimaryShardFound) {
                     LOG.debug("findPrimaryShardAsync received: {}", response);
                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
-                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
+                    return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+                            found.getLocalShardDataTree());
                 } else if(response instanceof NotInitializedException) {
                     throw (NotInitializedException)response;
                 } else if(response instanceof PrimaryNotFoundException) {
@@ -226,9 +229,10 @@ public class ActorContext {
     }
 
     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
-            DataTree localShardDataTree) {
+            short primaryVersion, DataTree localShardDataTree) {
         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
-        PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+        PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
+                Optional.fromNullable(localShardDataTree));
         primaryShardInfoCache.putSuccessful(shardName, info);
         return info;
     }
index 27fe3c5869342b3f8c9f39a792c0d318aa3f02c4..8e9d79ee5e6f54f24796455c129f4c7caded96fa 100644 (file)
@@ -316,18 +316,28 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+        return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef,
+            short transactionVersion) {
         return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
-                Optional.<DataTree>absent()));
+                transactionVersion, Optional.<DataTree>absent()));
     }
 
     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
+        return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName,
+            short transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         log.info("Created mock shard actor {}", actorRef);
 
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
 
-        doReturn(primaryShardInfoReply(actorSystem, actorRef)).
+        doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
                 when(mockActorContext).findPrimaryShardAsync(eq(shardName));
 
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
@@ -338,15 +348,16 @@ public abstract class AbstractTransactionProxyTest {
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion, String shardName) {
-        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName);
+            TransactionType type, short transactionVersion, String shardName) {
+        ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
+                transactionVersion);
 
         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
                 memberName, shardActorRef);
     }
 
     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
+            TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) {
 
         ActorRef txActorRef;
         if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
@@ -357,11 +368,11 @@ public abstract class AbstractTransactionProxyTest {
             log.info("Created mock shard Tx actor {}", txActorRef);
 
             doReturn(actorSystem.actorSelection(txActorRef.path())).
-            when(mockActorContext).actorSelection(txActorRef.path().toString());
+                when(mockActorContext).actorSelection(txActorRef.path().toString());
 
             doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
-            executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                    eqCreateTransaction(prefix, type));
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(prefix, type));
         }
 
         return txActorRef;
index e95993de24bbe80b4c3f7bc7a6bc3fdf74816b8f..b5cceea8510350faf6e357b81cfb8fb24afdf03b 100644 (file)
@@ -206,14 +206,16 @@ public class ShardManagerTest extends AbstractActorTest {
             String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
             shardManager.tell(new RoleChangeNotification(memberId1,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
-                    DataStoreVersions.CURRENT_VERSION), mockShardActor);
+                    leaderVersion), mockShardActor);
 
             shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+            assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
         }};
     }
 
@@ -393,8 +395,9 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager2.tell(new ActorInitialized(), mockShardActor2);
 
             String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+            short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
             shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
-                    Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2);
+                    Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
             shardManager2.tell(new RoleChangeNotification(memberId2,
                     RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
 
@@ -405,6 +408,7 @@ public class ShardManagerTest extends AbstractActorTest {
             RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path = found.getPrimaryPath();
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+            assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
 
             shardManager2.underlyingActor().verifyFindPrimary();
 
@@ -560,7 +564,7 @@ public class ShardManagerTest extends AbstractActorTest {
             assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
 
             primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
-                    mockShardActor1.path()), Optional.<DataTree>absent()));
+                    mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
 
             shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
                 createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
index 8909e1d3127fe7e49e986470f95d4a04533cf206..26d51cbae3bf2df728ec1fa9e3322d0151b114b4 100644 (file)
@@ -799,11 +799,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                Optional.<DataTree>absent());
     }
 
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional);
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                dataTreeOptional);
     }
 
 
index a6656b2681dd8550f3edde2eae1212ce75ebed30..cb7c78090f1b5358c137ecd2a0beb4e98924c5a7 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore.compat;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -21,15 +20,15 @@ import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
+import java.util.concurrent.TimeoutException;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
 import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -174,13 +173,20 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
+        doThreePhaseCommit(actorRef, transactionProxy, proxy);
+
+        return actorRef;
+    }
+
+    private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy,
+            AbstractThreePhaseCommitCohort<?> proxy) throws InterruptedException, ExecutionException, TimeoutException {
         doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)),
-                    eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+                executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+                        transactionProxy.getIdentifier().toString()), any(Timeout.class));
 
         doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
-                executeOperationAsync(eq(actorSelection(actorRef)),
-                   eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+                executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+                        transactionProxy.getIdentifier().toString()), any(Timeout.class));
 
         Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
         assertEquals("canCommit", true, canCommit.booleanValue());
@@ -189,7 +195,11 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
 
         proxy.commit().get(3, TimeUnit.SECONDS);
 
-        return actorRef;
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+                transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+                transactionProxy.getIdentifier().toString()), any(Timeout.class));
     }
 
     @Test
@@ -209,9 +219,6 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
     }
 
     @Test
-    @Ignore
-    // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
-    // creating transaction actors for write-only Tx's.
     public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
         short version = DataStoreVersions.HELIUM_2_VERSION;
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
@@ -225,19 +232,16 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
-        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
-                eq(actorRef.path().toString()));
-
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, testNode);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
-        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
-        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+        AbstractThreePhaseCommitCohort<?> proxy = (AbstractThreePhaseCommitCohort<?>) ready;
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        doThreePhaseCommit(actorRef, transactionProxy, proxy);
     }
 }
index 1cc89f18af960557ba510814e6fe7c5326fa34ea..2b9b9d635e4f4e0861d71ba00c833efff002ad44 100644 (file)
@@ -34,6 +34,7 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -387,12 +388,13 @@ public class ActorContextTest extends AbstractActorTest{
                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
             final String expPrimaryPath = "akka://test-system/find-primary-shard";
+            final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
+                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
                         }
                     };
 
@@ -403,6 +405,7 @@ public class ActorContextTest extends AbstractActorTest{
             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+            assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
 
             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -445,6 +448,7 @@ public class ActorContextTest extends AbstractActorTest{
             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+            assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
 
             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
@@ -509,8 +513,10 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
-            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
+                    DataStoreVersions.CURRENT_VERSION));
+            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
+                    DataStoreVersions.CURRENT_VERSION));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index 9d34ce5a08985fef69151e6405db9dc0afba62e3..e9d780aff2fb3d0238c6f493b0b213e75583fa39 100644 (file)
@@ -13,6 +13,7 @@ import static org.mockito.Mockito.mock;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import scala.concurrent.Future;
@@ -30,7 +31,8 @@ public class PrimaryShardInfoFutureCacheTest {
 
         assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
 
-        PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.<DataTree>absent());
+        PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), DataStoreVersions.CURRENT_VERSION,
+                Optional.<DataTree>absent());
         cache.putSuccessful("foo", shardInfo);
 
         Future<PrimaryShardInfo> future = cache.getIfPresent("foo");