</parent>
<artifactId>features-netconf</artifactId>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<properties>
<features.file>features.xml</features.file>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-monitoring</artifactId>
</dependency>
+ <!-- test to validate features.xml -->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>${yangtools.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- dependency for opendaylight-karaf-empty for use by testing -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-empty</artifactId>
+ <version>${commons.opendaylight.version}</version>
+ <type>zip</type>
+ </dependency>
</dependencies>
<build>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.version}</version>
+ <configuration>
+ <systemPropertyVariables>
+ <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+ <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+ <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+ </systemPropertyVariables>
+ <dependenciesToScan>
+ <dependency>org.opendaylight.yangtools:features-test</dependency>
+ </dependenciesToScan>
+ </configuration>
+ </plugin>
</plugins>
</build>
<scm>
</feature>
<feature name='odl-netconf-notifications-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring :: Impl">
<feature version='${project.version}'>odl-netconf-notifications-api</feature>
+ <feature version='${project.version}'>odl-netconf-util</feature>
+ <feature version='${yangtools.version}'>odl-yangtools-binding-generator</feature>
<bundle>mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version}</bundle>
</feature>
*/
package org.opendaylight.controller.config.manager.impl.dynamicmbean;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
* @return list of found annotations
*/
static <T extends Annotation> List<T> findMethodAnnotationInSuperClassesAndIfcs(
- final Method setter, Class<T> annotationType,
- Set<Class<?>> inspectedInterfaces) {
- List<T> result = new ArrayList<T>();
+ final Method setter, final Class<T> annotationType,
+ final Set<Class<?>> inspectedInterfaces) {
+ Builder<T> result = ImmutableSet.builder();
Class<?> inspectedClass = setter.getDeclaringClass();
do {
try {
} catch (NoSuchMethodException e) {
inspectedClass = Object.class; // no need to go further
}
- } while (inspectedClass.equals(Object.class) == false);
+ } while (!inspectedClass.equals(Object.class));
+
// inspect interfaces
for (Class<?> ifc : inspectedInterfaces) {
if (ifc.isInterface() == false) {
}
}
- return result;
+ return new ArrayList<>(result.build());
}
/**
* @return list of found annotations
*/
static <T extends Annotation> List<T> findClassAnnotationInSuperClassesAndIfcs(
- Class<?> clazz, Class<T> annotationType, Set<Class<?>> interfaces) {
+ final Class<?> clazz, final Class<T> annotationType, final Set<Class<?>> interfaces) {
List<T> result = new ArrayList<T>();
Class<?> declaringClass = clazz;
do {
* @return empty string if no annotation is found, or list of descriptions
* separated by newline
*/
- static String aggregateDescriptions(List<Description> descriptions) {
+ static String aggregateDescriptions(final List<Description> descriptions) {
StringBuilder builder = new StringBuilder();
for (Description d : descriptions) {
if (builder.length() != 0) {
package org.opendaylight.controller.config.manager.impl.util;
import static org.junit.Assert.assertEquals;
-
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Collections;
public class InterfacesHelperTest {
- interface SuperA {
+ public interface SuperA {
}
- interface SuperBMXBean {
+ public interface SuperBMXBean {
}
- interface SuperC extends SuperA, SuperBMXBean {
+ public interface SuperC extends SuperA, SuperBMXBean {
}
- class SuperClass implements SuperC {
+ public class SuperClass implements SuperC {
}
@MXBean
- interface SubA {
+ public interface SubA {
}
@ServiceInterfaceAnnotation(value = "a", osgiRegistrationType = SuperA.class, namespace = "n", revision = "r", localName = "l")
- interface Service extends AbstractServiceInterface{}
+ public interface Service extends AbstractServiceInterface{}
@ServiceInterfaceAnnotation(value = "b", osgiRegistrationType = SuperC.class, namespace = "n", revision = "r", localName = "l")
- interface SubService extends Service{}
+ public interface SubService extends Service{}
- abstract class SubClass extends SuperClass implements SubA, Module {
+ public abstract class SubClass extends SuperClass implements SubA, Module {
}
- abstract class SubClassWithService implements SubService, Module {
+ public abstract class SubClassWithService implements SubService, Module {
}
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ sendAppendEntries(0);
}
/**
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
- sendAppendEntries();
+ sendAppendEntries(0);
}
}
- private void sendAppendEntries() {
+ private void sendAppendEntries(long timeSinceLastActivityInterval) {
// Send an AppendEntries to all followers
- long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
final FollowerLogInformation followerLogInformation = e.getValue();
// This checks helps not to send a repeat message to the follower
- if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+ if(!followerLogInformation.isFollowerActive() ||
+ followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
sendUpdatesToFollower(followerId, followerLogInformation, true);
}
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- sendAppendEntries();
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
}
}
public void initReplicatedLog(){
this.replicatedLog = new SimpleReplicatedLog();
- this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
+ this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
}
@Override public ActorRef actorOf(Props props) {
package org.opendaylight.controller.cluster.raft;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
public class RaftActorTest extends AbstractActorTest {
// simulate a real snapshot
leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
assertEquals(5, leaderActor.getReplicatedLog().size());
- assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+ assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
+ leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId())
+ , RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
//reply from a slow follower does not initiate a fake snapshot
leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.impl.SimpleLogger;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
public class LeaderTest extends AbstractRaftActorBehaviorTest {
+ static {
+ // This enables trace logging for the tests.
+ System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
+ }
+
private final ActorRef leaderActor =
getSystem().actorOf(Props.create(DoNothingActor.class));
private final ActorRef senderActor =
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- leader.markFollowerActive(followerActor.path().toString());
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
- leader.handleMessage(senderActor, new SendHeartBeat());
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
- assertEquals("match", out);
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex - 1, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+ leader.handleMessage(senderActor, new SendHeartBeat());
+
+ appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
};
}};
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
ActorRef followerActor = getTestActor();
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
+ MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ String followerId = "follower";
+ peerAddresses.put(followerId, followerActor.path().toString());
actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
Leader leader = new Leader(actorContext);
- leader.markFollowerActive(followerActor.path().toString());
- Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, null,
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
+
+ // Leader will send an immediate heartbeat - ignore it.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ followerId, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, lastIndex + 1, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+ new Replicate(null, null, newEntry));
// State should not change
assertTrue(raftBehavior instanceof Leader);
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
+ AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
};
}};
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() {
new JavaTestKit(getSystem()) {{
-
new Within(duration("1 seconds")) {
@Override
protected void run() {
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+ AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
followerActor, AppendEntries.class);
assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
leader.handleMessage(senderActor, new SendHeartBeat());
- InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
- MessageCollectorActor.getFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
+ InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+ InstallSnapshot.SERIALIZABLE_CLASS);
assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
isproto);
RaftActorBehavior raftBehavior = leader.handleMessage(
leaderActor, new InitiateInstallSnapshot());
- CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+ CaptureSnapshot cs = MessageCollectorActor.
getFirstMatching(leaderActor, CaptureSnapshot.class);
assertNotNull(cs);
Leader leader = new Leader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
// new entry
ReplicatedLogImplEntry entry =
new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
MockLeader leader = new MockLeader(actorContext);
+ // Ignore initial heartbeat.
+ expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+ MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
+ followerActor.underlyingActor().clear();
leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
followerActor.path().toString(), -1, false));
leader.handleMessage(leaderActor, new SendHeartBeat());
- o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
new JavaTestKit(getSystem()) {
{
-
TestActorRef<MessageCollectorActor> followerActor =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
- Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
- InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
int hashCode = installSnapshot.getData().hashCode();
- leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ followerActor.underlyingActor().clear();
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
-
- assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+ installSnapshot = MessageCollectorActor.getFirstMatching(
+ followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+ assertNotNull(installSnapshot);
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
@Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
- return new MockRaftActorContext("test", getSystem(), actorRef);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ return context;
}
private ByteString toByteString(Map<String, String> state) {
}
public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- private static AbstractRaftActorBehavior behavior;
-
- public ForwardMessageToBehaviorActor(){
-
- }
+ AbstractRaftActorBehavior behavior;
@Override public void onReceive(Object message) throws Exception {
+ if(behavior != null) {
+ behavior.handleMessage(sender(), message);
+ }
+
super.onReceive(message);
- behavior.handleMessage(sender(), message);
}
- public static void setBehavior(AbstractRaftActorBehavior behavior){
- ForwardMessageToBehaviorActor.behavior = behavior;
+ public static Props props() {
+ return Props.create(ForwardMessageToBehaviorActor.class);
}
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
//create 3 entries
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
// follower too has the exact same log entries and has the same commit index
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
followerActorContext.setCommitIndex(1);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
-
assertNotNull(appendEntriesReply);
- // follower returns its next index
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
}};
}
@Test
public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(ForwardMessageToBehaviorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
- ActorRef followerActor = getSystem().actorOf(
- Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ new MockRaftActorContext("follower", getSystem(), followerActor);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ peerAddresses.put("follower", followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
leaderActorContext.setCommitIndex(1);
followerActorContext.getReplicatedLog().removeFrom(0);
followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
// follower has the same log entries but its commit index > leaders commit index
followerActorContext.setCommitIndex(2);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
-
- Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
-
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
- AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntries.class);
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().behavior = leader;
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
assertNotNull(appendEntriesReply);
assertEquals(2, appendEntriesReply.getLogLastIndex());
assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(1, followerActorContext.getCommitIndex());
}};
}
assertEquals(2, leaderActorContext.getCommitIndex());
ApplyLogEntries applyLogEntries =
- (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
- ApplyLogEntries.class);
+ MessageCollectorActor.getFirstMatching(leaderActor,
+ ApplyLogEntries.class);
assertNotNull(applyLogEntries);
@Test
public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
new JavaTestKit(getSystem()) {{
-
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+ Props.create(MessageCollectorActor.class));
MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
- configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            // Rely on the default heartbeat interval; the previous explicit 9-second interval is no longer needed.
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
leaderActorContext.setConfigParams(configParams);
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+ ForwardMessageToBehaviorActor.props());
MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
followerActorContext.setConfigParams(configParams);
Follower follower = new Follower(followerActorContext);
-
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ followerActor.underlyingActor().behavior = follower;
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put("follower-reply",
- followerActor.path().toString());
+ followerActor.path().toString());
leaderActorContext.setPeerAddresses(peerAddresses);
leaderActorContext.getReplicatedLog().removeFrom(0);
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
- //create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-
- leaderActorContext.setCommitIndex(1);
+ followerActorContext.getReplicatedLog().removeFrom(0);
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive("follower-reply");
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+ assertNotNull(appendEntriesReply);
+ System.out.println("appendEntriesReply: "+appendEntriesReply);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ // Clear initial heartbeat messages
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ // create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
- TimeUnit.MILLISECONDS);
+ TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, new SendHeartBeat());
- AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
- .getFirstMatching(followerActor, AppendEntries.class);
-
+ AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
assertNotNull(appendEntries);
+ // Should send first log entry
assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getEntries().get(0).getIndex());
- assertEquals(0, appendEntries.getPrevLogIndex());
-
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(-1, appendEntries.getPrevLogIndex());
+ appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
assertNotNull(appendEntriesReply);
- leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
- List<Object> entries = ForwardMessageToBehaviorActor
- .getAllMatching(followerActor, AppendEntries.class);
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+ assertEquals(0, appendEntriesReply.getLogLastIndex());
- assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+ followerActor.underlyingActor().clear();
- AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
- assertEquals(1, appendEntriesSecond.getLeaderCommit());
- assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
- assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+ appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+ assertNotNull(appendEntries);
+ // Should send second log entry
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
}};
}
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MessageCollectorActor extends UntypedActor {
- private List<Object> messages = new ArrayList<>();
+ private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
* @param clazz
* @return
*/
- public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
- List<Object> allMessages = getAllMessages(actor);
+ public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+ for(int i = 0; i < 50; i++) {
+ List<Object> allMessages = getAllMessages(actor);
- for(Object message : allMessages){
- if(message.getClass().equals(clazz)){
- return message;
+ for(Object message : allMessages){
+ if(message.getClass().equals(clazz)){
+ return (T) message;
+ }
}
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
return null;
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
</dependencies>
<build>
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
*/
public class DatastoreContext {
- private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private final Duration shardTransactionIdleTimeout;
- private final int operationTimeoutInSeconds;
- private final String dataStoreMXBeanType;
- private final ConfigParams shardRaftConfig;
- private final int shardTransactionCommitTimeoutInSeconds;
- private final int shardTransactionCommitQueueCapacity;
- private final Timeout shardInitializationTimeout;
- private final Timeout shardLeaderElectionTimeout;
- private final boolean persistent;
- private final ConfigurationReader configurationReader;
- private final long shardElectionTimeoutFactor;
-
- private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
- ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
- Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
- int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
- Timeout shardLeaderElectionTimeout,
- boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
- this.dataStoreProperties = dataStoreProperties;
- this.shardRaftConfig = shardRaftConfig;
- this.dataStoreMXBeanType = dataStoreMXBeanType;
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
- this.shardInitializationTimeout = shardInitializationTimeout;
- this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
- this.persistent = persistent;
- this.configurationReader = configurationReader;
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+ public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
+ public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
+ public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+ public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+ public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
+ public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+ public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+ public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+ public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+ public static final boolean DEFAULT_PERSISTENT = true;
+ public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader();
+ public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+ public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+ public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+ public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+
+ private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+ private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+ private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+ private String dataStoreMXBeanType;
+ private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+ private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+ private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+ private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+ private boolean persistent = DEFAULT_PERSISTENT;
+ private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
+ private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+ private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+ private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+
+ private DatastoreContext(){
+ setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
+ setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+ setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
+ setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
+ setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
}
public static Builder newBuilder() {
}
public ConfigParams getShardRaftConfig() {
- return shardRaftConfig;
+ return raftConfig;
}
public int getShardTransactionCommitTimeoutInSeconds() {
}
public long getShardElectionTimeoutFactor(){
- return this.shardElectionTimeoutFactor;
+ return raftConfig.getElectionTimeoutFactor();
+ }
+
+ public String getDataStoreType(){
+ return dataStoreType;
+ }
+
+ public long getTransactionCreationInitialRateLimit() {
+ return transactionCreationInitialRateLimit;
+ }
+
+ private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+ raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+ TimeUnit.MILLISECONDS));
+ }
+
+ private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+ raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+ }
+
+
+ private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+ raftConfig.setIsolatedLeaderCheckInterval(
+ new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ }
+
+ private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+ raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
+ }
+
+ private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
+ }
+
+ private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+ raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
}
public static class Builder {
- private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
- private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
- private int operationTimeoutInSeconds = 5;
- private String dataStoreMXBeanType;
- private int shardTransactionCommitTimeoutInSeconds = 30;
- private int shardJournalRecoveryLogBatchSize = 1000;
- private int shardSnapshotBatchCount = 20000;
- private int shardHeartbeatIntervalInMillis = 500;
- private int shardTransactionCommitQueueCapacity = 20000;
- private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
- private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
- private boolean persistent = true;
- private ConfigurationReader configurationReader = new FileConfigurationReader();
- private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
- private int shardSnapshotDataThresholdPercentage = 12;
- private long shardElectionTimeoutFactor = 2;
+ private DatastoreContext datastoreContext = new DatastoreContext();
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
- this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+ datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
- this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+ datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
return this;
}
public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
- this.dataStoreMXBeanType = dataStoreMXBeanType;
+ datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
return this;
}
public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
- this.dataStoreProperties = dataStoreProperties;
+ datastoreContext.dataStoreProperties = dataStoreProperties;
return this;
}
public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
- this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+ datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
return this;
}
public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
- this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+ datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
return this;
}
public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
- this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+ datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
return this;
}
public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
- this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+ datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
return this;
}
-
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
- this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+ datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
return this;
}
public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
- this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+ datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
return this;
}
public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
- this.shardInitializationTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
return this;
}
public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
- this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+ datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
return this;
}
public Builder configurationReader(ConfigurationReader configurationReader){
- this.configurationReader = configurationReader;
+ datastoreContext.configurationReader = configurationReader;
return this;
}
public Builder persistent(boolean persistent){
- this.persistent = persistent;
+ datastoreContext.persistent = persistent;
return this;
}
public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
- this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+ datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
return this;
}
public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
- this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+ datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
+ return this;
+ }
- public DatastoreContext build() {
- DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
- raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
- TimeUnit.MILLISECONDS));
- raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
- raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
- raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
- raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
- raftConfig.setIsolatedLeaderCheckInterval(
- new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+ public Builder dataStoreType(String dataStoreType){
+ datastoreContext.dataStoreType = dataStoreType;
+ datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+ return this;
+ }
- return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
- operationTimeoutInSeconds, shardTransactionIdleTimeout,
- shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
- shardInitializationTimeout, shardLeaderElectionTimeout,
- persistent, configurationReader, shardElectionTimeoutFactor);
+ public DatastoreContext build() {
+ return datastoreContext;
}
}
}
private final ActorContext actorContext;
- public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+ public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
+ String type = datastoreContext.getDataStoreType();
+
String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
- ShardManager.props(type, cluster, configuration, datastoreContext)
+ ShardManager.props(cluster, configuration, datastoreContext)
.withMailbox(ActorContext.MAILBOX), shardManagerId ),
cluster, configuration, datastoreContext);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
}
private static volatile ActorSystem persistentActorSystem = null;
- public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+ public static DistributedDataStore createInstance(SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
final DistributedDataStore dataStore =
- new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+ new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
config, datastoreContext);
ShardStrategyFactory.setConfiguration(config);
private final DataPersistenceProvider dataPersistenceProvider;
/**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
*/
- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ protected ShardManager(ClusterWrapper cluster, Configuration configuration,
DatastoreContext datastoreContext) {
- this.type = Preconditions.checkNotNull(type, "type should not be null");
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.datastoreContext = datastoreContext;
this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+ this.type = datastoreContext.getDataStoreType();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
}
- public static Props props(final String type,
+ public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContext datastoreContext) {
- Preconditions.checkNotNull(type, "type should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
}
@Override
private static class ShardManagerCreator implements Creator<ShardManager> {
private static final long serialVersionUID = 1L;
- final String type;
final ClusterWrapper cluster;
final Configuration configuration;
final DatastoreContext datastoreContext;
- ShardManagerCreator(String type, ClusterWrapper cluster,
+ ShardManagerCreator(ClusterWrapper cluster,
Configuration configuration, DatastoreContext datastoreContext) {
- this.type = type;
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, datastoreContext);
+ return new ShardManager(cluster, configuration, datastoreContext);
}
}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+ };
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
-
- futureList.add(actorContext.executeOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@Override
public ListenableFuture<Void> commit() {
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true);
+ OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+ new CommitCallback(actorContext);
+
+ return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+ CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+ }
+
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException) {
+ return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException) {
+ final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} {}", transactionId, operationName);
if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
- returnFuture);
+ returnFuture, callback);
} else {
buildCohortList().onComplete(new OnComplete<Void>() {
@Override
}
} else {
finishVoidOperation(operationName, message, expectedResponseClass,
- propagateException, returnFuture);
+ propagateException, returnFuture, callback);
}
}
}, actorContext.getActorSystem().dispatcher());
}
private void finishVoidOperation(final String operationName, final Object message,
- final Class<?> expectedResponseClass, final boolean propagateException,
- final SettableFuture<Void> returnFuture) {
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture, final OperationCallback callback) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
+
+ callback.run();
+
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
}
if(exceptionToPropagate != null) {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
operationName, exceptionToPropagate);
}
returnFuture.set(null);
}
+
+ callback.failure();
} else {
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
}
returnFuture.set(null);
+
+ callback.success();
}
}
}, actorContext.getActorSystem().dispatcher());
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
+
+ private static interface OperationCallback {
+ void run();
+ void success();
+ void failure();
+ }
+
+ private static class CommitCallback implements OperationCallback{
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+ private static final String COMMIT = "commit";
+
+ private final Timer commitTimer;
+ private final ActorContext actorContext;
+ private Timer.Context timerContext;
+
+ CommitCallback(ActorContext actorContext){
+ this.actorContext = actorContext;
+ commitTimer = actorContext.getOperationTimer(COMMIT);
+ }
+
+ @Override
+ public void run() {
+ timerContext = commitTimer.time();
+ }
+
+ @Override
+ public void success() {
+ timerContext.stop();
+
+ Snapshot timerSnapshot = commitTimer.getSnapshot();
+ double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+ long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+ .getShardTransactionCommitTimeoutInSeconds();
+ long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+            // Derive how many transactions per second can be allowed from the observed commit latency.
+ double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+ LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+ actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+ actorContext.setTxCreationLimit(newRateLimit);
+ }
+
+ @Override
+ public void failure() {
+        // A failure means a transaction could not complete within the commit timeout
+        // (30 seconds by default). Latency data from a timed-out commit is not a
+        // useful signal for rate limiting, so the current limit is left unchanged.
+ }
+ }
+
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ actorContext.acquireTxCreationPermit();
return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
}
import akka.dispatch.Mapper;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
* but should not be passed to actors especially remote actors
*/
public class ActorContext {
- private static final Logger
- LOG = LoggerFactory.getLogger(ActorContext.class);
-
- public static final String MAILBOX = "bounded-mailbox";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+ private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+ private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+ private static final String METRIC_RATE = "rate";
+ private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
return actualFailure;
}
};
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private final DatastoreContext datastoreContext;
- private volatile SchemaContext schemaContext;
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final RateLimiter txRateLimiter;
+ private final MetricRegistry metricRegistry = new MetricRegistry();
+ private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
+ private final Timeout transactionCommitOperationTimeout;
+
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
+ this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
- TimeUnit.SECONDS);
+ operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
operationTimeout = new Timeout(operationDuration);
+ transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+ TimeUnit.SECONDS));
+
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
}
transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+ jmxReporter.start();
}
public DatastoreContext getDatastoreContext() {
public int getTransactionOutstandingOperationLimit(){
return transactionOutstandingOperationLimit;
}
+
+ /**
+ * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+ * us to create a timer for pretty much anything.
+ *
+ * @param operationName name folded into the metric key; repeated calls with the same name return the same Timer
+ * @return the Timer registered under "distributed-data-store.&lt;data-store-type&gt;.&lt;operationName&gt;.rate"
+ */
+ public Timer getOperationTimer(String operationName){
+ final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+ return metricRegistry.timer(rate);
+ }
+
+ /**
+ * Get the type of the data store to which this ActorContext belongs
+ *
+ * @return the data store type string (e.g. "config" or "operational") from the DatastoreContext
+ */
+ public String getDataStoreType() {
+ return datastoreContext.getDataStoreType();
+ }
+
+ /**
+ * Set the number of transaction creation permits that are to be allowed
+ *
+ * @param permitsPerSecond the new steady-state rate of the underlying RateLimiter
+ */
+ public void setTxCreationLimit(double permitsPerSecond){
+ txRateLimiter.setRate(permitsPerSecond);
+ }
+
+ /**
+ * Get the current transaction creation rate limit
+ * @return the RateLimiter's current steady-state rate in permits per second
+ */
+ public double getTxCreationLimit(){
+ return txRateLimiter.getRate();
+ }
+
+ /**
+ * Try to acquire a transaction creation permit. Will block if no permits are available.
+ */
+ public void acquireTxCreationPermit(){
+ txRateLimiter.acquire();
+ }
+
+ /**
+ * Return the operation timeout to be used when committing transactions
+ * @return a Timeout built from shard-transaction-commit-timeout-in-seconds in the DatastoreContext
+ */
+ public Timeout getTransactionCommitOperationTimeout(){
+ return transactionCommitOperationTimeout;
+ }
+
+
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedConfigDatastore")
+ .dataStoreType("config")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+ return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
datastoreContext, bundleContext);
}
}
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
- .dataStoreMXBeanType("DistributedOperationalDatastore")
+ .dataStoreType("operational")
.dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
.shardIsolatedLeaderCheckIntervalInMillis(
props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+ .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
.build();
- return DistributedDataStoreFactory.createInstance("operational",
- getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+ return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+ datastoreContext, bundleContext);
}
public void setBundleContext(BundleContext bundleContext) {
description "The interval at which the leader of the shard will check if its majority
followers are active and term itself as isolated";
}
+
+ leaf tx-creation-initial-rate-limit {
+ default 100;
+ type non-zero-uint32-type;
+ description "The initial number of transactions per second that are allowed before the data store
+ should begin applying back pressure. This number is only used as an initial guidance,
+ subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+ }
}
// Augments the 'configuration' choice node under modules/module.
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test verifying that every value produced by a bare
+ * {@link DatastoreContext.Builder} matches the corresponding
+ * DEFAULT_* constant exposed by DatastoreContext.
+ */
+public class DatastoreContextTest {
+
+    private DatastoreContext.Builder builder;
+
+    @Before
+    public void setUp(){
+        builder = new DatastoreContext.Builder();
+    }
+
+    @Test
+    public void testDefaults(){
+        // Build with no customization so only the defaults are observed.
+        DatastoreContext build = builder.build();
+
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
+        assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
+        assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
+        assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
+        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+    }
+
+}
\ No newline at end of file
Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
ShardStrategyFactory.setConfiguration(config);
+ datastoreContextBuilder.dataStoreType(typeName);
+
DatastoreContext datastoreContext = datastoreContextBuilder.build();
- DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+ DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
config, datastoreContext);
SchemaContext schemaContext = SchemaContextHelper.full();
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Unit tests verifying that DistributedDataStore acquires a tx-creation
+ * permit (rate limiting) for writable transactions only — read-write and
+ * write-only creation must call acquireTxCreationPermit(); read-only must not.
+ */
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ActorContext actorContext;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadWriteTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newWriteOnlyTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        // Multiple read-only transactions: the permit must never be taken.
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+
+        verify(actorContext, times(0)).acquireTxCreationPermit();
+    }
+
+}
\ No newline at end of file
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
}
private Props newShardMgrProps() {
- return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+
+ DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+ builder.dataStoreType(shardMrgIDSuffix);
+ return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
}
@Test
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build());
+ final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).build());
+ final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
TestShardManager(String shardMrgIDSuffix) {
- super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().build());
+ super(new MockClusterWrapper(), new MockConfiguration(),
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
}
@Override
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@Mock
private ActorContext actorContext;
+ @Mock
+ private DatastoreContext datastoreContext;
+
+ @Mock
+ private Timer commitTimer;
+
+ @Mock
+ private Timer.Context commitTimerContext;
+
+ @Mock
+ private Snapshot commitSnapshot;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+ doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+ doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+ doReturn(commitTimerContext).when(commitTimer).time();
+ doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+ doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+ doReturn(10.0).when(actorContext).getTxCreationLimit();
}
private Future<ActorSelection> newCohort() {
}
stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
- isA(requestType));
+ isA(requestType), any(Timeout.class));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeOperationAsync(
- any(ActorSelection.class), isA(requestType));
+ any(ActorSelection.class), isA(requestType), any(Timeout.class));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
try {
propagateExecutionExceptionCause(proxy.commit());
} finally {
+
+ // The commit failed, so the rate limit must not have been adjusted.
+ // setTxCreationLimit takes a double; anyDouble() is required here —
+ // anyLong() would never match the boxed Double argument, making the
+ // never() verification vacuously pass even on a real invocation.
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
}
+
}
@Test
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
new CommitTransactionReply(), new CommitTransactionReply());
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
proxy.canCommit().get(5, TimeUnit.SECONDS);
proxy.preCommit().get(5, TimeUnit.SECONDS);
proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+ // Verify that the creation limit was changed to 0.5 (based on setup)
+ verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+ }
+
+ @Test
+ public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+ // Zero cohorts: an "empty" transaction with no commit timing data.
+ ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+ // Sanity check of the stubbed initial limit (see setUp()).
+ assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ // No commit latency was measured, so the rate limit must stay untouched.
+ // setTxCreationLimit takes a double: match with anyDouble(), not anyLong(),
+ // or the never() verification can never match a real invocation.
+ verify(actorContext, never()).setTxCreationLimit(anyDouble());
+ }
}
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
+ @Mock
+ ActorContext mockActorContext;
+
@Before
public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
actorContext = new MockActorContext(getSystem());
actorContext.setSchemaContext(schemaContext);
+
+ doReturn(schemaContext).when(mockActorContext).getSchemaContext();
}
@SuppressWarnings("resource")
Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
}
+
+ @Test
+ public void testRateLimitingUsedInReadWriteTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadWriteTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+ @Test
+ public void testRateLimitingUsedInWriteOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newWriteOnlyTransaction();
+
+ verify(mockActorContext, times(1)).acquireTxCreationPermit();
+ }
+
+
+ @Test
+ public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ txChainProxy.newReadOnlyTransaction();
+
+ verify(mockActorContext, times(0)).acquireTxCreationPermit();
+ }
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
assertEquals(expected, actual);
}
+ @Test
+ public void testRateLimiting(){
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext);
+
+ // Check that the initial value is being picked up from DataStoreContext
+ assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+ // Drop the limit to 1 permit/second so the blocking behavior is observable.
+ actorContext.setTxCreationLimit(1.0);
+
+ assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+ StopWatch watch = new StopWatch();
+
+ watch.start();
+
+ // At 1 permit/second the 2nd and 3rd acquire should each block ~1s
+ // (Guava's RateLimiter typically grants the first acquire immediately),
+ // so the three calls together are expected to take well over 1000 ms.
+ // NOTE(review): timing-based assertion — could be flaky on a loaded host.
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+ actorContext.acquireTxCreationPermit();
+
+ watch.stop();
+
+ assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+ }
}
classes.add(NeutronLoadBalancerPoolNorthbound.class);
classes.add(NeutronLoadBalancerHealthMonitorNorthbound.class);
classes.add(NeutronLoadBalancerPoolMembersNorthbound.class);
+ classes.add(MOXyJsonProvider.class);
return classes;
}
moxyJsonProvider.setMarshalEmptyCollections(true);
moxyJsonProvider.setValueWrapper("$");
- Map<String, String> namespacePrefixMapper = new HashMap<String, String>(1);
+ Map<String, String> namespacePrefixMapper = new HashMap<String, String>(3);
namespacePrefixMapper.put("router", "router"); // FIXME: fill in with XSD
namespacePrefixMapper.put("provider", "provider"); // FIXME: fill in with XSD
+ namespacePrefixMapper.put("binding", "binding");
moxyJsonProvider.setNamespacePrefixMapper(namespacePrefixMapper);
moxyJsonProvider.setNamespaceSeparator(':');
package org.opendaylight.controller.networkconfig.neutron;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@XmlElement (name="security_groups")
List<NeutronSecurityGroup> securityGroups;
+ @XmlElement (namespace= "binding", name="host_id")
+ String bindinghostID;
+
+ @XmlElement (namespace= "binding", name="vnic_type")
+ String bindingvnicType;
+
+ @XmlElement (namespace= "binding", name="vif_type")
+ String bindingvifType;
+
+
/* this attribute stores the floating IP address assigned to
* each fixed IP address
*/
this.securityGroups = securityGroups;
}
+ public String getBindinghostID() {
+ return bindinghostID;
+ }
+
+ public void setBindinghostID(String bindinghostID) {
+ this.bindinghostID = bindinghostID;
+ }
+
+ public String getBindingvnicType() {
+ return bindingvnicType;
+ }
+
+ public void setBindingvnicType(String bindingvnicType) {
+ this.bindingvnicType = bindingvnicType;
+ }
+
+ public String getBindingvifType() {
+ return bindingvifType;
+ }
+
+ public void setBindingvifType(String bindingvifType) {
+ this.bindingvifType = bindingvifType;
+ }
+
public NeutronFloatingIP getFloatingIP(String key) {
if (!floatingIPMap.containsKey(key)) {
return null;
return "NeutronPort [portUUID=" + portUUID + ", networkUUID=" + networkUUID + ", name=" + name
+ ", adminStateUp=" + adminStateUp + ", status=" + status + ", macAddress=" + macAddress
+ ", fixedIPs=" + fixedIPs + ", deviceID=" + deviceID + ", deviceOwner=" + deviceOwner + ", tenantID="
- + tenantID + ", floatingIPMap=" + floatingIPMap + ", securityGroups=" + securityGroups + "]";
+ + tenantID + ", floatingIPMap=" + floatingIPMap + ", securityGroups=" + securityGroups
+ + ", bindinghostID=" + bindinghostID + ", bindingvnicType=" + bindingvnicType
+ + ", bindingvifType=" + bindingvifType + "]";
}
}