<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
*/
long timeSinceLastActivity();
+ /**
+ * This method checks if it is ok to replicate
+ *
+ * @return true if it is ok to replicate
+ */
+ boolean okToReplicate();
}
private volatile long matchIndex;
+ private long lastReplicatedIndex = -1L;
+
+ private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+
+
public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
this.nextIndex = context.getCommitIndex();
return stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
+ @Override
+ public boolean okToReplicate() {
+ // Return false if we are trying to send duplicate data before the heartbeat interval
+ if(getNextIndex() == lastReplicatedIndex){
+ if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+ .getHeartBeatInterval().toMillis()){
+ return false;
+ }
+ }
+
+ resetLastReplicated();
+ return true;
+ }
+
+ private void resetLastReplicated(){
+ lastReplicatedIndex = getNextIndex();
+ if(lastReplicatedStopwatch.isRunning()){
+ lastReplicatedStopwatch.reset();
+ }
+ lastReplicatedStopwatch.start();
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
.append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
return builder.toString();
}
-
-
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
} else if (message instanceof CaptureSnapshotReply){
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
-
+ } else if(message instanceof GetOnDemandRaftState) {
+ onGetOnDemandRaftStats();
} else {
RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = currentBehavior.handleMessage(getSender(), message);
}
}
+ private void onGetOnDemandRaftStats() {
+ // Debugging message to retrieve raft stats.
+
+ OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ .commitIndex(context.getCommitIndex())
+ .currentTerm(context.getTermInformation().getCurrentTerm())
+ .inMemoryJournalDataSize(replicatedLog.dataSize())
+ .inMemoryJournalLogSize(replicatedLog.size())
+ .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .lastApplied(context.getLastApplied())
+ .lastIndex(replicatedLog.lastIndex())
+ .lastTerm(replicatedLog.lastTerm())
+ .leader(getLeaderId())
+ .raftState(currentBehavior.state().toString())
+ .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
+ .snapshotIndex(replicatedLog.getSnapshotIndex())
+ .snapshotTerm(replicatedLog.getSnapshotTerm())
+ .votedFor(context.getTermInformation().getVotedFor())
+ .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
+
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+ if (lastLogEntry != null) {
+ builder.lastLogIndex(lastLogEntry.getIndex());
+ builder.lastLogTerm(lastLogEntry.getTerm());
+ }
+
+ if(currentBehavior instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader)currentBehavior;
+ Collection<String> followerIds = leader.getFollowerIds();
+ List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+ for(String id: followerIds) {
+ final FollowerLogInformation info = leader.getFollower(id);
+ followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+ }
+
+ builder.followerInfoList(followerInfoList);
+ }
+
+ sender().tell(builder.build(), self());
+
+ }
+
private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
if (oldBehavior != currentBehavior){
onStateChanged();
*
* @return Collection of follower IDs
*/
- protected final Collection<String> getFollowerIds() {
+ public final Collection<String> getFollowerIds() {
return followerToLog.keySet();
}
if (followerActor != null) {
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
followerNextIndex, followerId);
// FIXME : Sending one entry at a time
- final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+ if(followerLogInformation.okToReplicate()) {
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ sendAppendEntries = true;
+ }
} else if (isFollowerActive && followerNextIndex >= 0 &&
leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
}
// Send heartbeat to follower whenever install snapshot is initiated.
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
-
+ sendAppendEntries = true;
initiateCaptureSnapshot(followerId, followerNextIndex);
} else if(sendHeartbeat) {
- //we send an AppendEntries, even if the follower is inactive
+ // we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ sendAppendEntries = true;
}
}
+
+ if(sendAppendEntries) {
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ entries, followerId);
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * A bean class containing a snapshot of information for a follower returned from GetOnDemandRaftStats.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerInfo {
+ private final String id;
+ private final long nextIndex;
+ private final long matchIndex;
+ private final boolean isActive;
+ private final String timeSinceLastActivity;
+
+ @ConstructorProperties({"id","nextIndex", "matchIndex", "isActive", "timeSinceLastActivity"})
+ public FollowerInfo(String id, long nextIndex, long matchIndex, boolean isActive, String timeSinceLastActivity) {
+ this.id = id;
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
+ this.isActive = isActive;
+ this.timeSinceLastActivity = timeSinceLastActivity;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public long getNextIndex() {
+ return nextIndex;
+ }
+
+ public long getMatchIndex() {
+ return matchIndex;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public String getTimeSinceLastActivity() {
+ return timeSinceLastActivity;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Local message sent to a RaftActor to obtain a snapshot of statistical information. Returns an
+ * OnDemandRaftState instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetOnDemandRaftState {
+ public static final GetOnDemandRaftState INSTANCE = new GetOnDemandRaftState();
+
+ private GetOnDemandRaftState() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The response to a GetOnDemandRaftState message,
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandRaftState {
+ private long lastLogIndex = -1L;
+ private long lastLogTerm = -1L;
+ private long currentTerm = -1L;
+ private long commitIndex = -1L;
+ private long lastApplied = -1L;
+ private long lastIndex = -1L;
+ private long lastTerm = -1L;
+ private long snapshotIndex = -1L;
+ private long snapshotTerm = -1L;
+ private long replicatedToAllIndex = -1L;
+ private long inMemoryJournalDataSize;
+ private long inMemoryJournalLogSize;
+ private String leader;
+ private String raftState;
+ private String votedFor;
+ private boolean isSnapshotCaptureInitiated;
+
+ private List<FollowerInfo> followerInfoList = Collections.emptyList();
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+
+ private OnDemandRaftState() {
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ public long getLastApplied() {
+ return lastApplied;
+ }
+
+ public long getLastIndex() {
+ return lastIndex;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
+ public long getInMemoryJournalDataSize() {
+ return inMemoryJournalDataSize;
+ }
+
+ public long getInMemoryJournalLogSize() {
+ return inMemoryJournalLogSize;
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ public String getRaftState() {
+ return raftState;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public boolean isSnapshotCaptureInitiated() {
+ return isSnapshotCaptureInitiated;
+ }
+
+ public List<FollowerInfo> getFollowerInfoList() {
+ return followerInfoList;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public static class Builder {
+ private final OnDemandRaftState stats = new OnDemandRaftState();
+
+ public Builder lastLogIndex(long value) {
+ stats.lastLogIndex = value;
+ return this;
+ }
+
+ public Builder lastLogTerm(long value) {
+ stats.lastLogTerm = value;
+ return this;
+ }
+
+ public Builder currentTerm(long value) {
+ stats.currentTerm = value;
+ return this;
+ }
+
+ public Builder commitIndex(long value) {
+ stats.commitIndex = value;
+ return this;
+ }
+
+ public Builder lastApplied(long value) {
+ stats.lastApplied = value;
+ return this;
+ }
+
+ public Builder lastIndex(long value) {
+ stats.lastIndex = value;
+ return this;
+ }
+
+ public Builder lastTerm(long value) {
+ stats.lastTerm = value;
+ return this;
+ }
+
+ public Builder snapshotIndex(long value) {
+ stats.snapshotIndex = value;
+ return this;
+ }
+
+ public Builder snapshotTerm(long value) {
+ stats.snapshotTerm = value;
+ return this;
+ }
+
+ public Builder replicatedToAllIndex(long value) {
+ stats.replicatedToAllIndex = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalDataSize(long value) {
+ stats.inMemoryJournalDataSize = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalLogSize(long value) {
+ stats.inMemoryJournalLogSize = value;
+ return this;
+ }
+
+ public Builder leader(String value) {
+ stats.leader = value;
+ return this;
+ }
+
+ public Builder raftState(String value) {
+ stats.raftState = value;
+ return this;
+ }
+
+ public Builder votedFor(String value) {
+ stats.votedFor = value;
+ return this;
+ }
+
+ public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
+ stats.followerInfoList = followerInfoList;
+ return this;
+ }
+
+ public Builder peerAddresses(Map<String, String> peerAddresses) {
+ stats.peerAddresses = peerAddresses;
+ return this;
+ }
+
+ public Builder isSnapshotCaptureInitiated(boolean value) {
+ stats.isSnapshotCaptureInitiated = value;
+ return this;
+ }
+
+ public OnDemandRaftState build() {
+ return stats;
+ }
+ }
+}
stopwatch.stop();
return stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testOkToReplicate(){
+ MockRaftActorContext context = new MockRaftActorContext();
+ context.setCommitIndex(9);
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(
+ "follower1", 10, context);
+
+ assertTrue(followerLogInformation.okToReplicate());
+ assertFalse(followerLogInformation.okToReplicate());
+
+ // wait for 150 milliseconds and it should work again
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ assertTrue(followerLogInformation.okToReplicate());
+
+ //increment next index and try immediately and it should work again
+ followerLogInformation.incrNextIndex();
+ assertTrue(followerLogInformation.okToReplicate());
+ }
}
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, index, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+ }
+
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
1, lastIndex + 1, payload);
actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(null, null, newEntry));
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
// State should not change
assertTrue(raftBehavior instanceof Leader);
assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
+ @Test
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<5;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect only 1 message to be sent because of two reasons,
+ // - an append entries reply was not received
+ // - the heartbeat interval has not expired
+ // In this scenario if multiple messages are sent they would likely be duplicates
+ assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+ }
+
+ @Test
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+ }
+
+ for(int i=3;i<5;i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+ // get sent to the follower - but not the 5th
+ assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+ for(int i=0;i<4;i++) {
+ long expected = allMessages.get(i).getEntries().get(0).getIndex();
+ assertEquals(expected, i+2);
+ }
+ }
+
+ @Test
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ sendReplicate(actorContext, lastIndex+1);
+
+ // Wait slightly longer than heartbeat duration
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(1, allMessages.get(0).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+ }
+
+ @Test
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+ }
+
+ @Test
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ sendReplicate(actorContext, lastIndex+1);
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ }
+
+
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.slf4j.Logger;
+
+/**
+ * A factory for creating DOM transactions, either normal or chained.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMTransactionFactory {
+
+ private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final InMemoryDOMDataStore store;
+ private final ShardStats shardMBean;
+ private final Logger log;
+ private final String name;
+
+ public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
+ this.store = store;
+ this.shardMBean = shardMBean;
+ this.log = log;
+ this.name = name;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
+ String transactionID, String transactionChainID) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainID.isEmpty()) {
+ factory = transactionChains.get(transactionChainID);
+ if(factory == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
+ transactionChainID);
+ }
+
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainID, transactionChain);
+ factory = transactionChain;
+ }
+ } else {
+ log.debug("{}: Creating transaction with ID {}", name, transactionID);
+ }
+
+ T transaction = null;
+ switch(type) {
+ case READ_ONLY:
+ transaction = (T) factory.newReadOnlyTransaction();
+ shardMBean.incrementReadOnlyTransactionCount();
+ break;
+ case READ_WRITE:
+ transaction = (T) factory.newReadWriteTransaction();
+ shardMBean.incrementReadWriteTransactionCount();
+ break;
+ case WRITE_ONLY:
+ transaction = (T) factory.newWriteOnlyTransaction();
+ shardMBean.incrementWriteOnlyTransactionCount();
+ break;
+ }
+
+ return transaction;
+ }
+
+ public void closeTransactionChain(String transactionChainID) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(transactionChainID);
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ public void closeAllTransactionChains() {
+ for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
+}
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+ private boolean writeOnlyTransactionOptimizationsEnabled = false;
private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreType = other.dataStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+ this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return shardBatchedModificationCount;
}
+ public boolean isWriteOnlyTransactionOptimizationsEnabled() {
+ return writeOnlyTransactionOptimizationsEnabled;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final InMemoryDOMDataStore store;
/// The name of this shard
- private final ShardIdentifier name;
+ private final String name;
private final ShardStats shardMBean;
private ShardRecoveryCoordinator recoveryCoordinator;
private List<Object> currentLogRecoveryBatch;
- private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
super(name.toString(), mapPeerAddresses(peerAddresses),
Optional.of(datastoreContext.getShardRaftConfig()));
- this.name = name;
+ this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent())
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
+ transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+
+ commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
try {
if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
+ } else if (BatchedModifications.class.isInstance(message)) {
+ handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- // If we do not have any followers and we are not using persistence we can
- // apply modification to the state immediately
- if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+ sender().tell(reply, self());
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+ }
+
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
ready.getTransactionID(), ready.getTxnClientVersion());
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// the front-end.
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- ready.getModification());
+ (MutableCompositeModification) ready.getModification());
// Return our actor path as we'll handle the three phase commit, except if the Tx client
// version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- DOMStoreTransactionChain chain =
- transactionChains.remove(closeTransactionChain.getTransactionChainId());
-
- if(chain != null) {
- chain.close();
- }
+ transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransactionFactory factory = store;
-
- if(!transactionChainId.isEmpty()) {
- factory = transactionChains.get(transactionChainId);
- if(factory == null){
- DOMStoreTransactionChain transactionChain = store.createTransactionChain();
- transactionChains.put(transactionChainId, transactionChain);
- factory = transactionChain;
- }
- }
-
- if(this.schemaContext == null) {
- throw new IllegalStateException("SchemaContext is not set");
- }
-
- if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-
- shardMBean.incrementWriteOnlyTransactionCount();
-
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-
- shardMBean.incrementReadWriteTransactionCount();
+ DOMStoreTransaction transaction = transactionFactory.newTransaction(
+ TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
+ transactionChainId);
- return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-
- shardMBean.incrementReadOnlyTransactionCount();
-
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
-
- } else {
- throw new IllegalArgumentException(
- "Shard="+name + ":CreateTransaction message has unidentified transaction type="
- + transactionType);
- }
+ return createShardTransaction(transaction, transactionId, clientVersion);
}
private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
recoveryCoordinator = null;
currentLogRecoveryBatch = null;
- updateJournalStats();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
-
- updateJournalStats();
-
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
}
}
- private void updateJournalStats() {
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-
- if (lastLogEntry != null) {
- shardMBean.setLastLogIndex(lastLogEntry.getIndex());
- shardMBean.setLastLogTerm(lastLogEntry.getTerm());
- }
-
- shardMBean.setCommitIndex(getCommitIndex());
- shardMBean.setLastApplied(getLastApplied());
- shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
- }
-
@Override
protected void createSnapshot() {
// Create a transaction actor. We are really going to treat the transaction as a worker
delayedListenerRegistrations.clear();
}
- shardMBean.setRaftState(getRaftState().name());
- shardMBean.setCurrentTerm(getCurrentTerm());
-
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader){
- for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- persistenceId(), entry.getKey(), getId());
- }
- entry.getValue().close();
+ if(!isLeader) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
+ persistenceId(), getId());
}
- transactionChains.clear();
+ transactionFactory.closeAllTransactionChains();
}
}
return dataPersistenceProvider;
}
- @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
- shardMBean.setLeader(newLeader);
- }
-
@Override public String persistenceId() {
- return this.name.toString();
+ return this.name;
}
@VisibleForTesting
return dataPersistenceProvider;
}
+ @VisibleForTesting
+ ShardCommitCoordinator getCommitCoordinator() {
+ return commitCoordinator;
+ }
+
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
/**
*/
public class ShardCommitCoordinator {
+ // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ public interface CohortDecorator {
+ DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+ }
+
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
+ private final DOMTransactionFactory transactionFactory;
+
private final Queue<CohortEntry> queuedCohortEntries;
private int queueCapacity;
private final String name;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
- String name) {
- cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
- cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
+ private final String shardActorPath;
+
+ private final RemovalListener<String, CohortEntry> cacheRemovalListener =
+ new RemovalListener<String, CohortEntry>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
+ if(notification.getCause() == RemovalCause.EXPIRED) {
+ log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
+ }
+ }
+ };
+
+ // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ private CohortDecorator cohortDecorator;
+
+ public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+ long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
+ this.transactionFactory = transactionFactory;
+
+ shardActorPath = Serialization.serializedActorPath(shardActor);
+
+ cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
+ removalListener(cacheRemovalListener).build();
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
}
/**
- * This method caches a cohort entry for the given transactions ID in preparation for the
- * subsequent 3-phase commit.
+ * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
+ * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
*
* @param transactionID the ID of the transaction
* @param cohort the cohort to participate in the transaction commit
- * @param modification the modification made by the transaction
+ * @param modification the modifications made by the transaction
*/
public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification modification) {
cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
}
+ /**
+ * This method handles a BatchedModifications message for a transaction being prepared directly on the
+ * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
+ * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
+ * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
+ *
+ * @param batched the BatchedModifications
+ * @param shardActor the transaction's shard actor
+ *
+ * @throws ExecutionException if an error occurs loading the cache
+ */
+ public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+ throws ExecutionException {
+ CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+ if(cohortEntry == null) {
+ cohortEntry = new CohortEntry(batched.getTransactionID(),
+ transactionFactory.<DOMStoreWriteTransaction>newTransaction(
+ TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+ batched.getTransactionChainID()));
+ cohortCache.put(batched.getTransactionID(), cohortEntry);
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Applying {} batched modifications for Tx {}", name,
+ batched.getModifications().size(), batched.getTransactionID());
+ }
+
+ cohortEntry.applyModifications(batched.getModifications());
+
+ String cohortPath = null;
+ if(batched.isReady()) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Readying Tx {}, client version {}", name,
+ batched.getTransactionID(), batched.getVersion());
+ }
+
+ cohortEntry.ready(cohortDecorator);
+ cohortPath = shardActorPath;
+ }
+
+ return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+ }
+
/**
* This method handles the canCommit phase for a transaction.
*
}
}
+ @VisibleForTesting
+ void setCohortDecorator(CohortDecorator cohortDecorator) {
+ this.cohortDecorator = cohortDecorator;
+ }
+
+
static class CohortEntry {
private final String transactionID;
- private final DOMStoreThreePhaseCommitCohort cohort;
- private final Modification modification;
+ private DOMStoreThreePhaseCommitCohort cohort;
+ private final MutableCompositeModification compositeModification;
+ private final DOMStoreWriteTransaction transaction;
private ActorRef canCommitSender;
private ActorRef shard;
private long lastAccessTime;
+ CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+ this.compositeModification = new MutableCompositeModification();
+ this.transaction = transaction;
+ this.transactionID = transactionID;
+ }
+
CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification compositeModification) {
this.transactionID = transactionID;
this.cohort = cohort;
- this.modification = modification;
+ this.compositeModification = compositeModification;
+ this.transaction = null;
}
void updateLastAccessTime() {
return cohort;
}
- Modification getModification() {
- return modification;
+ MutableCompositeModification getModification() {
+ return compositeModification;
+ }
+
+ void applyModifications(Iterable<Modification> modifications) {
+ for(Modification modification: modifications) {
+ compositeModification.addModification(modification);
+ modification.apply(transaction);
+ }
+ }
+
+ void ready(CohortDecorator cohortDecorator) {
+ Preconditions.checkState(cohort == null, "cohort was already set");
+
+ cohort = transaction.ready();
+
+ if(cohortDecorator != null) {
+ // Call the hook for unit tests.
+ cohort = cohortDecorator.decorate(transactionID, cohort);
+ }
}
ActorRef getCanCommitSender() {
void setShard(ActorRef shard) {
this.shard = shard;
}
+
+ boolean hasModifications(){
+ return compositeModification.getModifications().size() > 0;
+ }
}
}
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
public class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+ private final String transactionChainId;
private final ActorContext actorContext;
- private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
- protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
- this.transactionPath = transactionPath;
this.actor = actor;
+ this.transactionChainId = transactionChainId;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
return actor;
}
+ protected ActorContext getActorContext() {
+ return actorContext;
+ }
+
protected short getRemoteTransactionVersion() {
return remoteTransactionVersion;
}
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ return combineRecordedOperationsFutures(readyReplyFuture);
+ }
+
+ protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// Future will fail. We need all prior operations and the ready operation to succeed
// in order to attempt commit.
- List<Future<Object>> futureList =
- Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
futureList.addAll(recordedOperationFutures);
- futureList.add(replyFuture);
+ futureList.add(withLastReplyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
actorContext.getClientDispatcher());
// de-serializing each reply.
// Note the Future get call here won't block as it's complete.
- Object serializedReadyReply = replyFuture.value().get().get();
+ Object serializedReadyReply = withLastReplyFuture.value().get().get();
if (serializedReadyReply instanceof ReadyTransactionReply) {
return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-
+ } else if(serializedReadyReply instanceof BatchedModificationsReply) {
+ return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = reply.getCohortPath();
-
- // In Helium we used to return the local path of the actor which represented
- // a remote ThreePhaseCommitCohort. The local path would then be converted to
- // a remote path using this resolvePath method. To maintain compatibility with
- // a Helium node we need to continue to do this conversion.
- // At some point in the future when upgrades from Helium are not supported
- // we could remove this code to resolvePath and just use the cohortPath as the
- // resolved cohortPath
- if(TransactionContextImpl.this.remoteTransactionVersion <
- DataStoreVersions.HELIUM_1_VERSION) {
- cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
- }
-
+ String cohortPath = deserializeCohortPath(reply.getCohortPath());
return actorContext.actorSelection(cohortPath);
-
} else {
// Throwing an exception here will fail the Future.
throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
+ protected String deserializeCohortPath(String cohortPath) {
+ return cohortPath;
+ }
+
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(remoteTransactionVersion);
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
batchedModifications.addModification(modification);
if(batchedModifications.getModifications().size() >=
actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
}
}
- private void sendBatchedModifications() {
+ private void sendAndRecordBatchedModifications() {
+ Future<Object> sentFuture = sendBatchedModifications();
+ if(sentFuture != null) {
+ recordedOperationFutures.add(sentFuture);
+ }
+ }
+
+ protected Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false);
+ }
+
+ protected Future<Object> sendBatchedModifications(boolean ready) {
+ Future<Object> sent = null;
if(batchedModifications != null) {
- LOG.debug("Tx {} sending {} batched modifications", identifier,
- batchedModifications.getModifications().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+ batchedModifications.getModifications().size(), ready);
+ }
- recordedOperationFutures.add(executeOperationAsync(batchedModifications));
- batchedModifications = null;
+ batchedModifications.setReady(ready);
+ sent = executeOperationAsync(batchedModifications);
+
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
+
+ return sent;
}
@Override
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
- READ_WRITE
+ READ_WRITE;
+
+ public static TransactionType fromInt(int type) {
+ if(type == WRITE_ONLY.ordinal()) {
+ return WRITE_ONLY;
+ } else if(type == READ_WRITE.ordinal()) {
+ return READ_WRITE;
+ } else if(type == READ_ONLY.ordinal()) {
+ return READ_ONLY;
+ } else {
+ throw new IllegalArgumentException("In TransactionType enum value" + type);
+ }
+ }
}
static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
* Sets the target primary shard and initiates a CreateTransaction try.
*/
void setPrimaryShard(ActorSelection primaryShard) {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
this.primaryShard = primaryShard;
- tryCreateTransaction();
+
+ if(transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+
+ // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+ // to avoid the overhead of creating a separate transaction actor.
+ // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+ executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+ this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+ } else {
+ tryCreateTransaction();
+ }
}
/**
boolean invokeOperation = true;
synchronized(txOperationsOnComplete) {
if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete {}", identifier);
+ LOG.debug("Tx {} Adding operation on complete", identifier);
invokeOperation = false;
txOperationsOnComplete.add(operation);
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
+ LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable();
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
- failure.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- String transactionPath = reply.getTransactionPath();
-
LOG.debug("Tx {} Received {}", identifier, reply);
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+ return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+ reply.getTransactionPath(), reply.getVersion());
+ }
+
+ private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+ String transactionPath, short remoteTransactionVersion) {
if (transactionType == TransactionType.READ_ONLY) {
// Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
- return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
- } else {
+ if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ operationCompleter);
+ } else if (transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ } else {
+ return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Context for a write-only transaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
+
+ public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ }
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the remaining batched modifications if any.
+
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+
+ return combineRecordedOperationsFutures(lastModificationsFuture);
+ }
+}
package org.opendaylight.controller.cluster.datastore.compat;
import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationCompleter;
import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
* @author Thomas Pantelis
*/
public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
+
+ private final String transactionPath;
public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
- remoteTransactionVersion, operationCompleter);
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ this.transactionPath = transactionPath;
}
@Override
recordedOperationFutures.add(executeOperationAsync(
new WriteData(path, data, getRemoteTransactionVersion())));
}
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+
+ return combineRecordedOperationsFutures(lastReplyFuture);
+ }
+
+ @Override
+ protected String deserializeCohortPath(String cohortPath) {
+ // In base Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ return getActorContext().resolvePath(transactionPath, cohortPath);
+ }
+
+ return cohortPath;
+ }
}
public class ShardTransactionIdentifier {
private final String remoteTransactionId;
+ private final String stringRepresentation;
public ShardTransactionIdentifier(String remoteTransactionId) {
this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
"remoteTransactionId should not be null");
+
+ stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-").
+ append(remoteTransactionId).toString();
}
public String getRemoteTransactionId() {
}
@Override public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("shard-").append(remoteTransactionId);
- return sb.toString();
+ return stringRepresentation;
}
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
/**
* Maintains statistics for a shard.
public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
public static String JMX_CATEGORY_SHARD = "Shards";
+ private static final Logger LOG = LoggerFactory.getLogger(ShardStats.class);
+
+ private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+ private static final Cache<String, OnDemandRaftState> onDemandRaftStateCache =
+ CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
+
private long committedTransactionsCount;
private long readOnlyTransactionCount;
private long readWriteTransactionCount;
- private String leader;
-
- private String raftState;
-
- private long lastLogTerm = -1L;
-
- private long lastLogIndex = -1L;
-
- private long currentTerm = -1L;
-
- private long commitIndex = -1L;
-
- private long lastApplied = -1L;
-
private long lastCommittedTransactionTime;
private long failedTransactionsCount;
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- private long dataSize = 0;
+ private boolean followerInitialSyncStatus = false;
- private final SimpleDateFormat sdf =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ private ActorRef shardActor;
- private boolean followerInitialSyncStatus = false;
+ private String statRetrievalError;
+
+ private String statRetrievalTime;
public ShardStats(final String shardName, final String mxBeanType) {
super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor());
}
+ public void setShardActor(ActorRef shardActor) {
+ this.shardActor = shardActor;
+ }
+
+ private OnDemandRaftState getOnDemandRaftState() {
+ String name = getShardName();
+ OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name);
+ if(state == null) {
+ statRetrievalError = null;
+ statRetrievalTime = null;
+
+ if(shardActor != null) {
+ Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+ try {
+ Stopwatch timer = Stopwatch.createStarted();
+
+ state = (OnDemandRaftState) Await.result(Patterns.ask(shardActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+
+ statRetrievalTime = timer.stop().toString();
+ onDemandRaftStateCache.put(name, state);
+ } catch (Exception e) {
+ statRetrievalError = e.toString();
+ }
+ }
+
+ state = state != null ? state : OnDemandRaftState.builder().build();
+ }
+
+ return state;
+ }
+
@Override
public String getShardName() {
return getMBeanName();
@Override
public String getLeader() {
- return leader;
+ return getOnDemandRaftState().getLeader();
}
@Override
public String getRaftState() {
- return raftState;
+ return getOnDemandRaftState().getRaftState();
}
@Override
@Override
public long getLastLogIndex() {
- return lastLogIndex;
+ return getOnDemandRaftState().getLastLogIndex();
}
@Override
public long getLastLogTerm() {
- return lastLogTerm;
+ return getOnDemandRaftState().getLastLogTerm();
}
@Override
public long getCurrentTerm() {
- return currentTerm;
+ return getOnDemandRaftState().getCurrentTerm();
}
@Override
public long getCommitIndex() {
- return commitIndex;
+ return getOnDemandRaftState().getCommitIndex();
}
@Override
public long getLastApplied() {
- return lastApplied;
+ return getOnDemandRaftState().getLastApplied();
}
@Override
- public String getLastCommittedTransactionTime() {
+ public long getLastIndex() {
+ return getOnDemandRaftState().getLastIndex();
+ }
- return sdf.format(new Date(lastCommittedTransactionTime));
+ @Override
+ public long getLastTerm() {
+ return getOnDemandRaftState().getLastTerm();
+ }
+
+ @Override
+ public long getSnapshotIndex() {
+ return getOnDemandRaftState().getSnapshotIndex();
+ }
+
+ @Override
+ public long getSnapshotTerm() {
+ return getOnDemandRaftState().getSnapshotTerm();
+ }
+
+ @Override
+ public long getReplicatedToAllIndex() {
+ return getOnDemandRaftState().getReplicatedToAllIndex();
+ }
+
+ @Override
+ public String getVotedFor() {
+ return getOnDemandRaftState().getVotedFor();
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return getOnDemandRaftState().isSnapshotCaptureInitiated();
+ }
+
+ @Override
+ public String getLastCommittedTransactionTime() {
+ return DATE_FORMAT.format(new Date(lastCommittedTransactionTime));
}
@Override
return ++abortTransactionsCount;
}
- public void setLeader(final String leader) {
- this.leader = leader;
- }
-
- public void setRaftState(final String raftState) {
- this.raftState = raftState;
- }
-
- public void setLastLogTerm(final long lastLogTerm) {
- this.lastLogTerm = lastLogTerm;
- }
-
- public void setLastLogIndex(final long lastLogIndex) {
- this.lastLogIndex = lastLogIndex;
- }
-
- public void setCurrentTerm(final long currentTerm) {
- this.currentTerm = currentTerm;
- }
-
- public void setCommitIndex(final long commitIndex) {
- this.commitIndex = commitIndex;
- }
-
- public void setLastApplied(final long lastApplied) {
- this.lastApplied = lastApplied;
- }
-
public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) {
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
- public void setInMemoryJournalDataSize(long dataSize){
- this.dataSize = dataSize;
+ @Override
+ public long getInMemoryJournalDataSize(){
+ return getOnDemandRaftState().getInMemoryJournalDataSize();
}
@Override
- public long getInMemoryJournalDataSize(){
- return dataSize;
+ public long getInMemoryJournalLogSize() {
+ return getOnDemandRaftState().getInMemoryJournalLogSize();
}
@Override
public boolean getFollowerInitialSyncStatus() {
return followerInitialSyncStatus;
}
+
+ @Override
+ public List<FollowerInfo> getFollowerInfo() {
+ return getOnDemandRaftState().getFollowerInfoList();
+ }
+
+ @Override
+ public String getPeerAddresses() {
+ StringBuilder builder = new StringBuilder();
+ int i = 0;
+ for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
+ if(i++ > 0) {
+ builder.append(", ");
+ }
+
+ builder.append(e.getKey()).append(": ").append(e.getValue());
+ }
+
+ return builder.toString();
+ }
+
+ @Override
+ public String getStatRetrievalTime() {
+ getOnDemandRaftState();
+ return statRetrievalTime;
+ }
+
+ @Override
+ public String getStatRetrievalError() {
+ getOnDemandRaftState();
+ return statRetrievalError;
+ }
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
import java.util.List;
-
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
String getShardName();
+ String getStatRetrievalTime();
+
+ String getStatRetrievalError();
+
long getCommittedTransactionsCount();
long getReadOnlyTransactionCount();
long getLastApplied();
+ long getLastIndex();
+
+ long getLastTerm();
+
+ long getSnapshotIndex();
+
+ long getSnapshotTerm();
+
+ long getReplicatedToAllIndex();
+
String getLastCommittedTransactionTime();
long getFailedTransactionsCount();
String getRaftState();
+ String getVotedFor();
+
+ boolean isSnapshotCaptureInitiated();
+
ThreadExecutorStats getDataStoreExecutorStats();
ThreadExecutorStats getNotificationMgrExecutorStats();
long getInMemoryJournalDataSize();
+ long getInMemoryJournalLogSize();
+
boolean getFollowerInitialSyncStatus();
+
+ List<FollowerInfo> getFollowerInfo();
+
+ String getPeerAddresses();
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
/**
public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
private static final long serialVersionUID = 1L;
+ private boolean ready;
+ private String transactionID;
+ private String transactionChainID;
+
public BatchedModifications() {
}
- public BatchedModifications(short version) {
+ public BatchedModifications(String transactionID, short version, String transactionChainID) {
super(version);
+ this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null");
+ this.transactionChainID = transactionChainID != null ? transactionChainID : "";
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
+ public String getTransactionID() {
+ return transactionID;
+ }
+
+ public String getTransactionChainID() {
+ return transactionChainID;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ transactionID = in.readUTF();
+ transactionChainID = in.readUTF();
+ ready = in.readBoolean();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(transactionID);
+ out.writeUTF(transactionChainID);
+ out.writeBoolean(ready);
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
+ .append(", modifications size=").append(getModifications().size()).append("]");
+ return builder.toString();
+ }
}
public class BatchedModificationsReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
+ private static final byte COHORT_PATH_NOT_PRESENT = 0;
+ private static final byte COHORT_PATH_PRESENT = 1;
+
private int numBatched;
+ private String cohortPath;
public BatchedModificationsReply() {
}
this.numBatched = numBatched;
}
+ public BatchedModificationsReply(int numBatched, String cohortPath) {
+ this.numBatched = numBatched;
+ this.cohortPath = cohortPath;
+ }
public int getNumBatched() {
return numBatched;
}
+ public String getCohortPath() {
+ return cohortPath;
+ }
+
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
numBatched = in.readInt();
+
+ if(in.readByte() == COHORT_PATH_PRESENT) {
+ cohortPath = in.readUTF();
+ }
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(numBatched);
+
+ if(cohortPath != null) {
+ out.writeByte(COHORT_PATH_PRESENT);
+ out.writeUTF(cohortPath);
+ } else {
+ out.writeByte(COHORT_PATH_NOT_PRESENT);
+ }
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
+ .append(cohortPath).append("]");
+ return builder.toString();
+ }
}
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
* Abstract base class for a versioned Externalizable message.
public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
private static final long serialVersionUID = 1L;
- private short version;
+ private short version = DataStoreVersions.CURRENT_VERSION;
public VersionedExternalizableMessage() {
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- protected void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
+ protected void verifyLastApplied(TestActorRef<Shard> shard, long expectedValue) {
+ long lastApplied = -1;
for(int i = 0; i < 20 * 5; i++) {
- long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
- if(lastLogIndex == expectedValue) {
- break;
+ lastApplied = shard.underlyingActor().getShardMBean().getLastApplied();
+ if(lastApplied == expectedValue) {
+ return;
}
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
+ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
}
protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
protected final String memberName = "mock-member";
- protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
- shardBatchedModificationCount(1);
+ protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
@BeforeClass
public static void setUpClass() throws IOException {
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
+ protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
+ Future<BatchedModificationsReply> replyFuture = Futures.successful(
+ new BatchedModificationsReply(count, actorRef.path().toString()));
+ doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+ }
+
protected void expectBatchedModifications(int count) {
doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
- ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- log.info("Created mock shard Tx actor {}", txActorRef);
+ ActorRef txActorRef;
+ if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
+ txActorRef = shardActorRef;
+ } else {
+ txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ log.info("Created mock shard Tx actor {}", txActorRef);
- doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
- txActorRef.path().toString());
+ doReturn(actorSystem.actorSelection(txActorRef.path())).
+ when(mockActorContext).actorSelection(txActorRef.path().toString());
- doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type));
+ doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(prefix, type));
+ }
return txActorRef;
}
return captured;
}
- protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+ protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), expected);
+ verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
}
- protected void verifyBatchedModifications(Object message, Modification... expected) {
+ protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
assertEquals("Message type", BatchedModifications.class, message.getClass());
BatchedModifications batchedModifications = (BatchedModifications)message;
assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+ assertEquals("isReady", expIsReady, batchedModifications.isReady());
for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
Modification actual = batchedModifications.getModifications().get(i);
assertEquals("Modification type", expected[i].getClass(), actual.getClass());
// Create the write Tx
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
+ final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modification operations and ready the Tx on a separate thread.
// Create the write Tx.
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate thread.
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
+
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+ final String transactionID1 = "tx1";
+ final String transactionID2 = "tx2";
+ final String transactionID3 = "tx3";
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
+ if(transactionID.equals(transactionID1)) {
+ mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
+ return mockCohort1.get();
+ } else if(transactionID.equals(transactionID2)) {
+ mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
+ return mockCohort2.get();
+ } else {
+ mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
+ return mockCohort3.get();
+ }
+ }
+ };
- String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
+ // Send a BatchedModifications message for the first transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
+ assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
+ assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
// Send the CanCommitTransaction message for the first Tx.
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
+ // Send BatchedModifications for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// processed after the first Tx completes.
assertEquals("Commits complete", true, done);
- InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
+ InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
+ inOrder.verify(mockCohort1.get()).canCommit();
+ inOrder.verify(mockCohort1.get()).preCommit();
+ inOrder.verify(mockCohort1.get()).commit();
+ inOrder.verify(mockCohort2.get()).canCommit();
+ inOrder.verify(mockCohort2.get()).preCommit();
+ inOrder.verify(mockCohort2.get()).commit();
+ inOrder.verify(mockCohort3.get()).canCommit();
+ inOrder.verify(mockCohort3.get()).preCommit();
+ inOrder.verify(mockCohort3.get()).commit();
// Verify data in the data store.
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
- verifyLastLogIndex(shard, 2);
+ verifyLastApplied(shard, 2);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
+ private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data, boolean ready) {
+ return newBatchedModifications(transactionID, null, path, data, ready);
+ }
+
+ private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+ YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+ batched.addModification(new WriteModification(path, data));
+ batched.setReady(ready);
+ return batched;
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testCommitWithPersistenceDisabled() throws Throwable {
- dataStoreContextBuilder.persistent(false);
+ public void testMultipleBatchedModifications() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
+ "testMultipleBatchedModifications");
waitUntilLeader(shard);
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ final String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
- // Setup a simulated transactions with a mock cohort.
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
+ }
- String transactionID = "tx";
- MutableCompositeModification modification = new MutableCompositeModification();
- NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ return mockCohort.get();
+ }
+ };
- FiniteDuration duration = duration("5 seconds");
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send a BatchedModifications to start a transaction.
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ // Send a couple more BatchedModifications.
+
+ shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
- InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ InOrder inOrder = inOrder(mockCohort.get());
+ inOrder.verify(mockCohort.get()).canCommit();
+ inOrder.verify(mockCohort.get()).preCommit();
+ inOrder.verify(mockCohort.get()).commit();
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+ outerList.getValue() instanceof Iterable);
+ Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+ entry instanceof MapEntryNode);
+ MapEntryNode mapEntry = (MapEntryNode)entry;
+ Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+ mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+ assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+ assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testBatchedModificationsOnTransactionChain() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsOnTransactionChain");
+
+ waitUntilLeader(shard);
+
+ String transactionChainID = "txChain";
+ String transactionID1 = "tx1";
+ String transactionID2 = "tx2";
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a chained write transaction and ready it.
+
+ ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+ containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Create a read Tx on the same chain.
+
+ shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+ transactionChainID).toSerializable(), getRef());
+
+ CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+ getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+ assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+ // Commit the write transaction.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, path);
+ assertEquals("Stored node", containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testOnBatchedModificationsWhenNotLeader() {
+ final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+ new ShardTestKit(getSystem()) {{
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ protected boolean isLeader() {
+ return overrideLeaderCalls.get() ? false : super.isLeader();
+ }
+
+ @Override
+ protected ActorSelection getLeader() {
+ return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+ super.getLeader();
+ }
+ };
+ }
+ };
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a transaction.
+
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
}};
}
+ @Test
+ public void testCommitWhenTransactionHasNoModifications(){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }
+ };
+ }
+
+ @Test
+ public void testCommitWhenTransactionHasModifications(){
+ new ShardTestKit(getSystem()) {
+ {
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx1";
+ MutableCompositeModification modification = new MutableCompositeModification();
+ modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
+ DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
+ doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Simulate the ForwardedReadyTransaction messages that would be sent
+ // by the ShardTransaction.
+
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+
+ InOrder inOrder = inOrder(cohort);
+ inOrder.verify(cohort).canCommit();
+ inOrder.verify(cohort).preCommit();
+ inOrder.verify(cohort).commit();
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+
+ // Commit index should advance as we do not have an empty modification
+ assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ }
+ };
+ }
+
@Test
public void testCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one fails in the
- // commit phase.
+ // Setup 2 mock cohorts. The first one fails in the commit phase.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready each transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
waitUntilLeader(shard);
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
final FiniteDuration duration = duration("5 seconds");
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
}
};
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- modification, preCommit);
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create 1st Tx - will timeout
+ // Create and ready the 1st Tx - will timeout
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification1);
+ shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- // Create 2nd Tx
+ // Create and ready the 2nd Tx
- String transactionID2 = "tx3";
- MutableCompositeModification modification2 = new MutableCompositeModification();
+ String transactionID2 = "tx2";
YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
- listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
- modification2);
-
- // Ready the Tx's
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(newBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
- // Ready the Tx's
+ // Send a BatchedModifications to start transactions and ready them.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx.
// Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ // Send BatchedModifications to start and ready each transaction.
+
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
+
}
@Test
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
verify(mockActorContext, times(0)).acquireTxCreationPermit();
}
+ /**
+ * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
+ * initiated until the first one completes its read future.
+ */
+ @Test
+ public void testChainedWriteOnlyTransactions() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
+ doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+
+ DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+ writeTx1.ready();
+
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
+
+ ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ expectBatchedModifications(txActorRef2, 1);
+
+ final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
+
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch write2Complete = new CountDownLatch(1);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
+ }
+ }
+ }.start();
+
+ assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ try {
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } catch (AssertionError e) {
+ fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+ }
+
+ batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+
+ // Tx 2 should've proceeded to find the primary shard.
+ verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
/**
* Tests 2 successive chained read-write transactions and verifies the second transaction isn't
* initiated until the first one completes its read future.
writeTx1.ready();
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
String tx2MemberName = "tx2MemberName";
doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@Test
public void testWrite() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
// This sends the batched modification.
transactionProxy.ready();
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
BatchedModificationsReply.class);
@Test
public void testMerge() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
public void testDelete() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
}
@Test
- public void testReady() throws Exception {
+ public void testReadyWithReadWrite() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
isA(BatchedModifications.class));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), false,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verifyBatchedModifications(batchedModifications.get(1), true);
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
}
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectFailedBatchedModifications(actorRef);
- expectReadyTransaction(actorRef);
-
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
@Test
public void testReadyWithReplyFailure() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ expectFailedBatchedModifications(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
verifyCohortFutures(proxy, TestException.class);
}
@Test
public void testReadyWithInvalidReplyMessageType() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
+ //expectBatchedModifications(actorRef, 1);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ isA(BatchedModifications.class));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
verifyCohortFutures(proxy, IllegalArgumentException.class);
}
- @Test
- public void testUnusedTransaction() throws Exception {
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertEquals("canCommit", true, ready.canCommit().get());
- ready.preCommit().get();
- ready.commit().get();
- }
-
@Test
public void testGetIdentifier() {
setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
*/
@Test
public void testLocalTxActorRead() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, READ_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
@Test
public void testLocalTxActorReady() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
- setTransactionId("txn-1").setTransactionActorPath(actorPath).
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, WRITE_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
// testing ready
- doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(ReadyTransaction.class));
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
private static interface TransactionProxyOperation {
doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
} else {
- doReturn(Futures.failed(new Exception("not found")))
+ doReturn(Futures.failed(new PrimaryNotFoundException("test")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
}
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ String actorPath = txActorRef.path().toString();
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
setTransactionId("txn-1").setTransactionActorPath(actorPath).
setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
+ doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
+
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_WRITE));
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
@Test
public void testWriteThrottlingWhenShardFound(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
}, 2, true);
}
- @Test
- public void testModificationOperationBatching() throws Throwable {
+ private void testModificationOperationBatching(TransactionType type) throws Exception {
int shardBatchedModificationCount = 3;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
expectBatchedModifications(actorRef, shardBatchedModificationCount);
YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
transactionProxy.write(writePath1, writeNode1);
transactionProxy.write(writePath2, writeNode2);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
- verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+ boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
+ verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ if(optimizedWriteOnly) {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class);
+ } else {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+ }
+
+ @Test
+ public void testReadWriteModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(READ_WRITE);
+ }
+
+ @Test
+ public void testWriteOnlyModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(WRITE_ONLY);
+ }
+
+ @Test
+ public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testModificationOperationBatching(WRITE_ONLY);
}
@Test
public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+
int shardBatchedModificationCount = 10;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2));
- verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+ verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
InOrder inOrder = Mockito.inOrder(mockActorContext);
inOrder.verify(mockActorContext).executeOperationAsync(
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testConcurrentThreePhaseCommits");
+ "testPreLithiumConcurrentThreePhaseCommits");
waitUntilLeader(shard);
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
- verifyLastLogIndex(shard, 2);
+ verifyLastApplied(shard, 2);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+
+ @Test
+ @Ignore
+ // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
+ // creating transaction actors for write-only Tx's.
+ public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
+ short version = DataStoreVersions.HELIUM_2_VERSION;
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ }
}
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain");
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
+ batched.setReady(true);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
(Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+ assertEquals("getTransactionID", "tx1", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
+ assertEquals("isReady", true, clone.isReady());
assertEquals("getModifications size", 3, clone.getModifications().size());
DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
+
+ // Test with different params.
+
+ batched = new BatchedModifications("tx2", (short)10, null);
+
+ clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", 10, clone.getVersion());
+ assertEquals("getTransactionID", "tx2", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "", clone.getTransactionChainID());
+ assertEquals("isReady", false, clone.isReady());
+
+ assertEquals("getModifications size", 0, clone.getModifications().size());
+
}
@Test
BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
(Serializable) new BatchedModificationsReply(100).toSerializable());
assertEquals("getNumBatched", 100, clone.getNumBatched());
+ assertEquals("getCohortPath", null, clone.getCohortPath());
+
+ clone = (BatchedModificationsReply) SerializationUtils.clone(
+ (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
+ assertEquals("getNumBatched", 50, clone.getNumBatched());
+ assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
}
}