</dependencies>
</dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-yangtools</artifactId>
+ <classifier>features</classifier>
+ <version>${yangtools.version}</version>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-mdsal</artifactId>
+ <classifier>features</classifier>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-restconf</artifactId>
+ <classifier>features</classifier>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
<dependency>
<groupId>${symbol_dollar}{groupId}</groupId>
<artifactId>${artifactId}-impl</artifactId>
<properties>
<config.version>0.3.0-SNAPSHOT</config.version>
<mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
+ <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
<jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>
<config.file>src/main/config/default-config.xml</config.file>
</properties>
<type>pom</type>
<scope>import</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yangtools-artifacts</artifactId>
+ <version>${yangtools.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-api</artifactId>
} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
- String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
- followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
package org.opendaylight.controller.cluster.raft;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
// We define this as ArrayList so we can use ensureCapacity.
protected ArrayList<ReplicatedLogEntry> journal;
- protected long snapshotIndex = -1;
- protected long snapshotTerm = -1;
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
- protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
- protected long previousSnapshotIndex = -1;
- protected long previousSnapshotTerm = -1;
+ private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+ private long previousSnapshotIndex = -1;
+ private long previousSnapshotTerm = -1;
protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
}
public AbstractReplicatedLogImpl() {
- this.journal = new ArrayList<>();
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
protected int adjustedIndex(long logEntryIndex) {
public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + max;
if(maxIndex > size){
maxIndex = size;
}
- entries.addAll(journal.subList(adjustedIndex, maxIndex));
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ return Collections.emptyList();
}
- return entries;
}
-
@Override
public long size() {
return journal.size();
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The state of the followers log as known by the Leader
*/
* Increment the value of the nextIndex
* @return
*/
- public long incrNextIndex();
+ long incrNextIndex();
/**
* Decrement the value of the nextIndex
* @return
*/
- public long decrNextIndex();
+ long decrNextIndex();
/**
*
* Increment the value of the matchIndex
* @return
*/
- public long incrMatchIndex();
+ long incrMatchIndex();
- public void setMatchIndex(long matchIndex);
+ void setMatchIndex(long matchIndex);
/**
* The identifier of the follower
* This could simply be the url of the remote actor
*/
- public String getId();
+ String getId();
/**
* for each server, index of the next log entry
* to send to that server (initialized to leader
* last log index + 1)
*/
- public AtomicLong getNextIndex();
+ long getNextIndex();
/**
* for each server, index of highest log entry
* known to be replicated on server
* (initialized to 0, increases monotonically)
*/
- public AtomicLong getMatchIndex();
+ long getMatchIndex();
/**
* Checks if the follower is active by comparing the last updated with the duration
* @return boolean
*/
- public boolean isFollowerActive();
+ boolean isFollowerActive();
/**
* restarts the timeout clock of the follower
*/
- public void markFollowerActive();
+ void markFollowerActive();
/**
* This will stop the timeout clock
*/
- public void markFollowerInActive();
-
-
+ void markFollowerInActive();
}
package org.opendaylight.controller.cluster.raft;
import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import scala.concurrent.duration.FiniteDuration;
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
private final String id;
- private final AtomicLong nextIndex;
+ private final Stopwatch stopwatch = new Stopwatch();
- private final AtomicLong matchIndex;
+ private final long followerTimeoutMillis;
- private final Stopwatch stopwatch;
+ private volatile long nextIndex;
- private final long followerTimeoutMillis;
+ private volatile long matchIndex;
- public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
- AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+ public FollowerLogInformationImpl(String id, long nextIndex,
+ long matchIndex, FiniteDuration followerTimeoutDuration) {
this.id = id;
this.nextIndex = nextIndex;
this.matchIndex = matchIndex;
- this.stopwatch = new Stopwatch();
this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
+ @Override
public long incrNextIndex(){
- return nextIndex.incrementAndGet();
+ return NEXT_INDEX_UPDATER.incrementAndGet(this);
}
- @Override public long decrNextIndex() {
- return nextIndex.decrementAndGet();
+ @Override
+ public long decrNextIndex() {
+ return NEXT_INDEX_UPDATER.decrementAndGet(this);
}
- @Override public void setNextIndex(long nextIndex) {
- this.nextIndex.set(nextIndex);
+ @Override
+ public void setNextIndex(long nextIndex) {
+ this.nextIndex = nextIndex;
}
+ @Override
public long incrMatchIndex(){
- return matchIndex.incrementAndGet();
+ return MATCH_INDEX_UPDATER.incrementAndGet(this);
}
- @Override public void setMatchIndex(long matchIndex) {
- this.matchIndex.set(matchIndex);
+ @Override
+ public void setMatchIndex(long matchIndex) {
+ this.matchIndex = matchIndex;
}
+ @Override
public String getId() {
return id;
}
- public AtomicLong getNextIndex() {
+ @Override
+ public long getNextIndex() {
return nextIndex;
}
- public AtomicLong getMatchIndex() {
+ @Override
+ public long getMatchIndex() {
return matchIndex;
}
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
"Persistence Id = " + persistenceId() +
" Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm(), replicatedLog.size());
initializeBehavior();
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
- protected final Set<String> followers;
+ private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
- private List<ClientRequestTracker> trackerList = new ArrayList<>();
+ private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
protected final int minReplicationCount;
public AbstractLeader(RaftActorContext context) {
super(context);
- followers = context.getPeerAddresses().keySet();
-
- for (String followerId : followers) {
+ final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+ for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
- new AtomicLong(context.getCommitIndex()),
- new AtomicLong(-1),
+ context.getCommitIndex(), -1,
context.getConfigParams().getElectionTimeOutInterval());
- followerToLog.put(followerId, followerLogInformation);
+ ftlBuilder.put(followerId, followerLogInformation);
}
+ followerToLog = ftlBuilder.build();
leaderId = context.getId();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers: {}", followers);
- }
+ LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
- minReplicationCount = getMajorityVoteCount(followers.size());
+ minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
// the isolated Leader peer count will be 1 less than the majority vote count.
// this is because the vote count has the self vote counted in it
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
}
+ /**
+ * Return an immutable collection of follower identifiers.
+ *
+ * @return Collection of follower IDs
+ */
+ protected final Collection<String> getFollowerIds() {
+ return followerToLog.keySet();
+ }
+
private Optional<ByteString> getSnapshot() {
return snapshot;
}
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
+ if (info.getMatchIndex() >= N) {
replicatedCount++;
}
}
return this;
}
+ @Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
+ final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ while (it.hasNext()) {
+ final ClientRequestTracker t = it.next();
+ if (t.getIndex() == logIndex) {
+ it.remove();
+ return t;
+ }
}
- return toRemove;
+ return null;
}
+ @Override
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+ LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+ followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
logIndex)
);
- if (followers.size() == 0) {
+ if (followerToLog.isEmpty()) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
private void sendAppendEntries() {
// Send an AppendEntries to all followers
- for (String followerId : followers) {
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
+ long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ final List<ReplicatedLogEntry> entries;
if (isFollowerActive &&
context.getReplicatedLog().isPresent(followerNextIndex)) {
*
*/
private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
+ LOG.info("{} follower needs a snapshot install", e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
} else {
initiateCaptureSnapshot();
private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
}
}
}
}
private void sendHeartBeat() {
- if (followers.size() > 0) {
+ if (!followerToLog.isEmpty()) {
sendAppendEntries();
}
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
// called from example-actor for printing the follower-states
public String printFollowerStates() {
- StringBuilder sb = new StringBuilder();
- for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+ final StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ sb.append('{');
+ sb.append(followerLogInformation.getId());
+ sb.append(" state:");
+ sb.append(followerLogInformation.isFollowerActive());
+ sb.append("},");
}
- return "[" + sb.toString() + "]";
+ sb.append(']');
+
+ return sb.toString();
+ }
+
+ @VisibleForTesting
+ public FollowerLogInformation getFollower(String followerId) {
+ return followerToLog.get(followerId);
+ }
+
+ @VisibleForTesting
+ protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+ mapFollowerToSnapshot.put(followerId, snapshot);
+ }
+
+ @VisibleForTesting
+ public int followerSnapshotSize() {
+ return mapFollowerToSnapshot.size();
}
@VisibleForTesting
- void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ public int followerLogSize() {
+ return followerToLog.size();
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
}
protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (getFollowerIds().isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
context.getActorSystem().dispatcher(), context.getActor());
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
stopInstallSnapshotSchedule();
stopIsolatedLeaderCheckSchedule();
super.close();
@VisibleForTesting
void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ getFollower(followerId).markFollowerActive();
}
@VisibleForTesting
void markFollowerInActive(String followerId) {
- followerToLog.get(followerId).markFollowerInActive();
+ getFollower(followerId).markFollowerInActive();
}
}
public void removeFromAndPersist(final long index) {
}
- @Override
- public void setSnapshotIndex(final long snapshotIndex) {
- this.snapshotIndex = snapshotIndex;
- }
-
- @Override
- public void setSnapshotTerm(final long snapshotTerm) {
- this.snapshotTerm = snapshotTerm;
- }
-
@Override
public int dataSize() {
return -1;
*/
package org.opendaylight.controller.cluster.raft;
-
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(
- "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+ "follower1", 10, 9, timeoutDuration);
}};
}
-
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef raftActor = getTestActor();
new ExpectMsg<String>(duration("1 seconds"),
"match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof ApplyState) {
if (((ApplyState) in).getIdentifier().equals("state-id")) {
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
InstallSnapshot is = (InstallSnapshot)
assertTrue(raftBehavior instanceof Leader);
- assertEquals(leader.mapFollowerToSnapshot.size(), 0);
- assertEquals(leader.followerToLog.size(), 1);
- assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
- FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ assertNotNull(leader.getFollower(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
return createActorContext(leaderActor);
}
+ @Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
return new MockRaftActorContext("test", getSystem(), actorRef);
}
public void createFollowerToSnapshot(String followerId, ByteString bs ) {
fts = new FollowerToSnapshot(bs);
- mapFollowerToSnapshot.put(followerId, fts);
-
+ setFollowerSnapshot(followerId, fts);
}
}
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
+import java.util.Map;
+import java.util.WeakHashMap;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtMethod;
import javassist.NotFoundException;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
import org.opendaylight.controller.sal.binding.codegen.RpcIsNotRoutedException;
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
+import org.opendaylight.yangtools.util.ClassLoaderUtils;
import org.opendaylight.yangtools.yang.binding.BindingMapping;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.opendaylight.yangtools.yang.binding.annotations.RoutingContext;
-import org.opendaylight.yangtools.yang.binding.util.ClassLoaderUtils;
-
-import javax.annotation.concurrent.GuardedBy;
-import java.util.Map;
-import java.util.WeakHashMap;
abstract class AbstractRuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator, NotificationInvokerFactory {
@GuardedBy("this")
*/
Thread.currentThread().getContextClassLoader().loadClass(routingPair.getInputType().getName());
} else {
- throw new RpcIsNotRoutedException("RPC " + method.getName() + " from "+ iface.getName() +" is not routed");
+ throw new RpcIsNotRoutedException(String.format("RPC %s from %s is not routed", method.getName(), iface.getName()));
}
}
}
try {
return getRpcMetadata(utils.asCtClass(serviceType));
} catch (ClassNotFoundException | NotFoundException e) {
- throw new IllegalStateException(String.format("Failed to load metadata for class {}", serviceType), e);
+ throw new IllegalStateException(String.format("Failed to load metadata for class %s", serviceType), e);
}
}
});
package org.opendaylight.controller.sal.binding.impl.connect.dom;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.Future;
-
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
/*
* RPC's can have both input, output, one or the other, or neither.
*
*
*/
public class RpcInvocationStrategy {
+ private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+ final Object output;
+ if (getOutputClass() != null && result.getResult() != null) {
+ output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+ } else {
+ output = null;
+ }
+
+ return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+ }
+ };
private final BindingIndependentMappingService mappingService;
private final RpcProvisionRegistry biRpcRegistry;
final Method targetMethod,
final BindingIndependentMappingService mappingService,
final RpcProvisionRegistry biRpcRegistry ) {
-
+ this.mappingService = mappingService;
+ this.biRpcRegistry = biRpcRegistry;
this.targetMethod = targetMethod;
this.rpc = rpc;
- Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
- Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
- if ( outputClassOption != null && outputClassOption.isPresent() ) {
- this.outputClass = new WeakReference(outputClassOption.get() ) ;
+ final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+ if (outputClassOption.isPresent()) {
+ this.outputClass = new WeakReference(outputClassOption.get());
} else {
- this.outputClass = null ;
+ this.outputClass = null;
}
- if ( inputClassOption != null && inputClassOption.isPresent() ) {
- this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+ final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+ if (inputClassOption.isPresent() ) {
+ this.inputClass = new WeakReference(inputClassOption.get());
} else {
- this.inputClass = null ;
+ this.inputClass = null;
}
-
- this.mappingService = mappingService;
- this.biRpcRegistry = biRpcRegistry;
}
@SuppressWarnings({ "unchecked" })
inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
}
- Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @SuppressWarnings("rawtypes")
- @Override
- public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
- Object output = null;
-
- if( getOutputClass() != null ) {
- if (result.getResult() != null) {
- output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
- result.getResult());
- }
- }
-
- return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
- }
- };
-
return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
}
}
@SuppressWarnings("rawtypes")
- public WeakReference<Class> getOutputClass() {
+ @VisibleForTesting
+ WeakReference<Class> getOutputClass() {
return outputClass;
}
}
package org.opendaylight.controller.cluster.datastore.node.utils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.base.Splitter;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class AugmentationIdentifierGenerator {
+ private static final Pattern PATTERN = Pattern.compile("AugmentationIdentifier\\Q{\\EchildNames=\\Q[\\E(.*)\\Q]}\\E");
+ private static final Splitter COMMA_SPLITTER = Splitter.on(',').trimResults();
+
private final String id;
- private static final Pattern pattern = Pattern.compile("AugmentationIdentifier\\Q{\\EchildNames=\\Q[\\E(.*)\\Q]}\\E");
private final Matcher matcher;
private final boolean doesMatch;
- public AugmentationIdentifierGenerator(String id){
+ public AugmentationIdentifierGenerator(String id) {
this.id = id;
- matcher = pattern.matcher(this.id);
+ matcher = PATTERN.matcher(this.id);
doesMatch = matcher.matches();
}
- public boolean matches(){
+ public boolean matches() {
return doesMatch;
}
- public YangInstanceIdentifier.AugmentationIdentifier getPathArgument(){
- Set<QName> childNames = new HashSet<QName>();
+ public YangInstanceIdentifier.AugmentationIdentifier getPathArgument() {
final String childQNames = matcher.group(1);
- final String[] splitChildQNames = childQNames.split(",");
-
- for(String name : splitChildQNames){
- childNames.add(
- org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory
- .create(name.trim()));
+ final Set<QName> childNames = new HashSet<>();
+ for (String name : COMMA_SPLITTER.split(childQNames)) {
+ childNames.add(QNameFactory.create(name));
}
return new YangInstanceIdentifier.AugmentationIdentifier(childNames);
package org.opendaylight.controller.cluster.datastore.node.utils;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.base.Splitter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class PathUtils {
+ private static final Splitter SLASH_SPLITTER = Splitter.on('/').omitEmptyStrings();
/**
* Given a YangInstanceIdentifier return a serialized version of the same
* @param path
* @return
*/
- public static String toString(YangInstanceIdentifier path){
- StringBuilder sb = new StringBuilder();
- Iterator<YangInstanceIdentifier.PathArgument> iterator =
+ public static String toString(YangInstanceIdentifier path) {
+ final Iterator<YangInstanceIdentifier.PathArgument> it =
path.getPathArguments().iterator();
+ if (!it.hasNext()) {
+ return "";
+ }
- while(iterator.hasNext()){
- sb.append(toString(iterator.next()));
- if(iterator.hasNext()){
- sb.append("/");
+ final StringBuilder sb = new StringBuilder();
+ for (;;) {
+ sb.append(toString(it.next()));
+ if (!it.hasNext()) {
+ break;
}
+ sb.append('/');
}
+
return sb.toString();
}
* @return
*/
public static YangInstanceIdentifier toYangInstanceIdentifier(String path){
- String[] segments = path.split("/");
-
List<YangInstanceIdentifier.PathArgument> pathArguments = new ArrayList<>();
- for (String segment : segments) {
- if (!"".equals(segment)) {
- pathArguments.add(NodeIdentifierFactory.getArgument(segment));
- }
+ for (String segment : SLASH_SPLITTER.split(path)) {
+ pathArguments.add(NodeIdentifierFactory.getArgument(segment));
}
return YangInstanceIdentifier.create(pathArguments);
}
private static final int MAX_QNAME_CACHE_SIZE = 10000;
- private static LoadingCache<String, QName> cache = CacheBuilder.newBuilder()
+ private static final LoadingCache<String, QName> CACHE = CacheBuilder.newBuilder()
.maximumSize(MAX_QNAME_CACHE_SIZE)
.softValues()
.build(
new CacheLoader<String, QName>() {
+ @Override
public QName load(String key) {
return QName.create(key);
}
public static QName create(String name){
- return cache.getUnchecked(name);
+ return CACHE.getUnchecked(name);
}
}
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
import com.google.protobuf.ByteString;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.util.HashSet;
import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private static Object deSerializeBasicTypes(int valueType, String value) {
- switch(ValueType.values()[valueType]){
- case SHORT_TYPE: {
- return Short.valueOf(value);
- }
- case BOOL_TYPE: {
- return Boolean.valueOf(value);
- }
- case BYTE_TYPE: {
- return Byte.valueOf(value);
- }
- case INT_TYPE : {
- return Integer.valueOf(value);
- }
- case LONG_TYPE: {
- return Long.valueOf(value);
- }
- case QNAME_TYPE: {
- return QNameFactory.create(value);
- }
- case BIG_INTEGER_TYPE: {
- return new BigInteger(value);
- }
- case BIG_DECIMAL_TYPE: {
- return new BigDecimal(value);
- }
- default: {
- return value;
- }
- }
+ return ValueType.values()[valueType].deserialize(value);
}
}
package org.opendaylight.controller.cluster.datastore.node.utils.serialization;
import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public enum ValueType {
- SHORT_TYPE,
- BYTE_TYPE,
- INT_TYPE,
- LONG_TYPE,
- BOOL_TYPE,
- QNAME_TYPE,
- BITS_TYPE,
- YANG_IDENTIFIER_TYPE,
- STRING_TYPE,
- BIG_INTEGER_TYPE,
- BIG_DECIMAL_TYPE,
- BINARY_TYPE;
+ SHORT_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return Short.valueOf(str);
+ }
+ },
+ BYTE_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return Byte.valueOf(str);
+ }
+ },
+ INT_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return Integer.valueOf(str);
+ }
+ },
+ LONG_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return Long.valueOf(str);
+ }
+ },
+ BOOL_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return Boolean.valueOf(str);
+ }
+ },
+ QNAME_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return QNameFactory.create(str);
+ }
+ },
+ BITS_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ throw new UnsupportedOperationException("Should have been caught by caller");
+ }
+ },
+ YANG_IDENTIFIER_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ throw new UnsupportedOperationException("Should have been caught by caller");
+ }
+ },
+ STRING_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return str;
+ }
+ },
+ BIG_INTEGER_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return new BigInteger(str);
+ }
+ },
+ BIG_DECIMAL_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ return new BigDecimal(str);
+ }
+ },
+ BINARY_TYPE {
+ @Override
+ Object deserialize(final String str) {
+ throw new UnsupportedOperationException("Should have been caught by caller");
+ }
+ };
- private static Map<Class<?>, ValueType> types = new HashMap<>();
+ private static final Map<Class<?>, ValueType> TYPES;
static {
- types.put(String.class, STRING_TYPE);
- types.put(Byte.class, BYTE_TYPE);
- types.put(Integer.class, INT_TYPE);
- types.put(Long.class, LONG_TYPE);
- types.put(Boolean.class, BOOL_TYPE);
- types.put(QName.class, QNAME_TYPE);
- types.put(Set.class, BITS_TYPE);
- types.put(YangInstanceIdentifier.class, YANG_IDENTIFIER_TYPE);
- types.put(Short.class,SHORT_TYPE);
- types.put(BigInteger.class, BIG_INTEGER_TYPE);
- types.put(BigDecimal.class, BIG_DECIMAL_TYPE);
- types.put(byte[].class, BINARY_TYPE);
+ final Builder<Class<?>, ValueType> b = ImmutableMap.builder();
+
+ b.put(String.class, STRING_TYPE);
+ b.put(Byte.class, BYTE_TYPE);
+ b.put(Integer.class, INT_TYPE);
+ b.put(Long.class, LONG_TYPE);
+ b.put(Boolean.class, BOOL_TYPE);
+ b.put(QName.class, QNAME_TYPE);
+ b.put(YangInstanceIdentifier.class, YANG_IDENTIFIER_TYPE);
+ b.put(Short.class,SHORT_TYPE);
+ b.put(BigInteger.class, BIG_INTEGER_TYPE);
+ b.put(BigDecimal.class, BIG_DECIMAL_TYPE);
+ b.put(byte[].class, BINARY_TYPE);
+
+ TYPES = b.build();
}
- public static final ValueType getSerializableType(Object node){
+ abstract Object deserialize(String str);
+
+ public static final ValueType getSerializableType(Object node) {
Preconditions.checkNotNull(node, "node should not be null");
- ValueType type = types.get(node.getClass());
- if(type != null) {
+ final ValueType type = TYPES.get(node.getClass());
+ if (type != null) {
return type;
- } else if(node instanceof Set){
+ }
+ if (node instanceof Set) {
return BITS_TYPE;
}
package org.opendaylight.controller.cluster.datastore.node.utils.stream;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class ValueTypes {
+final class ValueTypes {
public static final byte SHORT_TYPE = 1;
public static final byte BYTE_TYPE = 2;
public static final byte INT_TYPE = 3;
public static final byte BIG_DECIMAL_TYPE = 11;
public static final byte BINARY_TYPE = 12;
- private static Map<Class<?>, Byte> types = new HashMap<>();
+ private static final Map<Class<?>, Byte> TYPES;
static {
- types.put(String.class, Byte.valueOf(STRING_TYPE));
- types.put(Byte.class, Byte.valueOf(BYTE_TYPE));
- types.put(Integer.class, Byte.valueOf(INT_TYPE));
- types.put(Long.class, Byte.valueOf(LONG_TYPE));
- types.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
- types.put(QName.class, Byte.valueOf(QNAME_TYPE));
- types.put(Set.class, Byte.valueOf(BITS_TYPE));
- types.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
- types.put(Short.class, Byte.valueOf(SHORT_TYPE));
- types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
- types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
- types.put(byte[].class, Byte.valueOf(BINARY_TYPE));
+ final Builder<Class<?>, Byte> b = ImmutableMap.builder();
+
+ b.put(String.class, Byte.valueOf(STRING_TYPE));
+ b.put(Byte.class, Byte.valueOf(BYTE_TYPE));
+ b.put(Integer.class, Byte.valueOf(INT_TYPE));
+ b.put(Long.class, Byte.valueOf(LONG_TYPE));
+ b.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
+ b.put(QName.class, Byte.valueOf(QNAME_TYPE));
+ b.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
+ b.put(Short.class, Byte.valueOf(SHORT_TYPE));
+ b.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
+ b.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+ b.put(byte[].class, Byte.valueOf(BINARY_TYPE));
+
+ TYPES = b.build();
}
- public static final byte getSerializableType(Object node){
+ private ValueTypes() {
+ throw new UnsupportedOperationException("Utility class");
+ }
+
+ public static final byte getSerializableType(Object node) {
Preconditions.checkNotNull(node, "node should not be null");
- Byte type = types.get(node.getClass());
- if(type != null) {
+ final Byte type = TYPES.get(node.getClass());
+ if (type != null) {
return type;
- } else if(node instanceof Set){
+ }
+ if (node instanceof Set) {
return BITS_TYPE;
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ConfigurationImpl implements Configuration {
- private final List<ModuleShard> moduleShards = new ArrayList<>();
+ private final List<ModuleShard> moduleShards;
- private final List<Module> modules = new ArrayList<>();
+ private final List<Module> modules;
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
// key = shardName, value = list of replicaNames (replicaNames are the same as memberNames)
private final Map<String, List<String>> shardReplicaNames = new HashMap<>();
+ private final ListMultimap<String, String> moduleNameToShardName;
+ private final Map<String, ShardStrategy> moduleNameToStrategy;
+ private final Map<String, String> namespaceToModuleName;
+ private final Set<String> allShardNames;
public ConfigurationImpl(final String moduleShardsConfigPath,
modulesConfig = ConfigFactory.load(modulesConfigPath);
}
- readModuleShards(moduleShardsConfig);
+ this.moduleShards = readModuleShards(moduleShardsConfig);
+ this.modules = readModules(modulesConfig);
- readModules(modulesConfig);
+ this.allShardNames = createAllShardNames(moduleShards);
+ this.moduleNameToShardName = createModuleNameToShardName(moduleShards);
+ this.moduleNameToStrategy = createModuleNameToStrategy(modules);
+ this.namespaceToModuleName = createNamespaceToModuleName(modules);
+ }
+
+ private static Set<String> createAllShardNames(Iterable<ModuleShard> moduleShards) {
+ final com.google.common.collect.ImmutableSet.Builder<String> b = ImmutableSet.builder();
+ for(ModuleShard ms : moduleShards){
+ for(Shard s : ms.getShards()) {
+ b.add(s.getName());
+ }
+ }
+ return b.build();
+ }
+
+ private static Map<String, ShardStrategy> createModuleNameToStrategy(Iterable<Module> modules) {
+ final com.google.common.collect.ImmutableMap.Builder<String, ShardStrategy> b = ImmutableMap.builder();
+ for (Module m : modules) {
+ b.put(m.getName(), m.getShardStrategy());
+ }
+ return b.build();
+ }
+
+ private static Map<String, String> createNamespaceToModuleName(Iterable<Module> modules) {
+ final com.google.common.collect.ImmutableMap.Builder<String, String> b = ImmutableMap.builder();
+ for (Module m : modules) {
+ b.put(m.getNameSpace(), m.getName());
+ }
+ return b.build();
+ }
+
+ private static ListMultimap<String, String> createModuleNameToShardName(Iterable<ModuleShard> moduleShards) {
+ final com.google.common.collect.ImmutableListMultimap.Builder<String, String> b = ImmutableListMultimap.builder();
+
+ for (ModuleShard m : moduleShards) {
+ for (Shard s : m.getShards()) {
+ b.put(m.getModuleName(), s.getName());
+ }
+ }
+
+ return b.build();
}
@Override public List<String> getMemberShardNames(final String memberName){
}
- @Override public Optional<String> getModuleNameFromNameSpace(final String nameSpace) {
-
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(final String nameSpace) {
Preconditions.checkNotNull(nameSpace, "nameSpace should not be null");
-
- for(Module m : modules){
- if(m.getNameSpace().equals(nameSpace)){
- return Optional.of(m.getName());
- }
- }
- return Optional.absent();
+ return Optional.fromNullable(namespaceToModuleName.get(nameSpace));
}
- @Override public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
- Map<String, ShardStrategy> map = new HashMap<>();
- for(Module m : modules){
- map.put(m.getName(), m.getShardStrategy());
- }
- return map;
+ @Override
+ public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ return moduleNameToStrategy;
}
- @Override public List<String> getShardNamesFromModuleName(final String moduleName) {
-
+ @Override
+ public List<String> getShardNamesFromModuleName(final String moduleName) {
Preconditions.checkNotNull(moduleName, "moduleName should not be null");
-
- for(ModuleShard m : moduleShards){
- if(m.getModuleName().equals(moduleName)){
- List<String> l = new ArrayList<>();
- for(Shard s : m.getShards()){
- l.add(s.getName());
- }
- return l;
- }
- }
-
- return Collections.emptyList();
+ return moduleNameToShardName.get(moduleName);
}
@Override public List<String> getMembersFromShardName(final String shardName) {
return Collections.emptyList();
}
- @Override public Set<String> getAllShardNames() {
- Set<String> shardNames = new LinkedHashSet<>();
- for(ModuleShard ms : moduleShards){
- for(Shard s : ms.getShards()) {
- shardNames.add(s.getName());
- }
- }
- return shardNames;
+ @Override
+ public Set<String> getAllShardNames() {
+ return allShardNames;
}
-
-
- private void readModules(final Config modulesConfig) {
+ private List<Module> readModules(final Config modulesConfig) {
List<? extends ConfigObject> modulesConfigObjectList =
modulesConfig.getObjectList("modules");
+ final Builder<Module> b = ImmutableList.builder();
for(ConfigObject o : modulesConfigObjectList){
ConfigObjectWrapper w = new ConfigObjectWrapper(o);
- modules.add(new Module(w.stringValue("name"), w.stringValue(
+ b.add(new Module(w.stringValue("name"), w.stringValue(
"namespace"), w.stringValue("shard-strategy")));
}
+
+ return b.build();
}
- private void readModuleShards(final Config moduleShardsConfig) {
+ private static List<ModuleShard> readModuleShards(final Config moduleShardsConfig) {
List<? extends ConfigObject> moduleShardsConfigObjectList =
moduleShardsConfig.getObjectList("module-shards");
+ final Builder<ModuleShard> b = ImmutableList.builder();
for(ConfigObject moduleShardConfigObject : moduleShardsConfigObjectList){
String moduleName = moduleShardConfigObject.get("name").unwrapped().toString();
shards.add(new Shard(shardName, replicas));
}
- this.moduleShards.add(new ModuleShard(moduleName, shards));
+ b.add(new ModuleShard(moduleName, shards));
}
- }
+ return b.build();
+ }
- private class ModuleShard {
+ private static class ModuleShard {
private final String moduleName;
private final List<Shard> shards;
}
}
- private class Shard {
+ private static class Shard {
private final String name;
private final List<String> replicas;
if(ModuleShardStrategy.NAME.equals(shardStrategy)){
this.shardStrategy = new ModuleShardStrategy(name, ConfigurationImpl.this);
} else {
- this.shardStrategy = new DefaultShardStrategy();
+ this.shardStrategy = DefaultShardStrategy.getInstance();
}
}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
+ private final Semaphore operationLimiter;
+ private final OperationCompleter operationCompleter;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
this(actorContext, transactionType, "");
phantomReferenceCache.put(cleanup, cleanup);
}
+ // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+ this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+ this.operationCompleter = new OperationCompleter(operationLimiter);
+
LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
}
LOG.debug("Tx {} read {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
@Override
LOG.debug("Tx {} exists {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
@Override
"Transaction is sealed - further modifications are not allowed");
}
+ private void throttleOperation() {
+ throttleOperation(1);
+ }
+
+ private void throttleOperation(int acquirePermits) {
+ try {
+ if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+ }
+ } catch (InterruptedException e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+ } else {
+ LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+ }
+ }
+ }
+
+
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
LOG.debug("Tx {} write {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} delete {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
checkModificationState();
+ throttleOperation(txFutureCallbackMap.size());
+
inReadyState = true;
LOG.debug("Tx {} Readying {} transactions for commit", identifier,
LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
failure.getMessage());
- localTransactionContext = new NoOpTransactionContext(failure, identifier);
+ localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, identifier);
+ localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
}
for(TransactionOperation oper: txOperationsOnComplete) {
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
}
}
private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
private final ActorContext actorContext;
- private final SchemaContext schemaContext;
private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
+ private final OperationCompleter operationCompleter;
+
private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, short remoteTransactionVersion) {
+ boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
- this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
+ this.operationCompleter = operationCompleter;
}
+ private Future<Object> completeOperation(Future<Object> operationFuture){
+ operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+ return operationFuture;
+ }
+
+
private ActorSelection getActor() {
return actor;
}
private Future<Object> executeOperationAsync(SerializableMessage msg) {
- return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
}
private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
- return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
- msg.toSerializable(remoteTransactionVersion));
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+ msg.toSerializable(remoteTransactionVersion)));
}
@Override
private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
+ private final Semaphore operationLimiter;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
super(identifier);
this.failure = failure;
+ this.operationLimiter = operationLimiter;
}
@Override
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called", identifier);
+ operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.debug("Tx {} readData called path = {}", identifier, path);
+ operationLimiter.release();
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ operationLimiter.release();
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
+
+ private static class OperationCompleter extends OnComplete<Object> {
+ private final Semaphore operationLimiter;
+ OperationCompleter(Semaphore operationLimiter){
+ this.operationLimiter = operationLimiter;
+ }
+
+ @Override
+ public void onComplete(Throwable throwable, Object o){
+ this.operationLimiter.release();
+ }
+ }
}
* The default shard stores data for all modules for which a specific set of shards has not been configured
* </p>
*/
-public class DefaultShardStrategy implements ShardStrategy{
+public final class DefaultShardStrategy implements ShardStrategy {
+ public static final String NAME = "default";
+ public static final String DEFAULT_SHARD = "default";
+ private static final DefaultShardStrategy INSTANCE = new DefaultShardStrategy();
- public static final String NAME = "default";
- public static final String DEFAULT_SHARD = "default";
+ private DefaultShardStrategy() {
+ // Hidden to force a singleton instnace
+ }
- @Override
- public String findShard(YangInstanceIdentifier path) {
- return DEFAULT_SHARD;
- }
+ public static DefaultShardStrategy getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return DEFAULT_SHARD;
+ }
}
package org.opendaylight.controller.cluster.datastore.shardstrategy;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import java.util.List;
-
public class ModuleShardStrategy implements ShardStrategy {
public static final String NAME = "module";
this.configuration = configuration;
}
- @Override public String findShard(YangInstanceIdentifier path) {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
List<String> shardNames =
configuration.getShardNamesFromModuleName(moduleName);
- if(shardNames.size() == 0){
+ if (shardNames.isEmpty()) {
return DefaultShardStrategy.DEFAULT_SHARD;
}
return shardNames.get(0);
String moduleName = getModuleName(path);
ShardStrategy shardStrategy = moduleNameToStrategyMap.get(moduleName);
if (shardStrategy == null) {
- return new DefaultShardStrategy();
+ return DefaultShardStrategy.getInstance();
}
return shardStrategy;
package org.opendaylight.controller.cluster.datastore.utils;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static akka.pattern.Patterns.ask;
-
/**
* The ActorContext class contains utility methods which could be used by
* non-actors (like DistributedDataStore) to work with actors a little more
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final int transactionOutstandingOperationLimit;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
} else {
selfAddressHostPort = null;
}
+
+ transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
}
public DatastoreContext getDatastoreContext() {
return builder.toString();
}
+
+ /**
+ * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+ * should begin throttling the operations
+ *
+ * Parking reading this configuration here because we need to get to the actor system settings
+ *
+ * @return
+ */
+ public int getTransactionOutstandingOperationLimit(){
+ return transactionOutstandingOperationLimit;
+ }
}
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class TransactionChainProxyTest {
- ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+ ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
@Before
public void setUp() {
- doReturn(schemaContext).when(actorContext).getSchemaContext();
+ actorContext = new MockActorContext(getSystem());
+ actorContext.setSchemaContext(schemaContext);
}
@SuppressWarnings("resource")
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
schemaContext = TestModel.createTestContext();
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+ doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
ShardStrategyFactory.setConfiguration(configuration);
}
return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
+ private Future<Object> incompleteFuture(){
+ return mock(Future.class);
+ }
+
private Future<MergeDataReply> mergeDataReply() {
return Futures.successful(new MergeDataReply());
}
executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
+ doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+ doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
return actorRef;
}
verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
}
+
+ private static interface TransactionProxyOperation {
+ void run(TransactionProxy transactionProxy);
+ }
+
+ private void throttleOperation(TransactionProxyOperation operation) {
+ throttleOperation(operation, 1, true);
+ }
+
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ if(shardFound) {
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } else {
+ doReturn(Futures.failed(new Exception("not found")))
+ .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ long start = System.currentTimeMillis();
+
+ operation.run(transactionProxy);
+
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue(String.format("took less time than expected %s was %s",
+ mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+ (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+ }
+
+ private void completeOperation(TransactionProxyOperation operation){
+ completeOperation(operation, true);
+ }
+
+ private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ if(shardFound) {
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } else {
+ doReturn(Futures.failed(new Exception("not found")))
+ .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ long start = System.currentTimeMillis();
+
+ operation.run(transactionProxy);
+
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue(String.format("took more time than expected %s was %s",
+ mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+ (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+ }
+
+ public void testWriteThrottling(boolean shardFound){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ }, 1, shardFound);
+ }
+
+ @Test
+ public void testWriteThrottlingWhenShardFound(){
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ });
+
+ }
+
+ @Test
+ public void testWriteThrottlingWhenShardNotFound(){
+ // Confirm that there is no throttling when the Shard is not found
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ }, false);
+
+ }
+
+
+ @Test
+ public void testWriteCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ });
+
+ }
+
+ @Test
+ public void testMergeThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ });
+ }
+
+ @Test
+ public void testMergeThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ }, false);
+ }
+
+ @Test
+ public void testMergeCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ });
+
+ }
+
+ @Test
+ public void testDeleteThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+
+ @Test
+ public void testDeleteThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+ @Test
+ public void testDeleteCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ @Test
+ public void testReadThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+ @Test
+ public void testReadThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+
+ @Test
+ public void testReadCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ @Test
+ public void testExistsThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+ @Test
+ public void testExistsThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+
+ @Test
+ public void testExistsCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ @Test
+ public void testReadyThrottling(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any(ReadyTransaction.class));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.ready();
+ }
+ });
+ }
+
+ @Test
+ public void testReadyThrottlingWithTwoTransactionContexts(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(carsNode));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any(ReadyTransaction.class));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+ transactionProxy.ready();
+ }
+ }, 2, true);
+ }
}
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
public class DefaultShardStrategyTest {
-
- @Test
- public void testFindShard() throws Exception {
- String shard = new DefaultShardStrategy().findShard(TestModel.TEST_PATH);
- Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
- }
+ @Test
+ public void testFindShard() throws Exception {
+ String shard = DefaultShardStrategy.getInstance().findShard(TestModel.TEST_PATH);
+ Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shard);
+ }
}
\ No newline at end of file
package org.opendaylight.controller.cluster.datastore.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
public class ActorContextTest extends AbstractActorTest{
private static class MockShardManager extends UntypedActor {
@Test
public void testResolvePathForRemoteActor() {
ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(
ClusterWrapper.class),
mock(Configuration.class));
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");