From: Tony Tkacik Date: Mon, 27 Oct 2014 08:26:36 +0000 (+0000) Subject: Merge "BUG-2184 Fix subtree filtering for identity-ref leaves" X-Git-Tag: release/lithium~978 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f14033146e051aca1b51c791373f6e867af340b0;hp=37b37e348a362f31e01077753cee1c6bdaa645dc Merge "BUG-2184 Fix subtree filtering for identity-ref leaves" --- diff --git a/features/base/src/main/resources/features.xml b/features/base/src/main/resources/features.xml index c324f6cea6..d7d8e0ddac 100644 --- a/features/base/src/main/resources/features.xml +++ b/features/base/src/main/resources/features.xml @@ -80,6 +80,7 @@ mvn:eclipselink/javax.persistence/2.0.4.v201112161009 mvn:eclipselink/javax.resource/1.5.0.v200906010428 + mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/2.5.0 mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/2.5.0 mvn:org.eclipse.persistence/org.eclipse.persistence.core/2.5.0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 04df7785ad..97b912ef74 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -100,7 +100,7 @@ public class ExampleActor extends RaftActor { try { bs = fromObject(state); } catch (Exception e) { - LOG.error("Exception in creating snapshot", e); + LOG.error(e, "Exception in creating snapshot"); } getSelf().tell(new CaptureSnapshotReply(bs), null); } @@ -110,10 +110,10 @@ public class ExampleActor extends RaftActor { try { state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { - LOG.error("Exception in applying snapshot", e); + LOG.error(e, "Exception in applying snapshot"); } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state :" + ((HashMap) state).size()); + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 64fa749604..66a46ef3bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -29,9 +29,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; -import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -159,7 +157,9 @@ public abstract class RaftActor extends UntypedPersistentActor { } private void onRecoveredSnapshot(SnapshotOffer offer) { - LOG.debug("SnapshotOffer called.."); + if(LOG.isDebugEnabled()) { + LOG.debug("SnapshotOffer called.."); + } initRecoveryTimer(); @@ -250,7 +250,7 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); - currentBehavior = switchBehavior(RaftState.Follower); + currentBehavior = new Follower(context); onStateChanged(); } @@ -355,14 +355,13 @@ public abstract class RaftActor extends UntypedPersistentActor { if (!(message instanceof AppendEntriesMessages.AppendEntries) && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: message:" + message.getClass()); + LOG.debug("onReceiveCommand: message: {}", message.getClass()); } } - RaftState state = - currentBehavior.handleMessage(getSender(), message); RaftActorBehavior oldBehavior = currentBehavior; - currentBehavior = switchBehavior(state); + currentBehavior = currentBehavior.handleMessage(getSender(), message); + if(oldBehavior != currentBehavior){ onStateChanged(); } @@ -569,38 +568,6 @@ public abstract class RaftActor extends UntypedPersistentActor { protected void onLeaderChanged(String oldLeader, String newLeader){}; - private RaftActorBehavior switchBehavior(RaftState state) { - if (currentBehavior != null) { - if (currentBehavior.state() == state) { - return currentBehavior; - } - LOG.info("Switching from state " + currentBehavior.state() + " to " - + state); - - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.error(e, - "Failed to close behavior : " + currentBehavior.state()); - } - - } else { - LOG.info("Switching behavior to " + state); - } - RaftActorBehavior behavior = null; - if (state == RaftState.Candidate) { - behavior = new Candidate(context); - } else if (state == RaftState.Follower) { - behavior = new Follower(context); - } else { - behavior = new Leader(context); - } - - - - return behavior; - } - private void trimPersistentData(long sequenceNumber) { // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied @@ -622,8 +589,8 @@ public abstract class RaftActor extends UntypedPersistentActor { } String peerAddress = context.getPeerAddress(leaderId); if(LOG.isDebugEnabled()) { - LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " - + peerAddress); + LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}", + leaderId, peerAddress); } return peerAddress; @@ -697,8 +664,11 @@ public abstract class RaftActor extends UntypedPersistentActor { public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry) { - context.getLogger().debug( - "Append log entry and persist {} ", replicatedLogEntry); + + if(LOG.isDebugEnabled()) { + LOG.debug("Append log entry and persist {} ", replicatedLogEntry); + } + // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs journal.add(replicatedLogEntry); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index b1560a5648..eed74bba82 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import akka.event.LoggingAdapter; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -44,6 +44,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected final RaftActorContext context; + /** + * + */ + protected final LoggingAdapter LOG; + /** * */ @@ -57,6 +62,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; + this.LOG = context.getLogger(); } /** @@ -71,7 +77,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries The AppendEntries message * @return */ - protected abstract RaftState handleAppendEntries(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries); @@ -83,19 +89,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntries * @return */ - protected RaftState appendEntries(ActorRef sender, + protected RaftActorBehavior appendEntries(ActorRef sender, AppendEntries appendEntries) { // 1. Reply false if term < currentTerm (§5.1) if (appendEntries.getTerm() < currentTerm()) { - context.getLogger().debug( - "Cannot append entries because sender term " + appendEntries - .getTerm() + " is less than " + currentTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("Cannot append entries because sender term {} is less than {}", + appendEntries.getTerm(), currentTerm()); + } + sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } @@ -114,7 +122,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param appendEntriesReply The AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, + protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply); /** @@ -125,11 +133,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVote * @return */ - protected RaftState requestVote(ActorRef sender, + protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) { - - context.getLogger().debug(requestVote.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(requestVote.toString()); + } boolean grantVote = false; @@ -167,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor()); - return state(); + return this; } /** @@ -182,7 +191,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @param requestVoteReply The RequestVoteReply message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, + protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply); /** @@ -341,12 +350,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else { //if one index is not present in the log, no point in looping // around as the rest wont be present either - context.getLogger().warning( - "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index ); + LOG.warning( + "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index); break; } } - context.getLogger().debug("Setting last applied to {}", newLastApplied); + if(LOG.isDebugEnabled()) { + LOG.debug("Setting last applied to {}", newLastApplied); + } context.setLastApplied(newLastApplied); // send a message to persist a ApplyLogEntries marker message into akka's persistent journal @@ -361,7 +372,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } @Override - public RaftState handleMessage(ActorRef sender, Object message) { + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { if (message instanceof AppendEntries) { return appendEntries(sender, (AppendEntries) message); } else if (message instanceof AppendEntriesReply) { @@ -371,10 +382,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } else if (message instanceof RequestVoteReply) { return handleRequestVoteReply(sender, (RequestVoteReply) message); } - return state(); + return this; } @Override public String getLeaderId() { return leaderId; } + + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + LOG.info("Switching from behavior {} to {}", this.state(), behavior.state()); + try { + close(); + } catch (Exception e) { + LOG.error(e, "Failed to close behavior : {}", this.state()); + } + + return behavior; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index bb1927ef23..4a3e2c5d66 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -52,7 +52,9 @@ public class Candidate extends AbstractRaftActorBehavior { peers = context.getPeerAddresses().keySet(); - context.getLogger().debug("Election:Candidate has following peers:"+ peers); + if(LOG.isDebugEnabled()) { + LOG.debug("Election:Candidate has following peers: {}", peers); + } if(peers.size() > 0) { // Votes are required from a majority of the peers including self. @@ -78,21 +80,23 @@ public class Candidate extends AbstractRaftActorBehavior { scheduleElection(electionDuration()); } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - context.getLogger().debug(appendEntries.toString()); + if(LOG.isDebugEnabled()) { + LOG.debug(appendEntries.toString()); + } - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - return state(); + return this; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { if (requestVoteReply.isVoteGranted()) { @@ -100,10 +104,10 @@ public class Candidate extends AbstractRaftActorBehavior { } if (voteCount >= votesRequired) { - return RaftState.Leader; + return switchBehavior(new Leader(context)); } - return state(); + return this; } @Override public RaftState state() { @@ -111,7 +115,7 @@ public class Candidate extends AbstractRaftActorBehavior { } @Override - public RaftState handleMessage(ActorRef sender, Object originalMessage) { + public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Object message = fromSerializableMessage(originalMessage); @@ -119,14 +123,17 @@ public class Candidate extends AbstractRaftActorBehavior { RaftRPC rpc = (RaftRPC) message; - context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm()); + if(LOG.isDebugEnabled()) { + LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm()); + } // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } @@ -137,11 +144,12 @@ public class Candidate extends AbstractRaftActorBehavior { // ourselves the leader. This gives enough time for a leader // who we do not know about (as a peer) // to send a message to the candidate - return RaftState.Leader; + + return switchBehavior(new Leader(context)); } startNewTerm(); scheduleElection(electionDuration()); - return state(); + return this; } return super.handleMessage(sender, message); @@ -159,7 +167,9 @@ public class Candidate extends AbstractRaftActorBehavior { context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId()); - context.getLogger().debug("Starting new term " + (currentTerm + 1)); + if(LOG.isDebugEnabled()) { + LOG.debug("Starting new term {}", (currentTerm + 1)); + } // Request for a vote // TODO: Retry request for vote if replies do not arrive in a reasonable diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 1cfdf9dba8..7ada8b31c5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.event.LoggingAdapter; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -39,17 +38,13 @@ import java.util.ArrayList; public class Follower extends AbstractRaftActorBehavior { private ByteString snapshotChunksCollected = ByteString.EMPTY; - private final LoggingAdapter LOG; - public Follower(RaftActorContext context) { super(context); - LOG = context.getLogger(); - scheduleElection(electionDuration()); } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { @@ -133,15 +128,14 @@ public class Follower extends AbstractRaftActorBehavior { new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() ); - return state(); + return this; } if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) { if(LOG.isDebugEnabled()) { LOG.debug( - "Number of entries to be appended = " + appendEntries - .getEntries().size() + "Number of entries to be appended = {}", appendEntries.getEntries().size() ); } @@ -168,8 +162,7 @@ public class Follower extends AbstractRaftActorBehavior { if(LOG.isDebugEnabled()) { LOG.debug( - "Removing entries from log starting at " - + matchEntry.getIndex() + "Removing entries from log starting at {}", matchEntry.getIndex() ); } @@ -181,9 +174,7 @@ public class Follower extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - context.getLogger().debug( - "After cleanup entries to be added from = " + (addEntriesFrom - + lastIndex()) + LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex()) ); } @@ -191,17 +182,14 @@ public class Follower extends AbstractRaftActorBehavior { for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { - context.getLogger().info( - "Append entry to log " + appendEntries.getEntries().get( - i).getData() - .toString() - ); - context.getReplicatedLog() - .appendAndPersist(appendEntries.getEntries().get(i)); + if(LOG.isDebugEnabled()) { + LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData()); + } + context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i)); } if(LOG.isDebugEnabled()) { - LOG.debug("Log size is now " + context.getReplicatedLog().size()); + LOG.debug("Log size is now {}", context.getReplicatedLog().size()); } } @@ -216,7 +204,7 @@ public class Follower extends AbstractRaftActorBehavior { if (prevCommitIndex != context.getCommitIndex()) { if(LOG.isDebugEnabled()) { - LOG.debug("Commit index set to " + context.getCommitIndex()); + LOG.debug("Commit index set to {}", context.getCommitIndex()); } } @@ -239,24 +227,24 @@ public class Follower extends AbstractRaftActorBehavior { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, lastIndex(), lastTerm()), actor()); - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - return state(); + return this; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Follower; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Object message = fromSerializableMessage(originalMessage); @@ -271,7 +259,7 @@ public class Follower extends AbstractRaftActorBehavior { } if (message instanceof ElectionTimeout) { - return RaftState.Candidate; + return switchBehavior(new Candidate(context)); } else if (message instanceof InstallSnapshot) { InstallSnapshot installSnapshot = (InstallSnapshot) message; @@ -297,8 +285,10 @@ public class Follower extends AbstractRaftActorBehavior { // this is the last chunk, create a snapshot object and apply snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData()); - context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}", - snapshotChunksCollected.size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Last chunk received: snapshotChunksCollected.size:{}", + snapshotChunksCollected.size()); + } Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(), new ArrayList(), @@ -324,7 +314,7 @@ public class Follower extends AbstractRaftActorBehavior { true), actor()); } catch (Exception e) { - context.getLogger().error("Exception in InstallSnapshot of follower", e); + LOG.error(e, "Exception in InstallSnapshot of follower:"); //send reply with success as false. The chunk will be sent again on failure sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), installSnapshot.getChunkIndex(), false), actor()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index ff8a2256d3..9edba85865 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; -import akka.event.LoggingAdapter; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; @@ -81,13 +80,9 @@ public class Leader extends AbstractRaftActorBehavior { private final int minReplicationCount; - private final LoggingAdapter LOG; - public Leader(RaftActorContext context) { super(context); - LOG = context.getLogger(); - followers = context.getPeerAddresses().keySet(); for (String followerId : followers) { @@ -100,7 +95,7 @@ public class Leader extends AbstractRaftActorBehavior { } if(LOG.isDebugEnabled()) { - LOG.debug("Election:Leader has following peers:" + followers); + LOG.debug("Election:Leader has following peers: {}", followers); } if (followers.size() > 0) { @@ -123,17 +118,17 @@ public class Leader extends AbstractRaftActorBehavior { } - @Override protected RaftState handleAppendEntries(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { if(LOG.isDebugEnabled()) { LOG.debug(appendEntries.toString()); } - return state(); + return this; } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, + @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { if(! appendEntriesReply.isSuccess()) { @@ -149,7 +144,7 @@ public class Leader extends AbstractRaftActorBehavior { if(followerLogInformation == null){ LOG.error("Unknown follower {}", followerId); - return state(); + return this; } if (appendEntriesReply.isSuccess()) { @@ -199,7 +194,7 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return state(); + return this; } protected ClientRequestTracker removeClientRequestTracker(long logIndex) { @@ -222,16 +217,16 @@ public class Leader extends AbstractRaftActorBehavior { return null; } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, + @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { - return state(); + return this; } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); Object message = fromSerializableMessage(originalMessage); @@ -243,13 +238,15 @@ public class Leader extends AbstractRaftActorBehavior { // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; + + return switchBehavior(new Follower(context)); } } try { if (message instanceof SendHeartBeat) { - return sendHeartBeat(); + sendHeartBeat(); + return this; } else if(message instanceof SendInstallSnapshot) { installSnapshotIfNeeded(); } else if (message instanceof Replicate) { @@ -321,7 +318,7 @@ public class Leader extends AbstractRaftActorBehavior { long logIndex = replicate.getReplicatedLogEntry().getIndex(); if(LOG.isDebugEnabled()) { - LOG.debug("Replicate message " + logIndex); + LOG.debug("Replicate message {}", logIndex); } // Create a tracker entry we will use this later to notify the @@ -445,7 +442,7 @@ public class Leader extends AbstractRaftActorBehavior { followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(), mapFollowerToSnapshot.get(followerId).getTotalChunks()); } catch (IOException e) { - LOG.error("InstallSnapshot failed for Leader.", e); + LOG.error(e, "InstallSnapshot failed for Leader."); } } @@ -467,11 +464,10 @@ public class Leader extends AbstractRaftActorBehavior { return nextChunk; } - private RaftState sendHeartBeat() { + private void sendHeartBeat() { if (followers.size() > 0) { sendAppendEntries(); } - return state(); } private void stopHeartBeat() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index ca2d916ecf..064cd8b88c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -25,17 +25,18 @@ import org.opendaylight.controller.cluster.raft.RaftState; * differently. */ public interface RaftActorBehavior extends AutoCloseable{ + /** * Handle a message. If the processing of the message warrants a state - * change then a new state should be returned otherwise this method should - * return the state for the current behavior. + * change then a new behavior should be returned otherwise this method should + * return the current behavior. * * @param sender The sender of the message * @param message A message that needs to be processed * - * @return The new state or self (this) + * @return The new behavior or current behavior */ - RaftState handleMessage(ActorRef sender, Object message); + RaftActorBehavior handleMessage(ActorRef sender, Object message); /** * The state associated with a given behavior diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 22f374319c..c15c9198bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -181,7 +181,7 @@ public class RaftActorTest extends AbstractActorTest { return true; } }.from(raftActor.path().toString()) - .message("Switching from state Candidate to Leader") + .message("Switching from behavior Candidate to Leader") .occurrences(1).exec(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 8068dfbcff..3893018008 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -7,7 +7,6 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -22,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { @@ -79,12 +79,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { RaftActorBehavior behavior = createBehavior(context); // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); - RaftState raftState = + RaftActorBehavior raftBehavior = behavior.handleMessage(getRef(), appendEntries); - assertEquals(expected, raftState); + assertEquals(expected, raftBehavior); // Also expect an AppendEntriesReply to be sent where success is false final Boolean out = new ExpectMsg(duration("1 seconds"), @@ -145,12 +145,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); - RaftState raftState = + RaftActorBehavior raftBehavior = behavior.handleMessage(getRef(), appendEntries); - assertEquals(expected, raftState); + assertEquals(expected, raftBehavior); assertEquals(1, log.size()); @@ -174,11 +174,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { RaftActorBehavior behavior = createBehavior( createActorContext(behaviorActor)); - RaftState raftState = behavior.handleMessage(getTestActor(), + RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); - if(behavior.state() != RaftState.Follower){ - assertEquals(RaftState.Follower, raftState); + if(!(behavior instanceof Follower)){ + assertTrue(raftBehavior instanceof Follower); } else { final Boolean out = @@ -228,11 +228,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { RaftActorBehavior behavior = createBehavior(actorContext); - RaftState raftState = behavior.handleMessage(getTestActor(), + RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); - if(behavior.state() != RaftState.Follower){ - assertEquals(RaftState.Follower, raftState); + if(!(behavior instanceof Follower)){ + assertTrue(raftBehavior instanceof Follower); } else { final Boolean out = new ExpectMsg(duration("1 seconds"), @@ -309,10 +309,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { setLastLogEntry( (MockRaftActorContext) actorContext, 0, 0, p); - RaftState raftState = createBehavior(actorContext) + RaftActorBehavior raftBehavior = createBehavior(actorContext) .handleMessage(actorRef, rpc); - assertEquals(RaftState.Follower, raftState); + assertTrue(raftBehavior instanceof Follower); } protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry( diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index d478b17555..a8d47e2c60 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -9,7 +9,6 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -109,10 +108,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(raftActorContext); - RaftState raftState = + RaftActorBehavior raftBehavior = candidate.handleMessage(candidateActor, new ElectionTimeout()); - Assert.assertEquals(RaftState.Leader, raftState); + Assert.assertTrue(raftBehavior instanceof Leader); } @Test @@ -123,10 +122,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(raftActorContext); - RaftState raftState = + RaftActorBehavior raftBehavior = candidate.handleMessage(candidateActor, new ElectionTimeout()); - Assert.assertEquals(RaftState.Candidate, raftState); + Assert.assertTrue(raftBehavior instanceof Candidate); } @Test @@ -137,9 +136,9 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(raftActorContext); - RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); - Assert.assertEquals(RaftState.Leader, stateOnFirstVote); + Assert.assertTrue(behaviorOnFirstVote instanceof Leader); } @@ -151,12 +150,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { Candidate candidate = new Candidate(raftActorContext); - RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); - RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true)); + RaftActorBehavior behaviorOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true)); - Assert.assertEquals(RaftState.Candidate, stateOnFirstVote); - Assert.assertEquals(RaftState.Leader, stateOnSecondVote); + Assert.assertTrue(behaviorOnFirstVote instanceof Candidate); + Assert.assertTrue(behaviorOnSecondVote instanceof Leader); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index a72a7c4332..edeab11e2a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -9,7 +9,6 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -85,10 +84,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { Follower follower = new Follower(raftActorContext); - RaftState raftState = + RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout()); - Assert.assertEquals(RaftState.Candidate, raftState); + Assert.assertTrue(raftBehavior instanceof Candidate); } @Test @@ -187,7 +186,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101); - RaftState raftState = + RaftActorBehavior raftBehavior = createBehavior(context).handleMessage(getRef(), appendEntries); assertEquals(101L, context.getLastApplied()); @@ -226,12 +225,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior behavior = createBehavior(context); // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); - RaftState raftState = + RaftActorBehavior raftBehavior = behavior.handleMessage(getRef(), appendEntries); - assertEquals(expected, raftState); + assertEquals(expected, raftBehavior); // Also expect an AppendEntriesReply to be sent where success is false final Boolean out = new ExpectMsg(duration("1 seconds"), @@ -302,12 +301,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior behavior = createBehavior(context); // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); - RaftState raftState = + RaftActorBehavior raftBehavior = behavior.handleMessage(getRef(), appendEntries); - assertEquals(expected, raftState); + assertEquals(expected, raftBehavior); assertEquals(5, log.last().getIndex() + 1); assertNotNull(log.get(3)); assertNotNull(log.get(4)); @@ -382,12 +381,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RaftActorBehavior behavior = createBehavior(context); // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); - RaftState raftState = + RaftActorBehavior raftBehavior = behavior.handleMessage(getRef(), appendEntries); - assertEquals(expected, raftState); + assertEquals(expected, raftBehavior); // The entry at index 2 will be found out-of-sync with the leader // and will be removed diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 19af64790f..48543d7de2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -12,7 +12,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -54,8 +53,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { // handle message should return the Leader state when it receives an // unknown message - RaftState state = leader.handleMessage(senderActor, "foo"); - Assert.assertEquals(RaftState.Leader, state); + RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo"); + Assert.assertTrue(behavior instanceof Leader); }}; } @@ -125,7 +124,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); - RaftState raftState = leader + RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, null, new MockRaftActorContext.MockReplicatedLogEntry(1, 100, @@ -133,7 +132,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { )); // State should not change - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { @@ -179,11 +178,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { .build()); Leader leader = new Leader(actorContext); - RaftState raftState = leader + RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1))); // State should not change - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); assertEquals(1, actorContext.getCommitIndex()); @@ -258,10 +257,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockPayload("D")); // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex - RaftState raftState = leader.handleMessage( + RaftActorBehavior raftBehavior = leader.handleMessage( senderActor, new Replicate(null, "state-id", entry)); - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); // we might receive some heartbeat messages, so wait till we SendInstallSnapshot Boolean[] matches = new ReceiveWhile(Boolean.class, duration("2 seconds")) { @@ -333,9 +332,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { new ReplicatedLogImplEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); - RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot()); + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot()); - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); // check if installsnapshot gets called with the correct values. final String out = @@ -419,11 +418,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - RaftState raftState = leader.handleMessage(senderActor, + RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new InstallSnapshotReply(currentTerm, followerActor.path().toString(), leader.getFollowerToSnapshot().getChunkIndex(), true)); - assertEquals(RaftState.Leader, raftState); + assertTrue(raftBehavior instanceof Leader); assertEquals(leader.mapFollowerToSnapshot.size(), 0); assertEquals(leader.followerToLog.size(), 1); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java index 2eac2400b5..58d805b2b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapper.java @@ -13,4 +13,5 @@ import akka.actor.ActorRef; public interface ClusterWrapper { void subscribeToMemberEvents(ActorRef actorRef); String getCurrentMemberName(); + String getSelfAddress(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java index 8910137ec4..857510ad4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; public class ClusterWrapperImpl implements ClusterWrapper { private final Cluster cluster; private final String currentMemberName; + private final String selfAddress; public ClusterWrapperImpl(ActorSystem actorSystem){ Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -31,6 +32,7 @@ public class ClusterWrapperImpl implements ClusterWrapper { ); currentMemberName = (String) cluster.getSelfRoles().toArray()[0]; + selfAddress = cluster.selfAddress().toString(); } @@ -45,4 +47,8 @@ public class ClusterWrapperImpl implements ClusterWrapper { public String getCurrentMemberName() { return currentMemberName; } + + public String getSelfAddress() { + return selfAddress; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fef7e22873..d0bb3d3b69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -34,9 +34,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; -import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -74,6 +74,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -81,7 +83,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; /** * A Shard represents a portion of the logical data tree
@@ -383,8 +384,11 @@ public class Shard extends RaftActor { ready.getModification()); // Return our actor path as we'll handle the three phase commit. - getSender().tell(new ReadyTransactionReply(Serialization.serializedActorPath(self())). - toSerializable(), getSelf()); + ReadyTransactionReply readyTransactionReply = + new ReadyTransactionReply(Serialization.serializedActorPath(self())); + getSender().tell( + ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, + getSelf()); } private void handleAbortTransaction(AbortTransaction abort) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 29f22b28f4..d12e9997bb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; - import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; @@ -34,10 +33,18 @@ public class ShardReadTransaction extends ShardTransaction { @Override public void handleReceive(Object message) throws Exception { - if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { - readData(transaction, ReadData.fromSerializable(message)); + if(message instanceof ReadData) { + readData(transaction, (ReadData) message, !SERIALIZED_REPLY); + + } else if (message instanceof DataExists) { + dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY); + + } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY); + } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) { - dataExists(transaction, DataExists.fromSerializable(message)); + dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY); + } else { super.handleReceive(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index 2e174ebf56..b1fd02d217 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -33,10 +33,18 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction { @Override public void handleReceive(Object message) throws Exception { - if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { - readData(transaction, ReadData.fromSerializable(message)); + if (message instanceof ReadData) { + readData(transaction, (ReadData) message, !SERIALIZED_REPLY); + + } else if (message instanceof DataExists) { + dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY); + + } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY); + } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) { - dataExists(transaction, DataExists.fromSerializable(message)); + dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY); + } else { super.handleReceive(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index edaf935678..5289ad33bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -61,6 +61,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor { private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; + protected static final boolean SERIALIZED_REPLY = true; protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, ShardStats shardStats, String transactionID) { @@ -115,23 +116,24 @@ public abstract class ShardTransaction extends AbstractUntypedActor { getSelf().tell(PoisonPill.getInstance(), getSelf()); } - protected void readData(DOMStoreReadTransaction transaction,ReadData message) { + protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); final YangInstanceIdentifier path = message.getPath(); final CheckedFuture>, ReadFailedException> future = transaction.read(path); + future.addListener(new Runnable() { @Override public void run() { try { Optional> optional = future.checkedGet(); - if (optional.isPresent()) { - sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); - } else { - sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); - } + ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull()); + + sender.tell((returnSerialized ? readDataReply.toSerializable(): + readDataReply), self); + } catch (Exception e) { shardStats.incrementFailedReadTransactionsCount(); sender.tell(new akka.actor.Status.Failure(e), self); @@ -141,12 +143,15 @@ public abstract class ShardTransaction extends AbstractUntypedActor { }, getContext().dispatcher()); } - protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) { + protected void dataExists(DOMStoreReadTransaction transaction, DataExists message, + final boolean returnSerialized) { final YangInstanceIdentifier path = message.getPath(); try { Boolean exists = transaction.exists(path).checkedGet(); - getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf()); + DataExistsReply dataExistsReply = new DataExistsReply(exists); + getSender().tell(returnSerialized ? dataExistsReply.toSerializable() : + dataExistsReply, getSelf()); } catch (ReadFailedException e) { getSender().tell(new akka.actor.Status.Failure(e),getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index e993e4b55c..21c210daf2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -53,14 +53,31 @@ public class ShardWriteTransaction extends ShardTransaction { @Override public void handleReceive(Object message) throws Exception { - if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - writeData(transaction, WriteData.fromSerializable(message, getSchemaContext())); + + if (message instanceof WriteData) { + writeData(transaction, (WriteData) message, !SERIALIZED_REPLY); + + } else if (message instanceof MergeData) { + mergeData(transaction, (MergeData) message, !SERIALIZED_REPLY); + + } else if (message instanceof DeleteData) { + deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY); + + } else if (message instanceof ReadyTransaction) { + readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY); + + } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY); + } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { - mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext())); + mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY); + } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { - deleteData(transaction, DeleteData.fromSerializable(message)); + deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY); + } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { - readyTransaction(transaction, new ReadyTransaction()); + readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY); + } else if (message instanceof GetCompositedModification) { // This is here for testing only getSender().tell(new GetCompositeModificationReply( @@ -70,7 +87,7 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void writeData(DOMStoreWriteTransaction transaction, WriteData message) { + private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) { modification.addModification( new WriteModification(message.getPath(), message.getData(), getSchemaContext())); if(LOG.isDebugEnabled()) { @@ -78,13 +95,15 @@ public class ShardWriteTransaction extends ShardTransaction { } try { transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply().toSerializable(), getSelf()); + WriteDataReply writeDataReply = new WriteDataReply(); + getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply, + getSelf()); }catch(Exception e){ getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } - private void mergeData(DOMStoreWriteTransaction transaction, MergeData message) { + private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) { modification.addModification( new MergeModification(message.getPath(), message.getData(), getSchemaContext())); if(LOG.isDebugEnabled()) { @@ -92,29 +111,34 @@ public class ShardWriteTransaction extends ShardTransaction { } try { transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply().toSerializable(), getSelf()); + MergeDataReply mergeDataReply = new MergeDataReply(); + getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply , + getSelf()); }catch(Exception e){ getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } - private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) { + private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) { if(LOG.isDebugEnabled()) { LOG.debug("deleteData at path : " + message.getPath().toString()); } modification.addModification(new DeleteModification(message.getPath())); try { transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); + DeleteDataReply deleteDataReply = new DeleteDataReply(); + getSender().tell(returnSerialized ? deleteDataReply.toSerializable() : deleteDataReply, + getSelf()); }catch(Exception e){ getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } - private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) { + private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - getShardActor().forward(new ForwardedReadyTransaction(getTransactionID(), cohort, modification), + getShardActor().forward(new ForwardedReadyTransaction( + getTransactionID(), cohort, modification, returnSerialized), getContext()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index ec198510d3..715f48c349 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -157,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); actorContext.sendOperationAsync(actor, - new CloseTransaction().toSerializable()); + new CloseTransaction().toSerializable()); } } } @@ -385,8 +385,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } Object response = actorContext.executeOperation(primaryShard.get(), - new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -408,8 +408,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionActorsMB.set(true); } + // TxActor is always created where the leader of the shard is. + // Check if TxActor is created in the same node + boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext); + transactionActor, identifier, actorContext, schemaContext, isTxActorLocal); remoteTransactionPaths.put(shardName, transactionContext); } else { @@ -483,15 +487,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final SchemaContext schemaContext; private final String actorPath; private final ActorSelection actor; + private final boolean isTxActorLocal; private TransactionContextImpl(String shardName, String actorPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, - SchemaContext schemaContext) { + SchemaContext schemaContext, boolean isTxActorLocal) { super(shardName, identifier); this.actorPath = actorPath; this.actor = actor; this.actorContext = actorContext; this.schemaContext = schemaContext; + this.isTxActorLocal = isTxActorLocal; } private ActorSelection getActor() { @@ -514,8 +520,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } // Send the ReadyTransaction message to the Tx actor. + ReadyTransaction readyTransaction = new ReadyTransaction(); final Future replyFuture = actorContext.executeOperationAsync(getActor(), - new ReadyTransaction().toSerializable()); + isTxActorLocal ? readyTransaction : readyTransaction.toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -549,15 +556,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Note the Future get call here won't block as it's complete. Object serializedReadyReply = replyFuture.value().get().get(); - if(serializedReadyReply.getClass().equals( - ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( - serializedReadyReply); + if (serializedReadyReply instanceof ReadyTransactionReply) { + return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); + } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { + ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); return actorContext.actorSelection(reply.getCohortPath()); + } else { // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("Invalid reply type {}", serializedReadyReply.getClass())); } @@ -570,8 +577,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); } + + DeleteData deleteData = new DeleteData(path); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new DeleteData(path).toSerializable())); + isTxActorLocal ? deleteData : deleteData.toSerializable())); } @Override @@ -579,8 +588,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); } + + MergeData mergeData = new MergeData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new MergeData(path, data, schemaContext).toSerializable())); + isTxActorLocal ? mergeData : mergeData.toSerializable())); } @Override @@ -588,8 +599,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} writeData called path = {}", identifier, path); } + + WriteData writeData = new WriteData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new WriteData(path, data, schemaContext).toSerializable())); + isTxActorLocal ? writeData : writeData.toSerializable())); } @Override @@ -619,6 +632,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), actorContext.getActorSystem().dispatcher()); + OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) @@ -663,25 +677,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} read operation succeeded", identifier, failure); } - if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, - path, readResponse); - if (reply.getNormalizedNode() == null) { - returnFuture.set(Optional.>absent()); - } else { - returnFuture.set(Optional.>of( - reply.getNormalizedNode())); - } + + if (readResponse instanceof ReadDataReply) { + ReadDataReply reply = (ReadDataReply) readResponse; + returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + + } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse); + returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + } else { returnFuture.setException(new ReadFailedException( - "Invalid response reading data for path " + path)); + "Invalid response reading data for path " + path)); } } } }; + ReadData readData = new ReadData(path); Future readFuture = actorContext.executeOperationAsync(getActor(), - new ReadData(path).toSerializable()); + isTxActorLocal ? readData : readData.toSerializable()); + readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -756,9 +772,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); } - if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - returnFuture.set(Boolean.valueOf(DataExistsReply. - fromSerializable(response).exists())); + + if (response instanceof DataExistsReply) { + returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); + + } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { + returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists())); + } else { returnFuture.setException(new ReadFailedException( "Invalid response checking exists for path " + path)); @@ -767,8 +787,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; + DataExists dataExists = new DataExists(path); Future future = actorContext.executeOperationAsync(getActor(), - new DataExists(path).toSerializable()); + isTxActorLocal ? dataExists : dataExists.toSerializable()); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 4f8ea51f78..180108f218 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -19,12 +19,15 @@ public class ForwardedReadyTransaction { private final String transactionID; private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; + private final boolean returnSerialized; public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + Modification modification, boolean returnSerialized) { this.transactionID = transactionID; this.cohort = cohort; this.modification = modification; + this.returnSerialized = returnSerialized; + } public String getTransactionID() { @@ -38,4 +41,8 @@ public class ForwardedReadyTransaction { public Modification getModification() { return modification; } + + public boolean isReturnSerialized() { + return returnSerialized; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index d8af09c86b..314ae916de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -33,9 +33,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - import java.util.concurrent.TimeUnit; - import static akka.pattern.Patterns.ask; /** @@ -237,6 +235,10 @@ public class ActorContext { actorSystem.shutdown(); } + public ClusterWrapper getClusterWrapper() { + return clusterWrapper; + } + public String getCurrentMemberName(){ return clusterWrapper.getCurrentMemberName(); } @@ -262,4 +264,30 @@ public class ActorContext { public FiniteDuration getOperationDuration() { return operationDuration; } + + public boolean isLocalPath(String path) { + String selfAddress = clusterWrapper.getSelfAddress(); + if (path == null || selfAddress == null) { + return false; + } + + int atIndex1 = path.indexOf("@"); + int atIndex2 = selfAddress.indexOf("@"); + + if (atIndex1 == -1 || atIndex2 == -1) { + return false; + } + + int slashIndex1 = path.indexOf("/", atIndex1); + int slashIndex2 = selfAddress.indexOf("/", atIndex2); + + if (slashIndex1 == -1 || slashIndex2 == -1) { + return false; + } + + String hostPort1 = path.substring(atIndex1, slashIndex1); + String hostPort2 = selfAddress.substring(atIndex2, slashIndex2); + + return hostPort1.equals(hostPort2); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 9b4f77b7c4..f183bb319e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -468,7 +468,7 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -482,10 +482,10 @@ public class ShardTest extends AbstractActorTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -639,10 +639,10 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. @@ -703,7 +703,7 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -743,7 +743,7 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -793,7 +793,7 @@ public class ShardTest extends AbstractActorTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -854,10 +854,10 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -913,13 +913,13 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. @@ -976,10 +976,10 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 8ce8f4d4b5..711f3d7a72 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -9,6 +9,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; @@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -40,6 +42,8 @@ import scala.concurrent.duration.Duration; import java.util.Collections; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class ShardTransactionTest extends AbstractActorTest { @@ -73,41 +77,35 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = getSystem().actorOf(props, "testReadData"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell( - new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in) - .getNormalizedNode()!= null) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } + testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); + + props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn"); + + testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); + } + + private void testOnReceiveReadData(final ActorRef subject) { + //serialized read + subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), + getRef()); + + ShardTransactionMessages.ReadDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); + + assertNotNull(ReadDataReply.fromSerializable( + testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized) + .getNormalizedNode()); + + // unserialized read + subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef()); + ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); - }; + assertNotNull(reply.getNormalizedNode()); }}; } @@ -115,42 +113,35 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveReadDataWhenDataNotFound() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, + Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell( - new ReadData(TestModel.TEST_PATH).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in) - .getNormalizedNode() - == null) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } + testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( + props, "testReadDataWhenDataNotFoundRO")); + + props = ShardTransaction.props( store.newReadWriteTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn"); + + testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( + props, "testReadDataWhenDataNotFoundRW")); + } + + private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) { + // serialized read + subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef()); + + ShardTransactionMessages.ReadDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS); + + assertTrue(ReadDataReply.fromSerializable( + testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null); + // unserialized read + subject.tell(new ReadData(TestModel.TEST_PATH),getRef()); - }; + ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class); + + assertTrue(reply.getNormalizedNode() == null); }}; } @@ -158,41 +149,32 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveDataExistsPositive() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell( - new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - if (DataExistsReply.fromSerializable(in) - .exists()) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } + testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); + + props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn"); + + testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); + } + + private void testOnReceiveDataExistsPositive(final ActorRef subject) { + subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(), + getRef()); + + ShardTransactionMessages.DataExistsReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class); + assertTrue(DataExistsReply.fromSerializable(replySerialized).exists()); - }; + // unserialized read + subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef()); + + DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); + + assertTrue(reply.exists()); }}; } @@ -200,76 +182,44 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveDataExistsNegative() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell( - new DataExists(TestModel.TEST_PATH).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - if (!DataExistsReply.fromSerializable(in) - .exists()) { - return "match"; - } - return null; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - expectNoMsg(); - } + testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); + + props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn"); + + testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); + } + + private void testOnReceiveDataExistsNegative(final ActorRef subject) { + subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef()); + ShardTransactionMessages.DataExistsReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class); - }; + assertFalse(DataExistsReply.fromSerializable(replySerialized).exists()); + + // unserialized read + subject.tell(new DataExists(TestModel.TEST_PATH),getRef()); + + DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class); + + assertFalse(reply.exists()); }}; } private void assertModification(final ActorRef subject, final Class modificationType) { new JavaTestKit(getSystem()) {{ - new Within(duration("3 seconds")) { - @Override - protected void run() { - subject - .tell(new ShardWriteTransaction.GetCompositedModification(), - getRef()); - - final CompositeModification compositeModification = - new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected CompositeModification match(Object in) { - if (in instanceof ShardWriteTransaction.GetCompositeModificationReply) { - return ((ShardWriteTransaction.GetCompositeModificationReply) in) - .getModification(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertTrue( - compositeModification.getModifications().size() == 1); - assertEquals(modificationType, - compositeModification.getModifications().get(0) - .getClass()); + subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef()); - } - }; + CompositeModification compositeModification = expectMsgClass(duration("3 seconds"), + GetCompositeModificationReply.class).getModification(); + + assertTrue(compositeModification.getModifications().size() == 1); + assertEquals(modificationType, compositeModification.getModifications().get(0).getClass()); }}; } @@ -282,34 +232,22 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testWriteData"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new WriteData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - assertModification(subject, WriteModification.class); - expectNoMsg(); - } + subject.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(), + getRef()); + + ShardTransactionMessages.WriteDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class); + assertModification(subject, WriteModification.class); - }; + //unserialized write + subject.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), + TestModel.createTestContext()), + getRef()); + + expectMsgClass(duration("5 seconds"), WriteDataReply.class); }}; } @@ -322,35 +260,21 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testMergeData"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new MergeData(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(), - getRef()); - - final String out = new ExpectMsg(duration("500 milliseconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + subject.tell(new MergeData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(), + getRef()); - assertEquals("match", out); + ShardTransactionMessages.MergeDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class); - assertModification(subject, MergeModification.class); - - expectNoMsg(); - } + assertModification(subject, MergeModification.class); + //unserialized merge + subject.tell(new MergeData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext), + getRef()); - }; + expectMsgClass(duration("5 seconds"), MergeDataReply.class); }}; } @@ -363,32 +287,17 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testDeleteData"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - assertModification(subject, DeleteModification.class); - expectNoMsg(); - } + subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); + ShardTransactionMessages.DeleteDataReply replySerialized = + expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class); - }; + assertModification(subject, DeleteModification.class); + + //unserialized merge + subject.tell(new DeleteData(TestModel.TEST_PATH), getRef()); + + expectMsgClass(duration("5 seconds"), DeleteDataReply.class); }}; } @@ -402,83 +311,41 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction"); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - subject.tell(new ReadyTransaction().toSerializable(), getRef()); + subject.tell(new ReadyTransaction().toSerializable(), getRef()); - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); + }}; - expectNoMsg(); - } + // test + new JavaTestKit(getSystem()) {{ + final ActorRef shard = createShard(); + final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, + testSchemaContext, datastoreContext, shardStats, "txn"); + final ActorRef subject = + getSystem().actorOf(props, "testReadyTransaction2"); + subject.tell(new ReadyTransaction(), getRef()); - }; + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); }}; } + @SuppressWarnings("unchecked") @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext, datastoreContext, shardStats, "txn"); - final ActorRef subject = - getSystem().actorOf(props, "testCloseTransaction"); + final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction"); watch(subject); - new Within(duration("6 seconds")) { - @Override - protected void run() { - - subject.tell(new CloseTransaction().toSerializable(), getRef()); - - final String out = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL")); - if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", out); - - final String termination = new ExpectMsg(duration("3 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL")); - if (in instanceof Terminated) { - return "match"; - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message - - assertEquals("match", termination); - } - }; + subject.tell(new CloseTransaction().toSerializable(), getRef()); + + expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration("3 seconds"), Terminated.class); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 592337f93f..f2b849122a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,12 +1,12 @@ package org.opendaylight.controller.cluster.datastore; -import com.google.common.util.concurrent.CheckedFuture; - import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -44,10 +44,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; - import java.util.List; import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -83,6 +81,9 @@ public class TransactionProxyTest extends AbstractActorTest { private SchemaContext schemaContext; + @Mock + private ClusterWrapper mockClusterWrapper; + String memberName = "mock-member"; @Before @@ -94,6 +95,7 @@ public class TransactionProxyTest extends AbstractActorTest { doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); ShardStrategyFactory.setConfiguration(configuration); } @@ -112,7 +114,7 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private DataExists eqDataExists() { + private DataExists eqSerializedDataExists() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -124,7 +126,19 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private ReadData eqReadData() { + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof DataExists) && + ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqSerializedReadData() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -136,7 +150,19 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof ReadData) && + ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -153,7 +179,23 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof WriteData) { + WriteData obj = (WriteData) argument; + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + return false; + } + }; + + return argThat(matcher); + } + + private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -170,7 +212,24 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private DeleteData eqDeleteData() { + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof MergeData) { + MergeData obj = ((MergeData) argument); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private DeleteData eqSerializedDeleteData() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { @@ -182,30 +241,67 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private Future readyTxReply(String path) { + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return argument instanceof DeleteData && + ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private Future readySerializedTxReply(String path) { return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); } - private Future readDataReply(NormalizedNode data) { + private Future readyTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path)); + } + + + private Future readSerializedDataReply(NormalizedNode data) { return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); } - private Future dataExistsReply(boolean exists) { + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data)); + } + + private Future dataExistsSerializedReply(boolean exists) { return Futures.successful(new DataExistsReply(exists).toSerializable()); } - private Future writeDataReply() { + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists)); + } + + private Future writeSerializedDataReply() { return Futures.successful(new WriteDataReply().toSerializable()); } - private Future mergeDataReply() { + private Future writeDataReply() { + return Futures.successful(new WriteDataReply()); + } + + private Future mergeSerializedDataReply() { return Futures.successful(new MergeDataReply().toSerializable()); } - private Future deleteDataReply() { + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply()); + } + + private Future deleteSerializedDataReply() { return Futures.successful(new DeleteDataReply().toSerializable()); } + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply()); + } + private ActorSelection actorSelection(ActorRef actorRef) { return getSystem().actorSelection(actorRef.path()); } @@ -216,17 +312,20 @@ public class TransactionProxyTest extends AbstractActorTest { .setTransactionId("txn-1").build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { - ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); - doReturn(getSystem().actorSelection(actorRef.path())). + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))). + doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))). when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(createTransactionReply(actorRef)).when(mockActorContext). - executeOperation(eq(getSystem().actorSelection(actorRef.path())), + executeOperation(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); + + doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + return actorRef; } @@ -243,13 +342,13 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testRead() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); - doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); Optional> readOptional = transactionProxy.read( TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -258,8 +357,8 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -270,7 +369,7 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = ReadFailedException.class) public void testReadWithInvalidReplyMessageType() throws Exception { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(any(ActorSelection.class), any()); @@ -283,7 +382,7 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = TestException.class) public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(any(ActorSelection.class), any()); @@ -338,18 +437,18 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = TestException.class) public void testReadWithPriorRecordingOperationFailure() throws Throwable { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); - doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -362,21 +461,21 @@ public class TransactionProxyTest extends AbstractActorTest { propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } finally { verify(mockActorContext, times(0)).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + eq(actorSelection(actorRef)), eqSerializedReadData()); } } @Test public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(expectedNode)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode)); - doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -402,20 +501,20 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testExists() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); - doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); - doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -434,7 +533,7 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = ReadFailedException.class) public void testExistsWithInvalidReplyMessageType() throws Exception { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(any(ActorSelection.class), any()); @@ -447,7 +546,7 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = TestException.class) public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(any(ActorSelection.class), any()); @@ -460,18 +559,18 @@ public class TransactionProxyTest extends AbstractActorTest { @Test(expected = TestException.class) public void testExistsWithPriorRecordingOperationFailure() throws Throwable { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); - doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -484,21 +583,21 @@ public class TransactionProxyTest extends AbstractActorTest { propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); } finally { verify(mockActorContext, times(0)).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + eq(actorSelection(actorRef)), eqSerializedDataExists()); } } @Test public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDataExists()); + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -544,12 +643,12 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testWrite() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -557,7 +656,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), WriteDataReply.SERIALIZABLE_CLASS); @@ -587,12 +686,12 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testMerge() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -600,7 +699,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), MergeDataReply.SERIALIZABLE_CLASS); @@ -608,10 +707,10 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testDelete() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData()); + doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDeleteData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -619,7 +718,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.delete(TestModel.TEST_PATH); verify(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData()); + eq(actorSelection(actorRef)), eqSerializedDeleteData()); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), DeleteDataReply.SERIALIZABLE_CLASS); @@ -637,7 +736,7 @@ public class TransactionProxyTest extends AbstractActorTest { Object expReply = expReplies[i++]; if(expReply instanceof ActorSelection) { ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - assertEquals("Cohort actor path", (ActorSelection) expReply, actual); + assertEquals("Cohort actor path", expReply, actual); } else { // Expecting exception. try { @@ -653,17 +752,17 @@ public class TransactionProxyTest extends AbstractActorTest { @SuppressWarnings("unchecked") @Test public void testReady() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -688,19 +787,21 @@ public class TransactionProxyTest extends AbstractActorTest { @SuppressWarnings("unchecked") @Test public void testReadyWithRecordingOperationFailure() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -723,12 +824,12 @@ public class TransactionProxyTest extends AbstractActorTest { @SuppressWarnings("unchecked") @Test public void testReadyWithReplyFailure() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), @@ -781,12 +882,12 @@ public class TransactionProxyTest extends AbstractActorTest { @SuppressWarnings("unchecked") @Test public void testReadyWithInvalidReplyMessageType() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), @@ -808,7 +909,7 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testGetIdentifier() { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, TransactionProxy.TransactionType.READ_ONLY); @@ -820,10 +921,10 @@ public class TransactionProxyTest extends AbstractActorTest { @SuppressWarnings("unchecked") @Test public void testClose() throws Exception{ - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); - doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqReadData()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -835,4 +936,140 @@ public class TransactionProxyTest extends AbstractActorTest { verify(mockActorContext).sendOperationAsync( eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } + + + /** + * Method to test a local Tx actor. The Tx paths are matched to decide if the + * Tx actor is local or not. This is done by mocking the Tx actor path + * and the caller paths and ensuring that the paths have the remote-address format + * + * Note: Since the default akka provider for test is not a RemoteActorRefProvider, + * the paths returned for the actors for all the tests are not qualified remote paths. + * Hence are treated as non-local/remote actors. In short, all tests except + * few below run for remote actors + * + * @throws Exception + */ + @Test + public void testLocalTxActorRead() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(createTransactionReply).when(mockActorContext). + executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_ONLY)); + + doReturn(true).when(mockActorContext).isLocalPath(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); + + // negative test case with null as the reply + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); + + // test case with node as read data reply + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + + // test for local data exists + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test + public void testLocalTxActorWrite() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(createTransactionReply).when(mockActorContext). + executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, WRITE_ONLY)); + + doReturn(true).when(mockActorContext).isLocalPath(actorPath); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + //testing local merge + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + + //testing local delete + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class); + + // testing ready + doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(ReadyTransaction.class)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 8426b03a37..60f9a2d9dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -7,7 +7,6 @@ import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; - import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -18,9 +17,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; - import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -99,23 +96,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testFindLocalShardWithShardNotFound(){ new JavaTestKit(getSystem()) {{ + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(false, null)); - new Within(duration("1 seconds")) { - @Override - protected void run() { - - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(false, null)); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Optional out = actorContext.findLocalShard("default"); - assertTrue(!out.isPresent()); - expectNoMsg(); - } - }; + Optional out = actorContext.findLocalShard("default"); + assertTrue(!out.isPresent()); }}; } @@ -123,63 +112,74 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testExecuteRemoteOperation() { new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - new Within(duration("3 seconds")) { - @Override - protected void run() { - - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - - Object out = actorContext.executeOperation(actor, "hello"); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - assertEquals("hello", out); + Object out = actorContext.executeOperation(actor, "hello"); - expectNoMsg(); - } - }; + assertEquals("hello", out); }}; } @Test public void testExecuteRemoteOperationAsync() { new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - new Within(duration("3 seconds")) { - @Override - protected void run() { + ActorRef shardManagerActorRef = getSystem() + .actorOf(MockShardManager.props(true, shardActorRef)); - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); + ActorContext actorContext = + new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), + mock(Configuration.class)); - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); + ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); + Future future = actorContext.executeOperationAsync(actor, "hello"); - ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); + try { + Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); + assertEquals("Result", "hello", result); + } catch(Exception e) { + throw new AssertionError(e); + } + }}; + } - Future future = actorContext.executeOperationAsync(actor, "hello"); + @Test + public void testIsLocalPath() { + MockClusterWrapper clusterWrapper = new MockClusterWrapper(); + ActorContext actorContext = + new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class)); - try { - Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - assertEquals("Result", "hello", result); - } catch(Exception e) { - throw new AssertionError(e); - } + clusterWrapper.setSelfAddress(""); + assertEquals(false, actorContext.isLocalPath(null)); + assertEquals(false, actorContext.isLocalPath("")); - expectNoMsg(); - } - }; - }}; + clusterWrapper.setSelfAddress(null); + assertEquals(false, actorContext.isLocalPath("")); + + clusterWrapper.setSelfAddress("akka://test/user/$b"); + assertEquals(false, actorContext.isLocalPath("akka://test/user/$a")); + + clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550/"); + assertEquals(true, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + + clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2550"); + assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + + clusterWrapper.setSelfAddress("akka.tcp://system@128.0.0.1:2550/"); + assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); + + clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/"); + assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java index 803aa03b7c..b80506d17d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java @@ -15,19 +15,31 @@ import akka.cluster.MemberStatus; import akka.cluster.UniqueAddress; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import scala.collection.JavaConversions; - import java.util.HashSet; import java.util.Set; public class MockClusterWrapper implements ClusterWrapper{ - @Override public void subscribeToMemberEvents(ActorRef actorRef) { + private String selfAddress = "akka.tcp://test@127.0.0.1:2550/user/member-1-shard-test-config"; + + @Override + public void subscribeToMemberEvents(ActorRef actorRef) { } - @Override public String getCurrentMemberName() { + @Override + public String getCurrentMemberName() { return "member-1"; } + @Override + public String getSelfAddress() { + return selfAddress; + } + + public void setSelfAddress(String selfAddress) { + this.selfAddress = selfAddress; + } + public static void sendMemberUp(ActorRef to, String memberName, String address){ to.tell(createMemberUp(memberName, address), null); }