<feature name="odl-base-eclipselink-persistence" description="EclipseLink Persistence API" version="2.0.4.v201112161009">
<bundle start="true">mvn:eclipselink/javax.persistence/2.0.4.v201112161009</bundle>
<bundle start="true">mvn:eclipselink/javax.resource/1.5.0.v200906010428</bundle>
+ <bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/2.5.0</bundle>
<bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/2.5.0</bundle>
<bundle start="true">mvn:org.eclipse.persistence/org.eclipse.persistence.core/2.5.0</bundle>
</feature>
try {
bs = fromObject(state);
} catch (Exception e) {
- LOG.error("Exception in creating snapshot", e);
+ LOG.error(e, "Exception in creating snapshot");
}
getSelf().tell(new CaptureSnapshotReply(bs), null);
}
try {
state.putAll((HashMap) toObject(snapshot));
} catch (Exception e) {
- LOG.error("Exception in applying snapshot", e);
+ LOG.error(e, "Exception in applying snapshot");
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+ LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
}
}
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- LOG.debug("SnapshotOffer called..");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("SnapshotOffer called..");
+ }
initRecoveryTimer();
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
- currentBehavior = switchBehavior(RaftState.Follower);
+ currentBehavior = new Follower(context);
onStateChanged();
}
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
if(LOG.isDebugEnabled()) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ LOG.debug("onReceiveCommand: message: {}", message.getClass());
}
}
- RaftState state =
- currentBehavior.handleMessage(getSender(), message);
RaftActorBehavior oldBehavior = currentBehavior;
- currentBehavior = switchBehavior(state);
+ currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
if(oldBehavior != currentBehavior){
onStateChanged();
}
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private RaftActorBehavior switchBehavior(RaftState state) {
- if (currentBehavior != null) {
- if (currentBehavior.state() == state) {
- return currentBehavior;
- }
- LOG.info("Switching from state " + currentBehavior.state() + " to "
- + state);
-
- try {
- currentBehavior.close();
- } catch (Exception e) {
- LOG.error(e,
- "Failed to close behavior : " + currentBehavior.state());
- }
-
- } else {
- LOG.info("Switching behavior to " + state);
- }
- RaftActorBehavior behavior = null;
- if (state == RaftState.Candidate) {
- behavior = new Candidate(context);
- } else if (state == RaftState.Follower) {
- behavior = new Follower(context);
- } else {
- behavior = new Leader(context);
- }
-
-
-
- return behavior;
- }
-
private void trimPersistentData(long sequenceNumber) {
// Trim akka snapshots
// FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+ leaderId, peerAddress);
}
return peerAddress;
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
- context.getLogger().debug(
- "Append log entry and persist {} ", replicatedLogEntry);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+ }
+
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
journal.add(replicatedLogEntry);
import akka.actor.ActorRef;
import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
*/
protected final RaftActorContext context;
+ /**
+ *
+ */
+ protected final LoggingAdapter LOG;
+
/**
*
*/
protected AbstractRaftActorBehavior(RaftActorContext context) {
this.context = context;
+ this.LOG = context.getLogger();
}
/**
* @param appendEntries The AppendEntries message
* @return
*/
- protected abstract RaftState handleAppendEntries(ActorRef sender,
+ protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries);
* @param appendEntries
* @return
*/
- protected RaftState appendEntries(ActorRef sender,
+ protected RaftActorBehavior appendEntries(ActorRef sender,
AppendEntries appendEntries) {
// 1. Reply false if term < currentTerm (§5.1)
if (appendEntries.getTerm() < currentTerm()) {
- context.getLogger().debug(
- "Cannot append entries because sender term " + appendEntries
- .getTerm() + " is less than " + currentTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Cannot append entries because sender term {} is less than {}",
+ appendEntries.getTerm(), currentTerm());
+ }
+
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
* @param appendEntriesReply The AppendEntriesReply message
* @return
*/
- protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
+ protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply);
/**
* @param requestVote
* @return
*/
- protected RaftState requestVote(ActorRef sender,
+ protected RaftActorBehavior requestVote(ActorRef sender,
RequestVote requestVote) {
-
- context.getLogger().debug(requestVote.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(requestVote.toString());
+ }
boolean grantVote = false;
sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
- return state();
+ return this;
}
/**
* @param requestVoteReply The RequestVoteReply message
* @return
*/
- protected abstract RaftState handleRequestVoteReply(ActorRef sender,
+ protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply);
/**
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- context.getLogger().warning(
- "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+ LOG.warning(
+ "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
break;
}
}
- context.getLogger().debug("Setting last applied to {}", newLastApplied);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Setting last applied to {}", newLastApplied);
+ }
context.setLastApplied(newLastApplied);
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
}
@Override
- public RaftState handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
if (message instanceof AppendEntries) {
return appendEntries(sender, (AppendEntries) message);
} else if (message instanceof AppendEntriesReply) {
} else if (message instanceof RequestVoteReply) {
return handleRequestVoteReply(sender, (RequestVoteReply) message);
}
- return state();
+ return this;
}
@Override public String getLeaderId() {
return leaderId;
}
+
+ protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+ LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+ try {
+ close();
+ } catch (Exception e) {
+ LOG.error(e, "Failed to close behavior : {}", this.state());
+ }
+
+ return behavior;
+ }
}
peers = context.getPeerAddresses().keySet();
- context.getLogger().debug("Election:Candidate has following peers:"+ peers);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Election:Candidate has following peers: {}", peers);
+ }
if(peers.size() > 0) {
// Votes are required from a majority of the peers including self.
scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
- context.getLogger().debug(appendEntries.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
if (requestVoteReply.isVoteGranted()) {
}
if (voteCount >= votesRequired) {
- return RaftState.Leader;
+ return switchBehavior(new Leader(context));
}
- return state();
+ return this;
}
@Override public RaftState state() {
}
@Override
- public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
RaftRPC rpc = (RaftRPC) message;
- context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+ }
// If RPC request or response contains term T > currentTerm:
// set currentTerm = T, convert to follower (§5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
+
+ return switchBehavior(new Follower(context));
}
}
// ourselves the leader. This gives enough time for a leader
// who we do not know about (as a peer)
// to send a message to the candidate
- return RaftState.Leader;
+
+ return switchBehavior(new Leader(context));
}
startNewTerm();
scheduleElection(electionDuration());
- return state();
+ return this;
}
return super.handleMessage(sender, message);
context.getTermInformation().updateAndPersist(currentTerm + 1,
context.getId());
- context.getLogger().debug("Starting new term " + (currentTerm + 1));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Starting new term {}", (currentTerm + 1));
+ }
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
public class Follower extends AbstractRaftActorBehavior {
private ByteString snapshotChunksCollected = ByteString.EMPTY;
- private final LoggingAdapter LOG;
-
public Follower(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
scheduleElection(electionDuration());
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
new AppendEntriesReply(context.getId(), currentTerm(), false,
lastIndex(), lastTerm()), actor()
);
- return state();
+ return this;
}
if (appendEntries.getEntries() != null
&& appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Number of entries to be appended = " + appendEntries
- .getEntries().size()
+ "Number of entries to be appended = {}", appendEntries.getEntries().size()
);
}
if(LOG.isDebugEnabled()) {
LOG.debug(
- "Removing entries from log starting at "
- + matchEntry.getIndex()
+ "Removing entries from log starting at {}", matchEntry.getIndex()
);
}
}
if(LOG.isDebugEnabled()) {
- context.getLogger().debug(
- "After cleanup entries to be added from = " + (addEntriesFrom
- + lastIndex())
+ LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
);
}
for (int i = addEntriesFrom;
i < appendEntries.getEntries().size(); i++) {
- context.getLogger().info(
- "Append entry to log " + appendEntries.getEntries().get(
- i).getData()
- .toString()
- );
- context.getReplicatedLog()
- .appendAndPersist(appendEntries.getEntries().get(i));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+ }
+ context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Log size is now " + context.getReplicatedLog().size());
+ LOG.debug("Log size is now {}", context.getReplicatedLog().size());
}
}
if (prevCommitIndex != context.getCommitIndex()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Commit index set to " + context.getCommitIndex());
+ LOG.debug("Commit index set to {}", context.getCommitIndex());
}
}
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm()), actor());
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
- return state();
+ return this;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Follower;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);
}
if (message instanceof ElectionTimeout) {
- return RaftState.Candidate;
+ return switchBehavior(new Candidate(context));
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
// this is the last chunk, create a snapshot object and apply
snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
- context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
- snapshotChunksCollected.size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+ snapshotChunksCollected.size());
+ }
Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
new ArrayList<ReplicatedLogEntry>(),
true), actor());
} catch (Exception e) {
- context.getLogger().error("Exception in InstallSnapshot of follower", e);
+ LOG.error(e, "Exception in InstallSnapshot of follower:");
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
private final int minReplicationCount;
- private final LoggingAdapter LOG;
-
public Leader(RaftActorContext context) {
super(context);
- LOG = context.getLogger();
-
followers = context.getPeerAddresses().keySet();
for (String followerId : followers) {
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers:" + followers);
+ LOG.debug("Election:Leader has following peers: {}", followers);
}
if (followers.size() > 0) {
}
- @Override protected RaftState handleAppendEntries(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
LOG.debug(appendEntries.toString());
}
- return state();
+ return this;
}
- @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
if(! appendEntriesReply.isSuccess()) {
if(followerLogInformation == null){
LOG.error("Unknown follower {}", followerId);
- return state();
+ return this;
}
if (appendEntriesReply.isSuccess()) {
applyLogToStateMachine(context.getCommitIndex());
}
- return state();
+ return this;
}
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
return null;
}
- @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+ @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
- return state();
+ return this;
}
@Override public RaftState state() {
return RaftState.Leader;
}
- @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
Object message = fromSerializableMessage(originalMessage);
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- return RaftState.Follower;
+
+ return switchBehavior(new Follower(context));
}
}
try {
if (message instanceof SendHeartBeat) {
- return sendHeartBeat();
+ sendHeartBeat();
+ return this;
} else if(message instanceof SendInstallSnapshot) {
installSnapshotIfNeeded();
} else if (message instanceof Replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message " + logIndex);
+ LOG.debug("Replicate message {}", logIndex);
}
// Create a tracker entry we will use this later to notify the
followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
mapFollowerToSnapshot.get(followerId).getTotalChunks());
} catch (IOException e) {
- LOG.error("InstallSnapshot failed for Leader.", e);
+ LOG.error(e, "InstallSnapshot failed for Leader.");
}
}
return nextChunk;
}
- private RaftState sendHeartBeat() {
+ private void sendHeartBeat() {
if (followers.size() > 0) {
sendAppendEntries();
}
- return state();
}
private void stopHeartBeat() {
* differently.
*/
public interface RaftActorBehavior extends AutoCloseable{
+
/**
* Handle a message. If the processing of the message warrants a state
- * change then a new state should be returned otherwise this method should
- * return the state for the current behavior.
+ * change then a new behavior should be returned otherwise this method should
+ * return the current behavior.
*
* @param sender The sender of the message
* @param message A message that needs to be processed
*
- * @return The new state or self (this)
+ * @return The new behavior or current behavior
*/
- RaftState handleMessage(ActorRef sender, Object message);
+ RaftActorBehavior handleMessage(ActorRef sender, Object message);
/**
* The state associated with a given behavior
return true;
}
}.from(raftActor.path().toString())
- .message("Switching from state Candidate to Leader")
+ .message("Switching from behavior Candidate to Leader")
.occurrences(1).exec();
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// Also expect an AppendEntriesReply to be sent where success is false
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
}
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
assertEquals(1, log.size());
RaftActorBehavior behavior = createBehavior(
createActorContext(behaviorActor));
- RaftState raftState = behavior.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- if(behavior.state() != RaftState.Follower){
- assertEquals(RaftState.Follower, raftState);
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
} else {
final Boolean out =
RaftActorBehavior behavior = createBehavior(actorContext);
- RaftState raftState = behavior.handleMessage(getTestActor(),
+ RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
new RequestVote(1000, "test", 10000, 999));
- if(behavior.state() != RaftState.Follower){
- assertEquals(RaftState.Follower, raftState);
+ if(!(behavior instanceof Follower)){
+ assertTrue(raftBehavior instanceof Follower);
} else {
final Boolean out =
new ExpectMsg<Boolean>(duration("1 seconds"),
setLastLogEntry(
(MockRaftActorContext) actorContext, 0, 0, p);
- RaftState raftState = createBehavior(actorContext)
+ RaftActorBehavior raftBehavior = createBehavior(actorContext)
.handleMessage(actorRef, rpc);
- assertEquals(RaftState.Follower, raftState);
+ assertTrue(raftBehavior instanceof Follower);
}
protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
Candidate candidate =
new Candidate(raftActorContext);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
candidate.handleMessage(candidateActor, new ElectionTimeout());
- Assert.assertEquals(RaftState.Leader, raftState);
+ Assert.assertTrue(raftBehavior instanceof Leader);
}
@Test
Candidate candidate =
new Candidate(raftActorContext);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
candidate.handleMessage(candidateActor, new ElectionTimeout());
- Assert.assertEquals(RaftState.Candidate, raftState);
+ Assert.assertTrue(raftBehavior instanceof Candidate);
}
@Test
Candidate candidate =
new Candidate(raftActorContext);
- RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+ RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
- Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+ Assert.assertTrue(behaviorOnFirstVote instanceof Leader);
}
Candidate candidate =
new Candidate(raftActorContext);
- RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+ RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
- RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
+ RaftActorBehavior behaviorOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
- Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
- Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+ Assert.assertTrue(behaviorOnFirstVote instanceof Candidate);
+ Assert.assertTrue(behaviorOnSecondVote instanceof Leader);
}
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
Follower follower =
new Follower(raftActorContext);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
follower.handleMessage(followerActor, new ElectionTimeout());
- Assert.assertEquals(RaftState.Candidate, raftState);
+ Assert.assertTrue(raftBehavior instanceof Candidate);
}
@Test
AppendEntries appendEntries =
new AppendEntries(2, "leader-1", 100, 1, entries, 101);
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
createBehavior(context).handleMessage(getRef(), appendEntries);
assertEquals(101L, context.getLastApplied());
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// Also expect an AppendEntriesReply to be sent where success is false
final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
assertEquals(5, log.last().getIndex() + 1);
assertNotNull(log.get(3));
assertNotNull(log.get(4));
RaftActorBehavior behavior = createBehavior(context);
// Send an unknown message so that the state of the RaftActor remains unchanged
- RaftState expected = behavior.handleMessage(getRef(), "unknown");
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
- RaftState raftState =
+ RaftActorBehavior raftBehavior =
behavior.handleMessage(getRef(), appendEntries);
- assertEquals(expected, raftState);
+ assertEquals(expected, raftBehavior);
// The entry at index 2 will be found out-of-sync with the leader
// and will be removed
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
// handle message should return the Leader state when it receives an
// unknown message
- RaftState state = leader.handleMessage(senderActor, "foo");
- Assert.assertEquals(RaftState.Leader, state);
+ RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
+ Assert.assertTrue(behavior instanceof Leader);
}};
}
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
- RaftState raftState = leader
+ RaftActorBehavior raftBehavior = leader
.handleMessage(senderActor, new Replicate(null, null,
new MockRaftActorContext.MockReplicatedLogEntry(1,
100,
));
// State should not change
- assertEquals(RaftState.Leader, raftState);
+ assertTrue(raftBehavior instanceof Leader);
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
.build());
Leader leader = new Leader(actorContext);
- RaftState raftState = leader
+ RaftActorBehavior raftBehavior = leader
.handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
// State should not change
- assertEquals(RaftState.Leader, raftState);
+ assertTrue(raftBehavior instanceof Leader);
assertEquals(1, actorContext.getCommitIndex());
new MockRaftActorContext.MockPayload("D"));
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
- RaftState raftState = leader.handleMessage(
+ RaftActorBehavior raftBehavior = leader.handleMessage(
senderActor, new Replicate(null, "state-id", entry));
- assertEquals(RaftState.Leader, raftState);
+ assertTrue(raftBehavior instanceof Leader);
// we might receive some heartbeat messages, so wait till we SendInstallSnapshot
Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
new MockRaftActorContext.MockPayload("D"));
- RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
- assertEquals(RaftState.Leader, raftState);
+ assertTrue(raftBehavior instanceof Leader);
// check if installsnapshot gets called with the correct values.
final String out =
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
- RaftState raftState = leader.handleMessage(senderActor,
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
leader.getFollowerToSnapshot().getChunkIndex(), true));
- assertEquals(RaftState.Leader, raftState);
+ assertTrue(raftBehavior instanceof Leader);
assertEquals(leader.mapFollowerToSnapshot.size(), 0);
assertEquals(leader.followerToLog.size(), 1);
public interface ClusterWrapper {
void subscribeToMemberEvents(ActorRef actorRef);
String getCurrentMemberName();
+ String getSelfAddress();
}
public class ClusterWrapperImpl implements ClusterWrapper {
private final Cluster cluster;
private final String currentMemberName;
+ private final String selfAddress;
public ClusterWrapperImpl(ActorSystem actorSystem){
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
);
currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
+ selfAddress = cluster.selfAddress().toString();
}
public String getCurrentMemberName() {
return currentMemberName;
}
+
+ public String getSelfAddress() {
+ return selfAddress;
+ }
}
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
/**
* A Shard represents a portion of the logical data tree <br/>
ready.getModification());
// Return our actor path as we'll handle the three phase commit.
- getSender().tell(new ReadyTransactionReply(Serialization.serializedActorPath(self())).
- toSerializable(), getSelf());
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(self()));
+ getSender().tell(
+ ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
+ getSelf());
}
private void handleAbortTransaction(AbortTransaction abort) {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@Override
public void handleReceive(Object message) throws Exception {
- if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction, ReadData.fromSerializable(message));
+ if(message instanceof ReadData) {
+ readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
+
+ } else if (message instanceof DataExists) {
+ dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
+
+ } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
+
} else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message));
+ dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
+
} else {
super.handleReceive(message);
}
@Override
public void handleReceive(Object message) throws Exception {
- if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction, ReadData.fromSerializable(message));
+ if (message instanceof ReadData) {
+ readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
+
+ } else if (message instanceof DataExists) {
+ dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
+
+ } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
+
} else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message));
+ dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
+
} else {
super.handleReceive(message);
}
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
+ protected static final boolean SERIALIZED_REPLY = true;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
ShardStats shardStats, String transactionID) {
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
+ protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final YangInstanceIdentifier path = message.getPath();
final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
transaction.read(path);
+
future.addListener(new Runnable() {
@Override
public void run() {
try {
Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- if (optional.isPresent()) {
- sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
- } else {
- sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
- }
+ ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+
+ sender.tell((returnSerialized ? readDataReply.toSerializable():
+ readDataReply), self);
+
} catch (Exception e) {
shardStats.incrementFailedReadTransactionsCount();
sender.tell(new akka.actor.Status.Failure(e), self);
}, getContext().dispatcher());
}
- protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
+ protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
+ final boolean returnSerialized) {
final YangInstanceIdentifier path = message.getPath();
try {
Boolean exists = transaction.exists(path).checkedGet();
- getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf());
+ DataExistsReply dataExistsReply = new DataExistsReply(exists);
+ getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
+ dataExistsReply, getSelf());
} catch (ReadFailedException e) {
getSender().tell(new akka.actor.Status.Failure(e),getSelf());
}
@Override
public void handleReceive(Object message) throws Exception {
- if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()));
+
+ if (message instanceof WriteData) {
+ writeData(transaction, (WriteData) message, !SERIALIZED_REPLY);
+
+ } else if (message instanceof MergeData) {
+ mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY);
+
+ } else if (message instanceof DeleteData) {
+ deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
+
+ } else if (message instanceof ReadyTransaction) {
+ readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+
+ } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+ writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+
} else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()));
+ mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+
} else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- deleteData(transaction, DeleteData.fromSerializable(message));
+ deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
+
} else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, new ReadyTransaction());
+ readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
getSender().tell(new GetCompositeModificationReply(
}
}
- private void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
+ private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) {
modification.addModification(
new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
if(LOG.isDebugEnabled()) {
}
try {
transaction.write(message.getPath(), message.getData());
- getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+ WriteDataReply writeDataReply = new WriteDataReply();
+ getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
+ getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
- private void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
+ private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) {
modification.addModification(
new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
if(LOG.isDebugEnabled()) {
}
try {
transaction.merge(message.getPath(), message.getData());
- getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ MergeDataReply mergeDataReply = new MergeDataReply();
+ getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
+ getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
- private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+ private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) {
if(LOG.isDebugEnabled()) {
LOG.debug("deleteData at path : " + message.getPath().toString());
}
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
- getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ DeleteDataReply deleteDataReply = new DeleteDataReply();
+ getSender().tell(returnSerialized ? deleteDataReply.toSerializable() : deleteDataReply,
+ getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) {
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(getTransactionID(), cohort, modification),
+ getShardActor().forward(new ForwardedReadyTransaction(
+ getTransactionID(), cohort, modification, returnSerialized),
getContext());
}
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
actorContext.sendOperationAsync(actor,
- new CloseTransaction().toSerializable());
+ new CloseTransaction().toSerializable());
}
}
}
}
Object response = actorContext.executeOperation(primaryShard.get(),
- new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
+ new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
remoteTransactionActorsMB.set(true);
}
+ // TxActor is always created where the leader of the shard is.
+ // Check if TxActor is created in the same node
+ boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+
transactionContext = new TransactionContextImpl(shardName, transactionPath,
- transactionActor, identifier, actorContext, schemaContext);
+ transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
private final SchemaContext schemaContext;
private final String actorPath;
private final ActorSelection actor;
+ private final boolean isTxActorLocal;
private TransactionContextImpl(String shardName, String actorPath,
ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
- SchemaContext schemaContext) {
+ SchemaContext schemaContext, boolean isTxActorLocal) {
super(shardName, identifier);
this.actorPath = actorPath;
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
+ this.isTxActorLocal = isTxActorLocal;
}
private ActorSelection getActor() {
}
// Send the ReadyTransaction message to the Tx actor.
+ ReadyTransaction readyTransaction = new ReadyTransaction();
final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
- new ReadyTransaction().toSerializable());
+ isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// Note the Future get call here won't block as it's complete.
Object serializedReadyReply = replyFuture.value().get().get();
- if(serializedReadyReply.getClass().equals(
- ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- serializedReadyReply);
+ if (serializedReadyReply instanceof ReadyTransactionReply) {
+ return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
+ } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
return actorContext.actorSelection(reply.getCohortPath());
+
} else {
// Throwing an exception here will fail the Future.
-
throw new IllegalArgumentException(String.format("Invalid reply type {}",
serializedReadyReply.getClass()));
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
+
+ DeleteData deleteData = new DeleteData(path);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
- new DeleteData(path).toSerializable()));
+ isTxActorLocal ? deleteData : deleteData.toSerializable()));
}
@Override
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
+
+ MergeData mergeData = new MergeData(path, data, schemaContext);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable()));
+ isTxActorLocal ? mergeData : mergeData.toSerializable()));
}
@Override
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
+
+ WriteData writeData = new WriteData(path, data, schemaContext);
recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable()));
+ isTxActorLocal ? writeData : writeData.toSerializable()));
}
@Override
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
Lists.newArrayList(recordedOperationFutures),
actorContext.getActorSystem().dispatcher());
+
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> notUsed)
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} read operation succeeded", identifier, failure);
}
- if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
- path, readResponse);
- if (reply.getNormalizedNode() == null) {
- returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
- } else {
- returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
- reply.getNormalizedNode()));
- }
+
+ if (readResponse instanceof ReadDataReply) {
+ ReadDataReply reply = (ReadDataReply) readResponse;
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
+ } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
} else {
returnFuture.setException(new ReadFailedException(
- "Invalid response reading data for path " + path));
+ "Invalid response reading data for path " + path));
}
}
}
};
+ ReadData readData = new ReadData(path);
Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
- new ReadData(path).toSerializable());
+ isTxActorLocal ? readData : readData.toSerializable());
+
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
}
- if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
- returnFuture.set(Boolean.valueOf(DataExistsReply.
- fromSerializable(response).exists()));
+
+ if (response instanceof DataExistsReply) {
+ returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+ } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+
} else {
returnFuture.setException(new ReadFailedException(
"Invalid response checking exists for path " + path));
}
};
+ DataExists dataExists = new DataExists(path);
Future<Object> future = actorContext.executeOperationAsync(getActor(),
- new DataExists(path).toSerializable());
+ isTxActorLocal ? dataExists : dataExists.toSerializable());
+
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
private final String transactionID;
private final DOMStoreThreePhaseCommitCohort cohort;
private final Modification modification;
+ private final boolean returnSerialized;
public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ Modification modification, boolean returnSerialized) {
this.transactionID = transactionID;
this.cohort = cohort;
this.modification = modification;
+ this.returnSerialized = returnSerialized;
+
}
public String getTransactionID() {
public Modification getModification() {
return modification;
}
+
+ public boolean isReturnSerialized() {
+ return returnSerialized;
+ }
}
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
-
import static akka.pattern.Patterns.ask;
/**
actorSystem.shutdown();
}
+ public ClusterWrapper getClusterWrapper() {
+ return clusterWrapper;
+ }
+
public String getCurrentMemberName(){
return clusterWrapper.getCurrentMemberName();
}
public FiniteDuration getOperationDuration() {
return operationDuration;
}
+
+ public boolean isLocalPath(String path) {
+ String selfAddress = clusterWrapper.getSelfAddress();
+ if (path == null || selfAddress == null) {
+ return false;
+ }
+
+ int atIndex1 = path.indexOf("@");
+ int atIndex2 = selfAddress.indexOf("@");
+
+ if (atIndex1 == -1 || atIndex2 == -1) {
+ return false;
+ }
+
+ int slashIndex1 = path.indexOf("/", atIndex1);
+ int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+
+ if (slashIndex1 == -1 || slashIndex2 == -1) {
+ return false;
+ }
+
+ String hostPort1 = path.substring(atIndex1, slashIndex1);
+ String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
+
+ return hostPort1.equals(hostPort2);
+ }
}
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ShardTransactionTest extends AbstractActorTest {
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testReadData");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(
- new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
- .getNormalizedNode()!= null) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- expectNoMsg();
- }
+ testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
+
+ props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn");
+
+ testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
+ }
+
+ private void testOnReceiveReadData(final ActorRef subject) {
+ //serialized read
+ subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ getRef());
+
+ ShardTransactionMessages.ReadDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+
+ assertNotNull(ReadDataReply.fromSerializable(
+ testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
+ .getNormalizedNode());
+
+ // unserialized read
+ subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
- };
+ assertNotNull(reply.getNormalizedNode());
}};
}
public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
+ Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(
- new ReadData(TestModel.TEST_PATH).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
- .getNormalizedNode()
- == null) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- expectNoMsg();
- }
+ testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
+ props, "testReadDataWhenDataNotFoundRO"));
+
+ props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn");
+
+ testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
+ props, "testReadDataWhenDataNotFoundRW"));
+ }
+
+ private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+ // serialized read
+ subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+
+ ShardTransactionMessages.ReadDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+
+ assertTrue(ReadDataReply.fromSerializable(
+ testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+ // unserialized read
+ subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
- };
+ ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+
+ assertTrue(reply.getNormalizedNode() == null);
}};
}
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(
- new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
- if (DataExistsReply.fromSerializable(in)
- .exists()) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- expectNoMsg();
- }
+ testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
+
+ props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn");
+
+ testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
+ }
+
+ private void testOnReceiveDataExistsPositive(final ActorRef subject) {
+ subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+ getRef());
+
+ ShardTransactionMessages.DataExistsReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
+ assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
- };
+ // unserialized read
+ subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+
+ DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+
+ assertTrue(reply.exists());
}};
}
public void testOnReceiveDataExistsNegative() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(
- new DataExists(TestModel.TEST_PATH).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
- if (!DataExistsReply.fromSerializable(in)
- .exists()) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- expectNoMsg();
- }
+ testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
+
+ props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn");
+
+ testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
+ }
+
+ private void testOnReceiveDataExistsNegative(final ActorRef subject) {
+ subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+ ShardTransactionMessages.DataExistsReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
- };
+ assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
+
+ // unserialized read
+ subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+
+ DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
+
+ assertFalse(reply.exists());
}};
}
private void assertModification(final ActorRef subject,
final Class<? extends Modification> modificationType) {
new JavaTestKit(getSystem()) {{
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
- subject
- .tell(new ShardWriteTransaction.GetCompositedModification(),
- getRef());
-
- final CompositeModification compositeModification =
- new ExpectMsg<CompositeModification>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected CompositeModification match(Object in) {
- if (in instanceof ShardWriteTransaction.GetCompositeModificationReply) {
- return ((ShardWriteTransaction.GetCompositeModificationReply) in)
- .getModification();
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertTrue(
- compositeModification.getModifications().size() == 1);
- assertEquals(modificationType,
- compositeModification.getModifications().get(0)
- .getClass());
+ subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
- }
- };
+ CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
+ GetCompositeModificationReply.class).getModification();
+
+ assertTrue(compositeModification.getModifications().size() == 1);
+ assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
}};
}
final ActorRef subject =
getSystem().actorOf(props, "testWriteData");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- assertModification(subject, WriteModification.class);
- expectNoMsg();
- }
+ subject.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
+ getRef());
+
+ ShardTransactionMessages.WriteDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+ assertModification(subject, WriteModification.class);
- };
+ //unserialized write
+ subject.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext()),
+ getRef());
+
+ expectMsgClass(duration("5 seconds"), WriteDataReply.class);
}};
}
final ActorRef subject =
getSystem().actorOf(props, "testMergeData");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
- getRef());
-
- final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ subject.tell(new MergeData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
+ getRef());
- assertEquals("match", out);
+ ShardTransactionMessages.MergeDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
- assertModification(subject, MergeModification.class);
-
- expectNoMsg();
- }
+ assertModification(subject, MergeModification.class);
+ //unserialized merge
+ subject.tell(new MergeData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+ getRef());
- };
+ expectMsgClass(duration("5 seconds"), MergeDataReply.class);
}};
}
final ActorRef subject =
getSystem().actorOf(props, "testDeleteData");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- assertModification(subject, DeleteModification.class);
- expectNoMsg();
- }
+ subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+ ShardTransactionMessages.DeleteDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
- };
+ assertModification(subject, DeleteModification.class);
+
+ //unserialized merge
+ subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+
+ expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
}
final ActorRef subject =
getSystem().actorOf(props, "testReadyTransaction");
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new ReadyTransaction().toSerializable(), getRef());
+ subject.tell(new ReadyTransaction().toSerializable(), getRef());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
+ expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+ }};
- expectNoMsg();
- }
+ // test
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn");
+ final ActorRef subject =
+ getSystem().actorOf(props, "testReadyTransaction2");
+ subject.tell(new ReadyTransaction(), getRef());
- };
+ expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
}};
}
+ @SuppressWarnings("unchecked")
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testCloseTransaction");
+ final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
watch(subject);
- new Within(duration("6 seconds")) {
- @Override
- protected void run() {
-
- subject.tell(new CloseTransaction().toSerializable(), getRef());
-
- final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
- if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
-
- final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
- if (in instanceof Terminated) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", termination);
- }
- };
+ subject.tell(new CloseTransaction().toSerializable(), getRef());
+
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.util.concurrent.CheckedFuture;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-
import java.util.List;
import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
private SchemaContext schemaContext;
+ @Mock
+ private ClusterWrapper mockClusterWrapper;
+
String memberName = "mock-member";
@Before
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+ doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
ShardStrategyFactory.setConfiguration(configuration);
}
return argThat(matcher);
}
- private DataExists eqDataExists() {
+ private DataExists eqSerializedDataExists() {
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
public boolean matches(Object argument) {
return argThat(matcher);
}
- private ReadData eqReadData() {
+ private DataExists eqDataExists() {
+ ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
+ @Override
+ public boolean matches(Object argument) {
+ return (argument instanceof DataExists) &&
+ ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private ReadData eqSerializedReadData() {
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
return argThat(matcher);
}
- private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ private ReadData eqReadData() {
+ ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return (argument instanceof ReadData) &&
+ ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
return argThat(matcher);
}
- private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ private WriteData eqWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(argument instanceof WriteData) {
+ WriteData obj = (WriteData) argument;
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+ return false;
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
return argThat(matcher);
}
- private DeleteData eqDeleteData() {
+ private MergeData eqMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
+ @Override
+ public boolean matches(Object argument) {
+ if(argument instanceof MergeData) {
+ MergeData obj = ((MergeData) argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
+ }
+
+ return false;
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private DeleteData eqSerializedDeleteData() {
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
return argThat(matcher);
}
- private Future<Object> readyTxReply(String path) {
+ private DeleteData eqDeleteData() {
+ ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
+ @Override
+ public boolean matches(Object argument) {
+ return argument instanceof DeleteData &&
+ ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH);
+ }
+ };
+
+ return argThat(matcher);
+ }
+
+ private Future<Object> readySerializedTxReply(String path) {
return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
- private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
+ private Future<Object> readyTxReply(String path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path));
+ }
+
+
+ private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
}
- private Future<Object> dataExistsReply(boolean exists) {
+ private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
+ return Futures.successful(new ReadDataReply(schemaContext, data));
+ }
+
+ private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists).toSerializable());
}
- private Future<Object> writeDataReply() {
+ private Future<DataExistsReply> dataExistsReply(boolean exists) {
+ return Futures.successful(new DataExistsReply(exists));
+ }
+
+ private Future<Object> writeSerializedDataReply() {
return Futures.successful(new WriteDataReply().toSerializable());
}
- private Future<Object> mergeDataReply() {
+ private Future<WriteDataReply> writeDataReply() {
+ return Futures.successful(new WriteDataReply());
+ }
+
+ private Future<Object> mergeSerializedDataReply() {
return Futures.successful(new MergeDataReply().toSerializable());
}
- private Future<Object> deleteDataReply() {
+ private Future<MergeDataReply> mergeDataReply() {
+ return Futures.successful(new MergeDataReply());
+ }
+
+ private Future<Object> deleteSerializedDataReply() {
return Futures.successful(new DeleteDataReply().toSerializable());
}
+ private Future<DeleteDataReply> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply());
+ }
+
private ActorSelection actorSelection(ActorRef actorRef) {
return getSystem().actorSelection(actorRef.path());
}
.setTransactionId("txn-1").build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) {
- ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
- doReturn(getSystem().actorSelection(actorRef.path())).
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))).
when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
- executeOperation(eq(getSystem().actorSelection(actorRef.path())),
+ executeOperation(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
+
+ doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
+
return actorRef;
}
@Test
public void testRead() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@Test(expected = ReadFailedException.class)
public void testReadWithInvalidReplyMessageType() throws Exception {
- setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(any(ActorSelection.class), any());
@Test(expected = TestException.class)
public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
- setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(any(ActorSelection.class), any());
@Test(expected = TestException.class)
public void testReadWithPriorRecordingOperationFailure() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ eq(actorSelection(actorRef)), eqSerializedReadData());
}
}
@Test
public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(expectedNode));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode));
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
@Test
public void testExists() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists());
+ doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists());
+ doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
@Test(expected = ReadFailedException.class)
public void testExistsWithInvalidReplyMessageType() throws Exception {
- setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(any(ActorSelection.class), any());
@Test(expected = TestException.class)
public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
- setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(any(ActorSelection.class), any());
@Test(expected = TestException.class)
public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData());
- doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists());
+ doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists());
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
}
}
@Test
public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists());
+ doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
@Test
public void testWrite() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
@Test
public void testMerge() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
@Test
public void testDelete() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
- doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData());
+ doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData());
+ eq(actorSelection(actorRef)), eqSerializedDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
DeleteDataReply.SERIALIZABLE_CLASS);
Object expReply = expReplies[i++];
if(expReply instanceof ActorSelection) {
ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
+ assertEquals("Cohort actor path", expReply, actual);
} else {
// Expecting exception.
try {
@SuppressWarnings("unchecked")
@Test
public void testReady() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@SuppressWarnings("unchecked")
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
@SuppressWarnings("unchecked")
@Test
public void testReadyWithReplyFailure() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
@SuppressWarnings("unchecked")
@Test
public void testReadyWithInvalidReplyMessageType() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
@Test
public void testGetIdentifier() {
- setupActorContextWithInitialCreateTransaction(READ_ONLY);
+ setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
TransactionProxy.TransactionType.READ_ONLY);
@SuppressWarnings("unchecked")
@Test
public void testClose() throws Exception{
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData());
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
verify(mockActorContext).sendOperationAsync(
eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
}
+
+
+ /**
+ * Method to test a local Tx actor. The Tx paths are matched to decide if the
+ * Tx actor is local or not. This is done by mocking the Tx actor path
+ * and the caller paths and ensuring that the paths have the remote-address format
+ *
+ * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
+ * the paths returned for the actors for all the tests are not qualified remote paths.
+ * Hence are treated as non-local/remote actors. In short, all tests except
+ * few below run for remote actors
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLocalTxActorRead() throws Exception {
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(createTransactionReply).when(mockActorContext).
+ executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_ONLY));
+
+ doReturn(true).when(mockActorContext).isLocalPath(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
+
+ // negative test case with null as the reply
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
+
+ // test case with node as read data reply
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+ // test for local data exists
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+ assertEquals("Exists response", true, exists);
+ }
+
+ @Test
+ public void testLocalTxActorWrite() throws Exception {
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(createTransactionReply).when(mockActorContext).
+ executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, WRITE_ONLY));
+
+ doReturn(true).when(mockActorContext).isLocalPath(actorPath);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ verify(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ //testing local merge
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToWrite));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ verify(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToWrite));
+
+
+ //testing local delete
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+
+ // testing ready
+ doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(ReadyTransaction.class));
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+ }
}
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
-
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-
import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@Test
public void testFindLocalShardWithShardNotFound(){
new JavaTestKit(getSystem()) {{
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(false, null));
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(false, null));
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Optional<ActorRef> out = actorContext.findLocalShard("default");
- assertTrue(!out.isPresent());
- expectNoMsg();
- }
- };
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
}};
}
@Test
public void testExecuteRemoteOperation() {
new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
-
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
-
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
-
- Object out = actorContext.executeOperation(actor, "hello");
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- assertEquals("hello", out);
+ Object out = actorContext.executeOperation(actor, "hello");
- expectNoMsg();
- }
- };
+ assertEquals("hello", out);
}};
}
@Test
public void testExecuteRemoteOperationAsync() {
new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
- new Within(duration("3 seconds")) {
- @Override
- protected void run() {
+ ActorRef shardManagerActorRef = getSystem()
+ .actorOf(MockShardManager.props(true, shardActorRef));
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+ mock(Configuration.class));
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch(Exception e) {
+ throw new AssertionError(e);
+ }
+ }};
+ }
- Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+ @Test
+ public void testIsLocalPath() {
+ MockClusterWrapper clusterWrapper = new MockClusterWrapper();
+ ActorContext actorContext =
+ new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- try {
- Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- assertEquals("Result", "hello", result);
- } catch(Exception e) {
- throw new AssertionError(e);
- }
+ clusterWrapper.setSelfAddress("");
+ assertEquals(false, actorContext.isLocalPath(null));
+ assertEquals(false, actorContext.isLocalPath(""));
- expectNoMsg();
- }
- };
- }};
+ clusterWrapper.setSelfAddress(null);
+ assertEquals(false, actorContext.isLocalPath(""));
+
+ clusterWrapper.setSelfAddress("akka://test/user/$b");
+ assertEquals(false, actorContext.isLocalPath("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/");
+ assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
+
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
+
+ clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
+
+ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
+ assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
}
}
import akka.cluster.UniqueAddress;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import scala.collection.JavaConversions;
-
import java.util.HashSet;
import java.util.Set;
public class MockClusterWrapper implements ClusterWrapper{
- @Override public void subscribeToMemberEvents(ActorRef actorRef) {
+ private String selfAddress = "akka.tcp://test@127.0.0.1:2550/user/member-1-shard-test-config";
+
+ @Override
+ public void subscribeToMemberEvents(ActorRef actorRef) {
}
- @Override public String getCurrentMemberName() {
+ @Override
+ public String getCurrentMemberName() {
return "member-1";
}
+ @Override
+ public String getSelfAddress() {
+ return selfAddress;
+ }
+
+ public void setSelfAddress(String selfAddress) {
+ this.selfAddress = selfAddress;
+ }
+
public static void sendMemberUp(ActorRef to, String memberName, String address){
to.tell(createMemberUp(memberName, address), null);
}