From: Tom Pantelis Date: Tue, 2 Jun 2015 01:30:59 +0000 (-0400) Subject: Bug 3020: Use leader version in LeaderStateChanged X-Git-Tag: release/beryllium~473 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9302c5052c431ae2add87d4e14a68570ef7604ec Bug 3020: Use leader version in LeaderStateChanged Modified the ShardManager to store the leader's version obtained from the LeaderStateChanged message in the ShardInformation and propagate to the RemotePrimaryShardFound message in response to FindPrimary. ActorContext#findPrimaryShardAsync sets the leader's version in the PrimaryShardInfo based on the FindPrimary response. If the response is LocalPrimaryShardFound, it sets it to CURRENT_VERSION, otherwise it sets it from the RemotePrimaryShardFound response. RemoteTransactionContextSupport#setPrimaryShard checks the leader's version to determine if it can utilize the direct tx modification preparation on the shard actor. Change-Id: I1defe03dea27dfb652cdc1e0a02fa70c6e454035 Signed-off-by: Tom Pantelis (cherry picked from commit 0c74ca17e0d64c4c2a7555470a5737ebef148890) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 4dff391535..976e613e8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -76,7 +76,7 @@ abstract class AbstractTransactionContextFactory= DataStoreVersions.LITHIUM_VERSION && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", getIdentifier(), primaryShard); // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. - // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + this.primaryShard.path().toString(), primaryVersion)); } else { tryCreateTransaction(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c8f2b1b8d9..db04956f00 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -453,14 +453,15 @@ public class Shard extends RaftActor { try { commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); } catch (Exception e) { - LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(), + LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionID(), e); getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); if (leader != null) { - LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader); + LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader); + message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { noLeaderError(message); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 6de370e1af..4f3c521f44 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -203,6 +203,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if(shardInformation != null) { shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); } @@ -516,7 +517,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { String primaryPath = info.getSerializedLeaderActor(); Object found = canReturnLocalShardState && info.isLeader() ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : - new RemotePrimaryShardFound(primaryPath); + new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); if(LOG.isDebugEnabled()) { LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -666,6 +667,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; private String leaderId; + private short leaderVersion; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -820,13 +822,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return changed; } - public String getLeaderId() { + String getLeaderId() { return leaderId; } - public void setLeaderAvailable(boolean leaderAvailable) { + void setLeaderAvailable(boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; } + + short getLeaderVersion() { + return leaderVersion; + } + + void setLeaderVersion(short leaderVersion) { + this.leaderVersion = leaderVersion; + } } private static class ShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java index bbeb1aa84b..cad0a4569e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryShardInfo.java @@ -20,10 +20,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; */ public class PrimaryShardInfo { private final ActorSelection primaryShardActor; + private final short primaryShardVersion; private final Optional localShardDataTree; - public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional localShardDataTree) { + public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, short primaryShardVersion, + @Nonnull Optional localShardDataTree) { this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor); + this.primaryShardVersion = primaryShardVersion; this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); } @@ -34,6 +37,13 @@ public class PrimaryShardInfo { return primaryShardActor; } + /** + * Returns the version of the primary shard. + */ + public short getPrimaryShardVersion() { + return primaryShardVersion; + } + /** * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local * to the caller. Otherwise the Optional value is absent. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java index f8cd18ced2..0cded39f4e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.messages; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** @@ -22,6 +23,9 @@ public final class ReadyLocalTransaction { private final String transactionID; private final boolean doCommitOnReady; + // The version of the remote system used only when needing to convert to BatchedModifications. + private short remoteVersion = DataStoreVersions.CURRENT_VERSION; + public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) { this.transactionID = Preconditions.checkNotNull(transactionID); this.modification = Preconditions.checkNotNull(modification); @@ -39,4 +43,12 @@ public final class ReadyLocalTransaction { public boolean isDoCommitOnReady() { return doCommitOnReady; } + + public short getRemoteVersion() { + return remoteVersion; + } + + public void setRemoteVersion(short remoteVersion) { + this.remoteVersion = remoteVersion; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java index 1091aa5070..fc5a99fe9a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java @@ -14,7 +14,6 @@ import java.util.ArrayDeque; import java.util.Deque; import javax.annotation.Nonnull; import org.apache.commons.lang3.SerializationUtils; -import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; @@ -43,14 +42,14 @@ public final class ReadyLocalTransactionSerializer extends JSerializer { @Override public byte[] toBinary(final Object obj) { Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass()); - final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj; - final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(), - DataStoreVersions.CURRENT_VERSION, ""); - batched.setDoCommitOnReady(msg.isDoCommitOnReady()); + final ReadyLocalTransaction readyLocal = (ReadyLocalTransaction) obj; + final BatchedModifications batched = new BatchedModifications(readyLocal.getTransactionID(), + readyLocal.getRemoteVersion(), ""); + batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady()); batched.setTotalMessagesSent(1); batched.setReady(true); - msg.getModification().applyToCursor(new BatchedCursor(batched)); + readyLocal.getModification().applyToCursor(new BatchedCursor(batched)); return SerializationUtils.serialize(batched); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java index 662eefd9d1..d3e27dba95 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RemotePrimaryShardFound.java @@ -17,19 +17,26 @@ public class RemotePrimaryShardFound implements Serializable { private static final long serialVersionUID = 1L; private final String primaryPath; + private final short primaryVersion; - public RemotePrimaryShardFound(final String primaryPath) { + public RemotePrimaryShardFound(final String primaryPath, short primaryVersion) { this.primaryPath = primaryPath; + this.primaryVersion = primaryVersion; } public String getPrimaryPath() { return primaryPath; } + public short getPrimaryVersion() { + return primaryVersion; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]"); + builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append(", primaryVersion=") + .append(primaryVersion).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 5c514cf775..6640898dea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; @@ -206,11 +207,13 @@ public class ActorContext { public PrimaryShardInfo checkedApply(Object response) throws Exception { if(response instanceof RemotePrimaryShardFound) { LOG.debug("findPrimaryShardAsync received: {}", response); - return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null); + RemotePrimaryShardFound found = (RemotePrimaryShardFound)response; + return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null); } else if(response instanceof LocalPrimaryShardFound) { LOG.debug("findPrimaryShardAsync received: {}", response); LocalPrimaryShardFound found = (LocalPrimaryShardFound)response; - return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree()); + return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION, + found.getLocalShardDataTree()); } else if(response instanceof NotInitializedException) { throw (NotInitializedException)response; } else if(response instanceof PrimaryNotFoundException) { @@ -226,9 +229,10 @@ public class ActorContext { } private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath, - DataTree localShardDataTree) { + short primaryVersion, DataTree localShardDataTree) { ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); - PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree)); + PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion, + Optional.fromNullable(localShardDataTree)); primaryShardInfoCache.putSuccessful(shardName, info); return info; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 27fe3c5869..8e9d79ee5e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -316,18 +316,28 @@ public abstract class AbstractTransactionProxyTest { } protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) { + return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION); + } + + protected Future primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef, + short transactionVersion) { return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()), - Optional.absent())); + transactionVersion, Optional.absent())); } protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) { + return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, DataStoreVersions.CURRENT_VERSION); + } + + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName, + short transactionVersion) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); log.info("Created mock shard actor {}", actorRef); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(primaryShardInfoReply(actorSystem, actorRef)). + doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); @@ -338,15 +348,16 @@ public abstract class AbstractTransactionProxyTest { } protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion, String shardName) { - ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName); + TransactionType type, short transactionVersion, String shardName) { + ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, + transactionVersion); return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion, memberName, shardActorRef); } protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) { + TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) { ActorRef txActorRef; if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION && @@ -357,11 +368,11 @@ public abstract class AbstractTransactionProxyTest { log.info("Created mock shard Tx actor {}", txActorRef); doReturn(actorSystem.actorSelection(txActorRef.path())). - when(mockActorContext).actorSelection(txActorRef.path().toString()); + when(mockActorContext).actorSelection(txActorRef.path().toString()); doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(prefix, type)); + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(prefix, type)); } return txActorRef; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index e95993de24..b5cceea851 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -206,14 +206,16 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent(), - DataStoreVersions.CURRENT_VERSION), mockShardActor); + leaderVersion), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); + assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); }}; } @@ -393,8 +395,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.tell(new ActorInitialized(), mockShardActor2); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, - Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2); + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -405,6 +408,7 @@ public class ShardManagerTest extends AbstractActorTest { RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); shardManager2.underlyingActor().verifyFindPrimary(); @@ -560,7 +564,7 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( - mockShardActor1.path()), Optional.absent())); + mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent())); shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8909e1d312..26d51cbae3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -799,11 +799,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){ - return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.absent()); + return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION, + Optional.absent()); } private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional dataTreeOptional){ - return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional); + return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION, + dataTreeOptional); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index a6656b2681..cb7c78090f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -21,15 +20,15 @@ import akka.actor.ActorRef; import akka.dispatch.Futures; import akka.util.Timeout; import com.google.common.base.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.junit.Ignore; +import java.util.concurrent.TimeoutException; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; -import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy; import org.opendaylight.controller.cluster.datastore.TransactionProxy; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -174,13 +173,20 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + doThreePhaseCommit(actorRef, transactionProxy, proxy); + + return actorRef; + } + + private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy, + AbstractThreePhaseCommitCohort proxy) throws InterruptedException, ExecutionException, TimeoutException { doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class)); + executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class)); + executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS); assertEquals("canCommit", true, canCommit.booleanValue()); @@ -189,7 +195,11 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest proxy.commit().get(3, TimeUnit.SECONDS); - return actorRef; + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction( + transactionProxy.getIdentifier().toString()), any(Timeout.class)); } @Test @@ -209,9 +219,6 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest } @Test - @Ignore - // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip - // creating transaction actors for write-only Tx's. public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { short version = DataStoreVersions.HELIUM_2_VERSION; ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version, @@ -225,19 +232,16 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, testNode); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + AbstractThreePhaseCommitCohort proxy = (AbstractThreePhaseCommitCohort) ready; verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + doThreePhaseCommit(actorRef, transactionProxy, proxy); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 1cc89f18af..2b9b9d635e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -34,6 +34,7 @@ import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -387,12 +388,13 @@ public class ActorContextTest extends AbstractActorTest{ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); final String expPrimaryPath = "akka://test-system/find-primary-shard"; + final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath)); + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion)); } }; @@ -403,6 +405,7 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion()); Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); @@ -445,6 +448,7 @@ public class ActorContextTest extends AbstractActorTest{ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion()); Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); @@ -509,8 +513,10 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString())); - shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(), + DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(), + DataStoreVersions.CURRENT_VERSION)); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java index 9d34ce5a08..e9d780aff2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/PrimaryShardInfoFutureCacheTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.mock; import akka.actor.ActorSelection; import com.google.common.base.Optional; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import scala.concurrent.Future; @@ -30,7 +31,8 @@ public class PrimaryShardInfoFutureCacheTest { assertEquals("getIfPresent", null, cache.getIfPresent("foo")); - PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.absent()); + PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), DataStoreVersions.CURRENT_VERSION, + Optional.absent()); cache.putSuccessful("foo", shardInfo); Future future = cache.getIfPresent("foo");