Modified the ShardManager to store the leader's version obtained from the
LeaderStateChanged message in the ShardInformation and propagate to the
RemotePrimaryShardFound message in response to FindPrimary.
ActorContext#findPrimaryShardAsync sets the leader's version in the
PrimaryShardInfo based on the FindPrimary response. If the response is
LocalPrimaryShardFound, it sets it to CURRENT_VERSION, otherwise it sets
it from the RemotePrimaryShardFound response.
RemoteTransactionContextSupport#setPrimaryShard checks the leader's
version to determine if it can utilize the direct tx modification
preparation on the shard actor.
Change-Id: I1defe03dea27dfb652cdc1e0a02fa70c6e454035
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
15 files changed:
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
parent, shardName);
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
parent, shardName);
- remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+ remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
/**
* Sets the target primary shard and initiates a CreateTransaction try.
*/
/**
* Sets the target primary shard and initiates a CreateTransaction try.
*/
- void setPrimaryShard(ActorSelection primaryShard) {
+ void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
this.primaryShard = primaryShard;
this.primaryShard = primaryShard;
- if (getTransactionType() == TransactionType.WRITE_ONLY &&
+ if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION &&
getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
getIdentifier(), primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
getIdentifier(), primaryShard);
// For write-only Tx's we prepare the transaction modifications directly on the shard actor
// to avoid the overhead of creating a separate transaction actor.
- // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
- this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+ this.primaryShard.path().toString(), primaryVersion));
} else {
tryCreateTransaction();
}
} else {
tryCreateTransaction();
}
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
- LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (leader != null) {
message.getTransactionID(), e);
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (leader != null) {
- LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
+ message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
noLeaderError(message);
leader.forward(message, getContext());
} else {
noLeaderError(message);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+ shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
}
if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
}
String primaryPath = info.getSerializedLeaderActor();
Object found = canReturnLocalShardState && info.isLeader() ?
new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
String primaryPath = info.getSerializedLeaderActor();
Object found = canReturnLocalShardState && info.isLeader() ?
new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
- new RemotePrimaryShardFound(primaryPath);
+ new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
private String role ;
private String leaderId;
private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
private String role ;
private String leaderId;
+ private short leaderVersion;
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> peerAddresses) {
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<String, String> peerAddresses) {
- public String getLeaderId() {
- public void setLeaderAvailable(boolean leaderAvailable) {
+ void setLeaderAvailable(boolean leaderAvailable) {
this.leaderAvailable = leaderAvailable;
}
this.leaderAvailable = leaderAvailable;
}
+
+ short getLeaderVersion() {
+ return leaderVersion;
+ }
+
+ void setLeaderVersion(short leaderVersion) {
+ this.leaderVersion = leaderVersion;
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
}
private static class ShardManagerCreator implements Creator<ShardManager> {
*/
public class PrimaryShardInfo {
private final ActorSelection primaryShardActor;
*/
public class PrimaryShardInfo {
private final ActorSelection primaryShardActor;
+ private final short primaryShardVersion;
private final Optional<DataTree> localShardDataTree;
private final Optional<DataTree> localShardDataTree;
- public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional<DataTree> localShardDataTree) {
+ public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, short primaryShardVersion,
+ @Nonnull Optional<DataTree> localShardDataTree) {
this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
+ this.primaryShardVersion = primaryShardVersion;
this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
}
this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
}
return primaryShardActor;
}
return primaryShardActor;
}
+ /**
+ * Returns the version of the primary shard.
+ */
+ public short getPrimaryShardVersion() {
+ return primaryShardVersion;
+ }
+
/**
* Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
* to the caller. Otherwise the Optional value is absent.
/**
* Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
* to the caller. Otherwise the Optional value is absent.
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
private final String transactionID;
private final boolean doCommitOnReady;
private final String transactionID;
private final boolean doCommitOnReady;
+ // The version of the remote system used only when needing to convert to BatchedModifications.
+ private short remoteVersion = DataStoreVersions.CURRENT_VERSION;
+
public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) {
this.transactionID = Preconditions.checkNotNull(transactionID);
this.modification = Preconditions.checkNotNull(modification);
public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) {
this.transactionID = Preconditions.checkNotNull(transactionID);
this.modification = Preconditions.checkNotNull(modification);
public boolean isDoCommitOnReady() {
return doCommitOnReady;
}
public boolean isDoCommitOnReady() {
return doCommitOnReady;
}
+
+ public short getRemoteVersion() {
+ return remoteVersion;
+ }
+
+ public void setRemoteVersion(short remoteVersion) {
+ this.remoteVersion = remoteVersion;
+ }
import java.util.Deque;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
import java.util.Deque;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@Override
public byte[] toBinary(final Object obj) {
Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
@Override
public byte[] toBinary(final Object obj) {
Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
- final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj;
- final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
- DataStoreVersions.CURRENT_VERSION, "");
- batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+ final ReadyLocalTransaction readyLocal = (ReadyLocalTransaction) obj;
+ final BatchedModifications batched = new BatchedModifications(readyLocal.getTransactionID(),
+ readyLocal.getRemoteVersion(), "");
+ batched.setDoCommitOnReady(readyLocal.isDoCommitOnReady());
batched.setTotalMessagesSent(1);
batched.setReady(true);
batched.setTotalMessagesSent(1);
batched.setReady(true);
- msg.getModification().applyToCursor(new BatchedCursor(batched));
+ readyLocal.getModification().applyToCursor(new BatchedCursor(batched));
return SerializationUtils.serialize(batched);
}
return SerializationUtils.serialize(batched);
}
private static final long serialVersionUID = 1L;
private final String primaryPath;
private static final long serialVersionUID = 1L;
private final String primaryPath;
+ private final short primaryVersion;
- public RemotePrimaryShardFound(final String primaryPath) {
+ public RemotePrimaryShardFound(final String primaryPath, short primaryVersion) {
this.primaryPath = primaryPath;
this.primaryPath = primaryPath;
+ this.primaryVersion = primaryVersion;
}
public String getPrimaryPath() {
return primaryPath;
}
}
public String getPrimaryPath() {
return primaryPath;
}
+ public short getPrimaryVersion() {
+ return primaryVersion;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append("]");
+ builder.append("RemotePrimaryShardFound [primaryPath=").append(primaryPath).append(", primaryVersion=")
+ .append(primaryVersion).append("]");
return builder.toString();
}
}
return builder.toString();
}
}
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
public PrimaryShardInfo checkedApply(Object response) throws Exception {
if(response instanceof RemotePrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
public PrimaryShardInfo checkedApply(Object response) throws Exception {
if(response instanceof RemotePrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
- return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
+ RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
} else if(response instanceof LocalPrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
} else if(response instanceof LocalPrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
- return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
+ return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
+ found.getLocalShardDataTree());
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
}
private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
}
private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
- DataTree localShardDataTree) {
+ short primaryVersion, DataTree localShardDataTree) {
ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
- PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
+ Optional.fromNullable(localShardDataTree));
primaryShardInfoCache.putSuccessful(shardName, info);
return info;
}
primaryShardInfoCache.putSuccessful(shardName, info);
return info;
}
}
protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
}
protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+ return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef,
+ short transactionVersion) {
return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
- Optional.<DataTree>absent()));
+ transactionVersion, Optional.<DataTree>absent()));
}
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
}
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
+ return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName,
+ short transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(primaryShardInfoReply(actorSystem, actorRef)).
+ doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
- TransactionType type, int transactionVersion, String shardName) {
- ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName);
+ TransactionType type, short transactionVersion, String shardName) {
+ ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
+ transactionVersion);
return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
memberName, shardActorRef);
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
memberName, shardActorRef);
}
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
- TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
+ TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) {
ActorRef txActorRef;
if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
ActorRef txActorRef;
if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
log.info("Created mock shard Tx actor {}", txActorRef);
doReturn(actorSystem.actorSelection(txActorRef.path())).
log.info("Created mock shard Tx actor {}", txActorRef);
doReturn(actorSystem.actorSelection(txActorRef.path())).
- when(mockActorContext).actorSelection(txActorRef.path().toString());
+ when(mockActorContext).actorSelection(txActorRef.path().toString());
doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type));
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(prefix, type));
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.tell(new RoleChangeNotification(memberId1,
RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.<DataTree>absent(),
- DataStoreVersions.CURRENT_VERSION), mockShardActor);
+ leaderVersion), mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+ assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion());
shardManager2.tell(new ActorInitialized(), mockShardActor2);
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
shardManager2.tell(new ActorInitialized(), mockShardActor2);
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+ short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2,
- Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2);
+ Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
shardManager2.tell(new RoleChangeNotification(memberId2,
RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
String path = found.getPrimaryPath();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+ assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion());
shardManager2.underlyingActor().verifyFindPrimary();
shardManager2.underlyingActor().verifyFindPrimary();
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection(
- mockShardActor1.path()), Optional.<DataTree>absent()));
+ mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.<DataTree>absent()));
shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
}
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
}
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
- return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
+ return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+ Optional.<DataTree>absent());
}
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
}
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
- return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional);
+ return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+ dataTreeOptional);
package org.opendaylight.controller.cluster.datastore.compat;
import static org.junit.Assert.assertEquals;
package org.opendaylight.controller.cluster.datastore.compat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import akka.dispatch.Futures;
import akka.util.Timeout;
import com.google.common.base.Optional;
import akka.dispatch.Futures;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
+import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ doThreePhaseCommit(actorRef, transactionProxy, proxy);
+
+ return actorRef;
+ }
+
+ private void doThreePhaseCommit(ActorRef actorRef, TransactionProxy transactionProxy,
+ AbstractThreePhaseCommitCohort<?> proxy) throws InterruptedException, ExecutionException, TimeoutException {
doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
doReturn(Futures.successful(CanCommitTransactionReply.YES.toSerializable())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- eqCanCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+ executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+ transactionProxy.getIdentifier().toString()), any(Timeout.class));
doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
doReturn(Futures.successful(new CommitTransactionReply().toSerializable())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- eqCommitTransaction(transactionProxy.getIdentifier().toString()), any(Timeout.class));
+ executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+ transactionProxy.getIdentifier().toString()), any(Timeout.class));
Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
assertEquals("canCommit", true, canCommit.booleanValue());
Boolean canCommit = proxy.canCommit().get(3, TimeUnit.SECONDS);
assertEquals("canCommit", true, canCommit.booleanValue());
proxy.commit().get(3, TimeUnit.SECONDS);
proxy.commit().get(3, TimeUnit.SECONDS);
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCanCommitTransaction(
+ transactionProxy.getIdentifier().toString()), any(Timeout.class));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), eqCommitTransaction(
+ transactionProxy.getIdentifier().toString()), any(Timeout.class));
- @Ignore
- // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
- // creating transaction actors for write-only Tx's.
public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
short version = DataStoreVersions.HELIUM_2_VERSION;
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
short version = DataStoreVersions.HELIUM_2_VERSION;
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version,
doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
- doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
- eq(actorRef.path().toString()));
-
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, testNode);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, testNode);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+ AbstractThreePhaseCommitCohort<?> proxy = (AbstractThreePhaseCommitCohort<?>) ready;
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ doThreePhaseCommit(actorRef, transactionProxy, proxy);
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
final String expPrimaryPath = "akka://test-system/find-primary-shard";
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
+ return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
- shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
- shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
Configuration mockConfig = mock(Configuration.class);
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import org.junit.Test;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Future;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Future;
assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
- PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.<DataTree>absent());
+ PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), DataStoreVersions.CURRENT_VERSION,
+ Optional.<DataTree>absent());
cache.putSuccessful("foo", shardInfo);
Future<PrimaryShardInfo> future = cache.getIfPresent("foo");
cache.putSuccessful("foo", shardInfo);
Future<PrimaryShardInfo> future = cache.getIfPresent("foo");