} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
- String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
- followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
package org.opendaylight.controller.cluster.raft;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
// We define this as ArrayList so we can use ensureCapacity.
protected ArrayList<ReplicatedLogEntry> journal;
- protected long snapshotIndex = -1;
- protected long snapshotTerm = -1;
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
- protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
- protected long previousSnapshotIndex = -1;
- protected long previousSnapshotTerm = -1;
+ private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+ private long previousSnapshotIndex = -1;
+ private long previousSnapshotTerm = -1;
protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
}
public AbstractReplicatedLogImpl() {
- this.journal = new ArrayList<>();
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
protected int adjustedIndex(long logEntryIndex) {
public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + max;
if(maxIndex > size){
maxIndex = size;
}
- entries.addAll(journal.subList(adjustedIndex, maxIndex));
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ return Collections.emptyList();
}
- return entries;
}
-
@Override
public long size() {
return journal.size();
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The state of the followers log as known by the Leader
*/
* Increment the value of the nextIndex
* @return
*/
- public long incrNextIndex();
+ long incrNextIndex();
/**
* Decrement the value of the nextIndex
* @return
*/
- public long decrNextIndex();
+ long decrNextIndex();
/**
*
* Increment the value of the matchIndex
* @return
*/
- public long incrMatchIndex();
+ long incrMatchIndex();
- public void setMatchIndex(long matchIndex);
+ void setMatchIndex(long matchIndex);
/**
* The identifier of the follower
* This could simply be the url of the remote actor
*/
- public String getId();
+ String getId();
/**
* for each server, index of the next log entry
* to send to that server (initialized to leader
* last log index + 1)
*/
- public AtomicLong getNextIndex();
+ long getNextIndex();
/**
* for each server, index of highest log entry
* known to be replicated on server
* (initialized to 0, increases monotonically)
*/
- public AtomicLong getMatchIndex();
+ long getMatchIndex();
/**
* Checks if the follower is active by comparing the last updated with the duration
* @return boolean
*/
- public boolean isFollowerActive();
+ boolean isFollowerActive();
/**
* restarts the timeout clock of the follower
*/
- public void markFollowerActive();
+ void markFollowerActive();
/**
* This will stop the timeout clock
*/
- public void markFollowerInActive();
-
-
+ void markFollowerInActive();
}
package org.opendaylight.controller.cluster.raft;
import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import scala.concurrent.duration.FiniteDuration;
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
private final String id;
- private final AtomicLong nextIndex;
+ private final Stopwatch stopwatch = new Stopwatch();
- private final AtomicLong matchIndex;
+ private final long followerTimeoutMillis;
- private final Stopwatch stopwatch;
+ private volatile long nextIndex;
- private final long followerTimeoutMillis;
+ private volatile long matchIndex;
- public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
- AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+ public FollowerLogInformationImpl(String id, long nextIndex,
+ long matchIndex, FiniteDuration followerTimeoutDuration) {
this.id = id;
this.nextIndex = nextIndex;
this.matchIndex = matchIndex;
- this.stopwatch = new Stopwatch();
this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
+ @Override
public long incrNextIndex(){
- return nextIndex.incrementAndGet();
+ return NEXT_INDEX_UPDATER.incrementAndGet(this);
}
- @Override public long decrNextIndex() {
- return nextIndex.decrementAndGet();
+ @Override
+ public long decrNextIndex() {
+ return NEXT_INDEX_UPDATER.decrementAndGet(this);
}
- @Override public void setNextIndex(long nextIndex) {
- this.nextIndex.set(nextIndex);
+ @Override
+ public void setNextIndex(long nextIndex) {
+ this.nextIndex = nextIndex;
}
+ @Override
public long incrMatchIndex(){
- return matchIndex.incrementAndGet();
+ return MATCH_INDEX_UPDATER.incrementAndGet(this);
}
- @Override public void setMatchIndex(long matchIndex) {
- this.matchIndex.set(matchIndex);
+ @Override
+ public void setMatchIndex(long matchIndex) {
+ this.matchIndex = matchIndex;
}
+ @Override
public String getId() {
return id;
}
- public AtomicLong getNextIndex() {
+ @Override
+ public long getNextIndex() {
return nextIndex;
}
- public AtomicLong getMatchIndex() {
+ @Override
+ public long getMatchIndex() {
return matchIndex;
}
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
"Persistence Id = " + persistenceId() +
" Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm(), replicatedLog.size());
initializeBehavior();
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
- protected final Set<String> followers;
+ private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
- private List<ClientRequestTracker> trackerList = new ArrayList<>();
+ private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
protected final int minReplicationCount;
public AbstractLeader(RaftActorContext context) {
super(context);
- followers = context.getPeerAddresses().keySet();
-
- for (String followerId : followers) {
+ final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+ for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
- new AtomicLong(context.getCommitIndex()),
- new AtomicLong(-1),
+ context.getCommitIndex(), -1,
context.getConfigParams().getElectionTimeOutInterval());
- followerToLog.put(followerId, followerLogInformation);
+ ftlBuilder.put(followerId, followerLogInformation);
}
+ followerToLog = ftlBuilder.build();
leaderId = context.getId();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers: {}", followers);
- }
+ LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
- minReplicationCount = getMajorityVoteCount(followers.size());
+ minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
// the isolated Leader peer count will be 1 less than the majority vote count.
// this is because the vote count has the self vote counted in it
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
}
+ /**
+ * Return an immutable collection of follower identifiers.
+ *
+ * @return Collection of follower IDs
+ */
+ protected final Collection<String> getFollowerIds() {
+ return followerToLog.keySet();
+ }
+
private Optional<ByteString> getSnapshot() {
return snapshot;
}
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
+ if (info.getMatchIndex() >= N) {
replicatedCount++;
}
}
return this;
}
+ @Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
+ final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ while (it.hasNext()) {
+ final ClientRequestTracker t = it.next();
+ if (t.getIndex() == logIndex) {
+ it.remove();
+ return t;
+ }
}
- return toRemove;
+ return null;
}
+ @Override
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+ LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+ followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
logIndex)
);
- if (followers.size() == 0) {
+ if (followerToLog.isEmpty()) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
private void sendAppendEntries() {
// Send an AppendEntries to all followers
- for (String followerId : followers) {
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
+ long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ final List<ReplicatedLogEntry> entries;
if (isFollowerActive &&
context.getReplicatedLog().isPresent(followerNextIndex)) {
*
*/
private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
+ LOG.info("{} follower needs a snapshot install", e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
} else {
initiateCaptureSnapshot();
private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
}
}
}
}
private void sendHeartBeat() {
- if (followers.size() > 0) {
+ if (!followerToLog.isEmpty()) {
sendAppendEntries();
}
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
// called from example-actor for printing the follower-states
public String printFollowerStates() {
- StringBuilder sb = new StringBuilder();
- for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+ final StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ sb.append('{');
+ sb.append(followerLogInformation.getId());
+ sb.append(" state:");
+ sb.append(followerLogInformation.isFollowerActive());
+ sb.append("},");
}
- return "[" + sb.toString() + "]";
+ sb.append(']');
+
+ return sb.toString();
+ }
+
+ @VisibleForTesting
+ public FollowerLogInformation getFollower(String followerId) {
+ return followerToLog.get(followerId);
+ }
+
+ @VisibleForTesting
+ protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+ mapFollowerToSnapshot.put(followerId, snapshot);
+ }
+
+ @VisibleForTesting
+ public int followerSnapshotSize() {
+ return mapFollowerToSnapshot.size();
}
@VisibleForTesting
- void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ public int followerLogSize() {
+ return followerToLog.size();
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
}
protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (getFollowerIds().isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
context.getActorSystem().dispatcher(), context.getActor());
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
stopInstallSnapshotSchedule();
stopIsolatedLeaderCheckSchedule();
super.close();
@VisibleForTesting
void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ getFollower(followerId).markFollowerActive();
}
@VisibleForTesting
void markFollowerInActive(String followerId) {
- followerToLog.get(followerId).markFollowerInActive();
+ getFollower(followerId).markFollowerInActive();
}
}
public void removeFromAndPersist(final long index) {
}
- @Override
- public void setSnapshotIndex(final long snapshotIndex) {
- this.snapshotIndex = snapshotIndex;
- }
-
- @Override
- public void setSnapshotTerm(final long snapshotTerm) {
- this.snapshotTerm = snapshotTerm;
- }
-
@Override
public int dataSize() {
return -1;
*/
package org.opendaylight.controller.cluster.raft;
-
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(
- "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+ "follower1", 10, 9, timeoutDuration);
}};
}
-
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef raftActor = getTestActor();
new ExpectMsg<String>(duration("1 seconds"),
"match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof ApplyState) {
if (((ApplyState) in).getIdentifier().equals("state-id")) {
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
InstallSnapshot is = (InstallSnapshot)
assertTrue(raftBehavior instanceof Leader);
- assertEquals(leader.mapFollowerToSnapshot.size(), 0);
- assertEquals(leader.followerToLog.size(), 1);
- assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
- FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ assertNotNull(leader.getFollower(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
return createActorContext(leaderActor);
}
+ @Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
return new MockRaftActorContext("test", getSystem(), actorRef);
}
public void createFollowerToSnapshot(String followerId, ByteString bs ) {
fts = new FollowerToSnapshot(bs);
- mapFollowerToSnapshot.put(followerId, fts);
-
+ setFollowerSnapshot(followerId, fts);
}
}
package org.opendaylight.controller.sal.binding.impl.connect.dom;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.Future;
-
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
/*
* RPC's can have both input, output, one or the other, or neither.
*
*
*/
public class RpcInvocationStrategy {
+ private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+ final Object output;
+ if (getOutputClass() != null && result.getResult() != null) {
+ output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+ } else {
+ output = null;
+ }
+
+ return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+ }
+ };
private final BindingIndependentMappingService mappingService;
private final RpcProvisionRegistry biRpcRegistry;
final Method targetMethod,
final BindingIndependentMappingService mappingService,
final RpcProvisionRegistry biRpcRegistry ) {
-
+ this.mappingService = mappingService;
+ this.biRpcRegistry = biRpcRegistry;
this.targetMethod = targetMethod;
this.rpc = rpc;
- Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
- Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
- if ( outputClassOption != null && outputClassOption.isPresent() ) {
- this.outputClass = new WeakReference(outputClassOption.get() ) ;
+ final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+ if (outputClassOption.isPresent()) {
+ this.outputClass = new WeakReference(outputClassOption.get());
} else {
- this.outputClass = null ;
+ this.outputClass = null;
}
- if ( inputClassOption != null && inputClassOption.isPresent() ) {
- this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+ final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+ if (inputClassOption.isPresent() ) {
+ this.inputClass = new WeakReference(inputClassOption.get());
} else {
- this.inputClass = null ;
+ this.inputClass = null;
}
-
- this.mappingService = mappingService;
- this.biRpcRegistry = biRpcRegistry;
}
@SuppressWarnings({ "unchecked" })
inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
}
- Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @SuppressWarnings("rawtypes")
- @Override
- public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
- Object output = null;
-
- if( getOutputClass() != null ) {
- if (result.getResult() != null) {
- output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
- result.getResult());
- }
- }
-
- return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
- }
- };
-
return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
}
}
@SuppressWarnings("rawtypes")
- public WeakReference<Class> getOutputClass() {
+ @VisibleForTesting
+ WeakReference<Class> getOutputClass() {
return outputClass;
}
}
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
/**
* NormalizedNodeInputStreamReader reads the byte stream and constructs the normalized node including its children nodes.
private NormalizedNodeAttrBuilder<NodeWithValue, Object,
LeafSetEntryNode<Object>> leafSetEntryBuilder;
+ private final StringBuilder reusableStringBuilder = new StringBuilder(50);
+
public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
input = new DataInputStream(stream);
case NodeTypes.ANY_XML_NODE :
LOG.debug("Read xml node");
- Node<?> value = (Node<?>) readObject();
return Builders.anyXmlBuilder().withValue((Node<?>) readObject()).build();
case NodeTypes.MAP_NODE :
String namespace = readCodedString();
String revision = readCodedString();
- // Not using stringbuilder as compiler optimizes string concatenation of +
String qName;
if(!Strings.isNullOrEmpty(revision)) {
- qName = "(" + namespace + REVISION_ARG + revision + ")" +localName;
+ qName = reusableStringBuilder.append('(').append(namespace).append(REVISION_ARG).
+ append(revision).append(')').append(localName).toString();
} else {
- qName = "(" + namespace + ")" + localName;
+ qName = reusableStringBuilder.append('(').append(namespace).append(')').
+ append(localName).toString();
}
+ reusableStringBuilder.delete(0, reusableStringBuilder.length());
return QNameFactory.create(qName);
}
if(valueType == NormalizedNodeOutputStreamWriter.IS_CODE_VALUE) {
return codedStringMap.get(input.readInt());
} else if(valueType == NormalizedNodeOutputStreamWriter.IS_STRING_VALUE) {
- String value = input.readUTF();
+ String value = input.readUTF().intern();
codedStringMap.put(Integer.valueOf(codedStringMap.size()), value);
return value;
}
return readObjSet();
case ValueTypes.BOOL_TYPE :
- return input.readBoolean();
+ return Boolean.valueOf(input.readBoolean());
case ValueTypes.BYTE_TYPE :
- return input.readByte();
+ return Byte.valueOf(input.readByte());
case ValueTypes.INT_TYPE :
- return input.readInt();
+ return Integer.valueOf(input.readInt());
case ValueTypes.LONG_TYPE :
- return input.readLong();
+ return Long.valueOf(input.readLong());
case ValueTypes.QNAME_TYPE :
return readQName();
case ValueTypes.SHORT_TYPE :
- return input.readShort();
+ return Short.valueOf(input.readShort());
case ValueTypes.STRING_TYPE :
return input.readUTF();
case ValueTypes.BIG_INTEGER_TYPE :
return new BigInteger(input.readUTF());
+ case ValueTypes.BINARY_TYPE :
+ byte[] bytes = new byte[input.readInt()];
+ input.readFully(bytes);
+ return bytes;
+
case ValueTypes.YANG_IDENTIFIER_TYPE :
return readYangInstanceIdentifier();
return new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
case PathArgumentTypes.NODE_IDENTIFIER :
- return new NodeIdentifier(readQName());
+ return new NodeIdentifier(readQName());
case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES :
- return new NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
+ return new NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
- return new NodeWithValue(readQName(), readObject());
+ return new NodeWithValue(readQName(), readObject());
default :
return null;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
}
}
- @SuppressWarnings("rawtypes")
private void writeObject(Object value) throws IOException {
byte type = ValueTypes.getSerializableType(value);
case ValueTypes.BITS_TYPE:
writeObjSet((Set<?>) value);
break;
+ case ValueTypes.BINARY_TYPE:
+ byte[] bytes = (byte[]) value;
+ output.writeInt(bytes.length);
+ output.write(bytes);
+ break;
case ValueTypes.YANG_IDENTIFIER_TYPE:
writeYangInstanceIdentifier((YangInstanceIdentifier) value);
break;
package org.opendaylight.controller.cluster.datastore.node.utils.stream;
import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class ValueTypes {
public static final byte SHORT_TYPE = 1;
public static final byte STRING_TYPE = 9;
public static final byte BIG_INTEGER_TYPE = 10;
public static final byte BIG_DECIMAL_TYPE = 11;
+ public static final byte BINARY_TYPE = 12;
private static Map<Class<?>, Byte> types = new HashMap<>();
types.put(Short.class, Byte.valueOf(SHORT_TYPE));
types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+ types.put(byte[].class, Byte.valueOf(BINARY_TYPE));
}
public static final byte getSerializableType(Object node){
// required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
boolean hasInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
return instanceIdentifierPathArguments_;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
return instanceIdentifierPathArguments_;
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder clearInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
bitField0_ |= 0x00000001;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
if (instanceIdentifierPathArgumentsBuilder_ != null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
// required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
boolean hasInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments();
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder();
private org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier instanceIdentifierPathArguments_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
return instanceIdentifierPathArguments_;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
return instanceIdentifierPathArguments_;
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder> instanceIdentifierPathArgumentsBuilder_;
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public boolean hasInstanceIdentifierPathArguments() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier getInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder setInstanceIdentifierPathArguments(
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder builderForValue) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder mergeInstanceIdentifierPathArguments(org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier value) {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public Builder clearInstanceIdentifierPathArguments() {
if (instanceIdentifierPathArgumentsBuilder_ == null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder getInstanceIdentifierPathArgumentsBuilder() {
bitField0_ |= 0x00000001;
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
public org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder getInstanceIdentifierPathArgumentsOrBuilder() {
if (instanceIdentifierPathArgumentsBuilder_ != null) {
}
/**
* <code>required .org.opendaylight.controller.mdsal.InstanceIdentifier instanceIdentifierPathArguments = 1;</code>
+ *
+ * <pre>
+ * base Helium version
+ * </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier.Builder, org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifierOrBuilder>
}
message WriteData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+ // base Helium version
+ required InstanceIdentifier instanceIdentifierPathArguments = 1;
+ required Node normalizedNode = 2;
}
message WriteDataReply{
}
message MergeData {
- required InstanceIdentifier instanceIdentifierPathArguments = 1;
-required Node normalizedNode =2;
-
+ // base Helium version
+ required InstanceIdentifier instanceIdentifierPathArguments = 1;
+ required Node normalizedNode = 2;
}
message MergeDataReply{
}
ContainerNode node1 = TestModel.createBaseTestContainerBuilder()
- .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATE_QNAME, binaryData))
+ .withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATA_QNAME, binaryData))
.build();
NormalizedNodeMessages.Node serializedNode1 = NormalizedNodeSerializer
// FIXME: This will not work due to BUG 2326. Once that is fixed we can uncomment this assertion
// assertEquals(node1, node2);
- Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATE_QNAME));
+ Optional<DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>> child = node2.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.SOME_BINARY_DATA_QNAME));
Object value = child.get().getValue();
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.util.TestModel;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class NormalizedNodeStreamReaderWriterTest {
@Test
public void testNormalizedNodeStreamReaderWriter() throws IOException {
- testNormalizedNodeStreamReaderWriter(TestModel.createTestContainer());
+ testNormalizedNodeStreamReaderWriter(createTestContainer());
QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
withChild(toasterNode).build());
}
+ // Extends the base test container with binary (byte[]) values so the stream
+ // reader/writer round-trip exercises them: a binary leaf-list holding both a
+ // non-empty and an empty byte[], a byte[] leaf, and an ordered map entry.
+ private NormalizedNode<?, ?> createTestContainer() {
+ byte[] bytes1 = {1,2,3};
+ LeafSetEntryNode<Object> entry1 = ImmutableLeafSetEntryNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeWithValue(TestModel.BINARY_LEAF_LIST_QNAME, bytes1)).
+ withValue(bytes1).build();
+
+ // Empty array entry guards the zero-length edge case in the length-prefixed encoding.
+ byte[] bytes2 = {};
+ LeafSetEntryNode<Object> entry2 = ImmutableLeafSetEntryNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeWithValue(TestModel.BINARY_LEAF_LIST_QNAME, bytes2)).
+ withValue(bytes2).build();
+
+ return TestModel.createBaseTestContainerBuilder().
+ withChild(ImmutableLeafSetNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.BINARY_LEAF_LIST_QNAME)).
+ withChild(entry1).withChild(entry2).build()).
+ withChild(ImmutableNodes.leafNode(TestModel.SOME_BINARY_DATA_QNAME, new byte[]{1,2,3,4})).
+ withChild(Builders.orderedMapBuilder().
+ withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.ORDERED_LIST_QNAME)).
+ withChild(ImmutableNodes.mapEntry(TestModel.ORDERED_LIST_ENTRY_QNAME,
+ TestModel.ID_QNAME, 11)).build()).
+ build();
+ }
+
private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
byte[] byteData = null;
package org.opendaylight.controller.cluster.datastore.util;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntryBuilder;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapNodeBuilder;
import com.google.common.collect.ImmutableSet;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntryBuilder;
-import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapNodeBuilder;
-
public class TestModel {
public static final QName TEST_QNAME = QName.create(
public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
public static final QName POINTER_QNAME = QName.create(TEST_QNAME, "pointer");
- public static final QName SOME_BINARY_DATE_QNAME = QName.create(TEST_QNAME, "some-binary-data");
+ public static final QName SOME_BINARY_DATA_QNAME = QName.create(TEST_QNAME, "some-binary-data");
+ public static final QName BINARY_LEAF_LIST_QNAME = QName.create(TEST_QNAME, "binary_leaf_list");
public static final QName SOME_REF_QNAME = QName.create(TEST_QNAME,
"some-ref");
public static final QName MYIDENTITY_QNAME = QName.create(TEST_QNAME,
public static final QName SWITCH_FEATURES_QNAME = QName.create(TEST_QNAME,
"switch-features");
- public static final QName AUGMENTED_LIST_QNAME = QName.create(TEST_QNAME,
- "augmented-list");
+ public static final QName AUGMENTED_LIST_QNAME = QName.create(TEST_QNAME, "augmented-list");
+ public static final QName AUGMENTED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "augmented-list-entry");
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME,
"outer-list");
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+ public static final QName BOOLEAN_LEAF_QNAME = QName.create(TEST_QNAME, "boolean-leaf");
+ public static final QName SHORT_LEAF_QNAME = QName.create(TEST_QNAME, "short-leaf");
+ public static final QName BYTE_LEAF_QNAME = QName.create(TEST_QNAME, "byte-leaf");
+ public static final QName BIGINTEGER_LEAF_QNAME = QName.create(TEST_QNAME, "biginteger-leaf");
+ public static final QName BIGDECIMAL_LEAF_QNAME = QName.create(TEST_QNAME, "bigdecimal-leaf");
+ public static final QName ORDERED_LIST_QNAME = QName.create(TEST_QNAME, "ordered-list");
+ public static final QName ORDERED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "ordered-list-entry");
+ public static final QName UNKEYED_LIST_QNAME = QName.create(TEST_QNAME, "unkeyed-list");
+ public static final QName UNKEYED_LIST_ENTRY_QNAME = QName.create(TEST_QNAME, "unkeyed-list-entry");
+ public static final QName CHOICE_QNAME = QName.create(TEST_QNAME, "choice");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
private static final String DATASTORE_AUG_YANG =
"/odl-datastore-augmentation.yang";
private static final String TWO_TWO_NAME = "two";
private static final String DESC = "Hello there";
private static final Long LONG_ID = 1L;
- private static final Boolean ENABLED = false;
+ private static final Boolean ENABLED = true;
private static final Short SHORT_ID = 1;
private static final Byte BYTE_ID = 1;
// Family specific constants
private static final String SECOND_GRAND_CHILD_NAME = "second grand child";
- // first child
- private static final YangInstanceIdentifier CHILDREN_1_PATH =
- YangInstanceIdentifier.builder(CHILDREN_PATH)
- .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, FIRST_CHILD_ID) //
- .build();
- private static final YangInstanceIdentifier CHILDREN_1_NAME_PATH =
- YangInstanceIdentifier.builder(CHILDREN_PATH)
- .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, FIRST_CHILD_NAME) //
- .build();
-
- private static final YangInstanceIdentifier CHILDREN_2_PATH =
- YangInstanceIdentifier.builder(CHILDREN_PATH)
- .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, SECOND_CHILD_ID) //
- .build();
- private static final YangInstanceIdentifier CHILDREN_2_NAME_PATH =
- YangInstanceIdentifier.builder(CHILDREN_PATH)
- .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, SECOND_CHILD_NAME) //
- .build();
-
-
- private static final YangInstanceIdentifier GRAND_CHILD_1_PATH =
- YangInstanceIdentifier
- .builder(CHILDREN_1_PATH)
- .node(GRAND_CHILDREN_QNAME)
- //
- .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
- FIRST_GRAND_CHILD_ID) //
- .build();
-
- private static final YangInstanceIdentifier GRAND_CHILD_1_NAME_PATH =
- YangInstanceIdentifier
- .builder(CHILDREN_1_PATH)
- .node(GRAND_CHILDREN_QNAME)
- //
- .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
- FIRST_GRAND_CHILD_NAME) //
- .build();
-
- private static final YangInstanceIdentifier GRAND_CHILD_2_PATH =
- YangInstanceIdentifier
- .builder(CHILDREN_2_PATH)
- .node(GRAND_CHILDREN_QNAME)
- //
- .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
- SECOND_GRAND_CHILD_ID) //
- .build();
-
- private static final YangInstanceIdentifier GRAND_CHILD_2_NAME_PATH =
- YangInstanceIdentifier
- .builder(CHILDREN_2_PATH)
- .node(GRAND_CHILDREN_QNAME)
- //
- .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
- SECOND_GRAND_CHILD_NAME) //
- .build();
-
- private static final YangInstanceIdentifier DESC_PATH_ID =
- YangInstanceIdentifier.builder(DESC_PATH).build();
- private static final YangInstanceIdentifier OUTER_LIST_1_PATH =
- YangInstanceIdentifier.builder(OUTER_LIST_PATH)
- .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, ONE_ID) //
- .build();
-
- private static final YangInstanceIdentifier OUTER_LIST_2_PATH =
- YangInstanceIdentifier.builder(OUTER_LIST_PATH)
- .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
- .build();
-
- private static final YangInstanceIdentifier TWO_TWO_PATH =
- YangInstanceIdentifier.builder(OUTER_LIST_2_PATH).node(INNER_LIST_QNAME) //
- .nodeWithKey(INNER_LIST_QNAME, NAME_QNAME, TWO_TWO_NAME) //
- .build();
-
- private static final YangInstanceIdentifier TWO_TWO_VALUE_PATH =
- YangInstanceIdentifier.builder(TWO_TWO_PATH).node(VALUE_QNAME) //
- .build();
-
private static final MapEntryNode BAR_NODE = mapEntryBuilder(
OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
.withChild(mapNodeBuilder(INNER_LIST_QNAME) //
// Create augmentations
- MapEntryNode mapEntry = createAugmentedListEntry(1, "First Test");
+ MapEntryNode augMapEntry = createAugmentedListEntry(1, "First Test");
// Create a bits leaf
NormalizedNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, Object, LeafNode<Object>>
QName.create(TEST_QNAME, "my-bits"))).withValue(
ImmutableSet.of("foo", "bar"));
- // Create unkey list entry
- UnkeyedListEntryNode binaryDataKey =
- Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).
- withChild(ImmutableNodes.leafNode(SOME_BINARY_DATE_QNAME, DESC)).build();
+ // Create unkeyed list entry
+ UnkeyedListEntryNode unkeyedListEntry =
+ Builders.unkeyedListEntryBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(UNKEYED_LIST_ENTRY_QNAME)).
+ withChild(ImmutableNodes.leafNode(NAME_QNAME, "unkeyed-entry-name")).build();
+
+ // Create YangInstanceIdentifier with all path arg types.
+ YangInstanceIdentifier instanceID = YangInstanceIdentifier.create(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(TEST_QNAME, "qname")),
+ new YangInstanceIdentifier.NodeIdentifierWithPredicates(QName.create(TEST_QNAME, "list-entry"),
+ QName.create(TEST_QNAME, "key"), 10),
+ new YangInstanceIdentifier.AugmentationIdentifier(ImmutableSet.of(
+ QName.create(TEST_QNAME, "aug1"), QName.create(TEST_QNAME, "aug2"))),
+ new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME, "leaf-list-entry"), "foo"));
Map<QName, Object> keyValues = new HashMap<>();
keyValues.put(CHILDREN_QNAME, FIRST_CHILD_NAME);
// Create the document
return ImmutableContainerNodeBuilder
.create()
- .withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
.withChild(myBits.build())
.withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
- .withChild(ImmutableNodes.leafNode(POINTER_QNAME, ENABLED))
- .withChild(ImmutableNodes.leafNode(POINTER_QNAME, SHORT_ID))
- .withChild(ImmutableNodes.leafNode(POINTER_QNAME, BYTE_ID))
- .withChild(
- ImmutableNodes.leafNode(SOME_REF_QNAME, GRAND_CHILD_1_PATH))
+ .withChild(ImmutableNodes.leafNode(BOOLEAN_LEAF_QNAME, ENABLED))
+ .withChild(ImmutableNodes.leafNode(SHORT_LEAF_QNAME, SHORT_ID))
+ .withChild(ImmutableNodes.leafNode(BYTE_LEAF_QNAME, BYTE_ID))
+ .withChild(ImmutableNodes.leafNode(TestModel.BIGINTEGER_LEAF_QNAME, BigInteger.valueOf(100)))
+ .withChild(ImmutableNodes.leafNode(TestModel.BIGDECIMAL_LEAF_QNAME, BigDecimal.valueOf(1.2)))
+ .withChild(ImmutableNodes.leafNode(SOME_REF_QNAME, instanceID))
.withChild(ImmutableNodes.leafNode(MYIDENTITY_QNAME, DESC_QNAME))
- .withChild(Builders.unkeyedListBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(OUTER_LIST_QNAME))
- .withChild(binaryDataKey).build())
- .withChild(Builders.orderedMapBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME)).withChild(mapEntry).build())
- .withChild(Builders.choiceBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+ .withChild(Builders.unkeyedListBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(UNKEYED_LIST_QNAME))
+ .withChild(unkeyedListEntry).build())
+ .withChild(Builders.choiceBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CHOICE_QNAME))
.withChild(ImmutableNodes.leafNode(DESC_QNAME, LONG_ID)).build())
// .withChild(augmentationNode)
.withChild(shoes)
.withChild(numbers)
.withChild(switchFeatures)
- .withChild(
- mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(mapEntry).build())
- .withChild(
- mapNodeBuilder(OUTER_LIST_QNAME)
- .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
- .withChild(BAR_NODE).build()
+ .withChild(mapNodeBuilder(AUGMENTED_LIST_QNAME).withChild(augMapEntry).build())
+ .withChild(mapNodeBuilder(OUTER_LIST_QNAME)
+ .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+ .withChild(BAR_NODE).build()
);
}
.create()
.withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifierWithPredicates(
- AUGMENTED_LIST_QNAME, ID_QNAME, id))
+ AUGMENTED_LIST_ENTRY_QNAME, ID_QNAME, id))
.withChild(ImmutableNodes.leafNode(ID_QNAME, id))
.withChild(augmentationNode).build();
}
type uint8;
}
+ leaf-list binary_leaf_list {
+ type binary;
+ }
+
leaf pointer {
type leafref {
path "/network-topology/topology/node/termination-point/tp-id";
</configuration>
<required-capabilities>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&revision=2014-11-24</capability>
- <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-privider&revision=2014-06-12</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&revision=2014-06-12</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&revision=2014-06-17</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&revision=2014-06-17</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&revision=2013-10-28</capability>
"member-3 here would be the name of the member"
);
- currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
+ currentMemberName = cluster.getSelfRoles().iterator().next();
selfAddress = cluster.selfAddress();
-
}
public void subscribeToMemberEvents(ActorRef actorRef){
// It seems the sender is never null but it doesn't hurt to check. If the caller passes in
// a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(new DataChangedReply(), getSelf());
+ getSender().tell(DataChangedReply.INSTANCE, getSelf());
}
}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
* DataChangeListenerProxy represents a single remote DataChangeListener
*/
public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
private final ActorSelection dataChangeListenerActor;
- private final SchemaContext schemaContext;
- public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
- this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
- this.schemaContext = schemaContext;
+ public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+ this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
+ "dataChangeListenerActor should not be null");
}
- @Override public void onDataChanged(
+ @Override
+ public void onDataChanged(
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
+ dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Defines version numbers.
+ *
+ * @author Thomas Pantelis
+ */
+public interface DataStoreVersions {
+ // Monotonically increasing per-release version numbers. NOTE(review): these
+ // presumably identify datastore message/wire formats across releases (cf. the
+ // "base Helium version" annotations in the protobuf definitions) — confirm at
+ // the call sites that read and compare them.
+ short BASE_HELIUM_VERSION = 0;
+ short HELIUM_1_VERSION = 1;
+ short HELIUM_2_VERSION = 2;
+ short LITHIUM_VERSION = 3;
+ // Version of the running code: always aliases the newest constant above.
+ short CURRENT_VERSION = LITHIUM_VERSION;
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSystem;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.osgi.framework.BundleContext;
-import java.util.concurrent.atomic.AtomicReference;
-
public class DistributedDataStoreFactory {
+ private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
+ private static final String CONFIGURATION_NAME = "odl-cluster-data";
- public static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
-
- public static final String CONFIGURATION_NAME = "odl-cluster-data";
-
- private static AtomicReference<ActorSystem> persistentActorSystem = new AtomicReference<>();
+ private static volatile ActorSystem persistentActorSystem = null;
public static DistributedDataStore createInstance(String name, SchemaService schemaService,
DatastoreContext datastoreContext, BundleContext bundleContext) {
return dataStore;
}
- synchronized private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
-
- AtomicReference<ActorSystem> actorSystemReference = persistentActorSystem;
- String configurationName = CONFIGURATION_NAME;
- String actorSystemName = ACTOR_SYSTEM_NAME;
-
- if (actorSystemReference.get() != null){
- return actorSystemReference.get();
+ /**
+ * Returns the shared ActorSystem, lazily creating it on first use.
+ * Double-checked locking; correctness requires the persistentActorSystem field to be
+ * volatile. The redundant 'final' modifier is dropped - a private static method
+ * cannot be overridden.
+ */
+ private static ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
+ ActorSystem ret = persistentActorSystem;
+ if (ret == null) {
+ synchronized (DistributedDataStoreFactory.class) {
+ ret = persistentActorSystem;
+ if (ret == null) {
+ // Create an OSGi bundle classloader for actor system
+ BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+ Thread.currentThread().getContextClassLoader());
+
+ ret = ActorSystem.create(ACTOR_SYSTEM_NAME,
+ ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
+ ret.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+
+ persistentActorSystem = ret;
+ }
+ }
}
- // Create an OSGi bundle classloader for actor system
- BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
-
- ActorSystem system = ActorSystem.create(actorSystemName,
- ConfigFactory.load(configurationReader.read()).getConfig(configurationName), classLoader);
- system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
- actorSystemReference.set(system);
- return system;
+ return ret;
}
-
}
*/
public class Shard extends RaftActor {
- private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
- public static final String DEFAULT_NAME = "default";
+ @VisibleForTesting
+ static final String DEFAULT_NAME = "default";
// The state of this Shard
private final InMemoryDOMDataStore store;
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
commitWithNewTransaction(cohortEntry.getModification());
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
// currently uses a same thread executor anyway.
cohortEntry.getCohort().commit().get();
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
shardMBean.incrementAbortTransactionsCount();
if(sender != null) {
- sender.tell(new AbortTransactionReply().toSerializable(), self);
+ sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
}
}
// This must be for install snapshot. Don't want to open this up and trigger
// deSerialization
- self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+ self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
self());
createSnapshotTransaction = null;
}
private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+ ShardTransactionIdentifier transactionId, String transactionChainId,
+ short clientVersion ) {
DOMStoreTransactionFactory factory = store;
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId, int clientVersion) {
+ String transactionChainId, short clientVersion) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+ new DataChangeListenerProxy(dataChangeListenerPath);
LOG.debug("Registering for path {}", registerChangeListener.getPath());
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
"createSnapshot" + ++createSnapshotTransactionCounter, "",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
- private static final Object CAN_COMMIT_REPLY_TRUE =
- new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
- private static final Object CAN_COMMIT_REPLY_FALSE =
- new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
Boolean canCommit = cohortEntry.getCohort().canCommit().get();
cohortEntry.getCanCommitSender().tell(
- canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+ canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+ CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
if(!canCommit) {
// Remove the entry from the cache now since the Tx will be aborted.
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
private final DatastoreContext datastoreContext;
- private final Collection<String> knownModules = new HashSet<>(128);
+ private Collection<String> knownModules = Collections.emptySet();
private final DataPersistenceProvider dataPersistenceProvider;
if(dataPersistenceProvider.isRecoveryApplicable()) {
if (message instanceof SchemaContextModules) {
SchemaContextModules msg = (SchemaContextModules) message;
- knownModules.clear();
- knownModules.addAll(msg.getModules());
+ knownModules = ImmutableSet.copyOf(msg.getModules());
} else if (message instanceof RecoveryFailure) {
RecoveryFailure failure = (RecoveryFailure) message;
LOG.error(failure.cause(), "Recovery failed");
LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
- knownModules.clear();
- knownModules.addAll(newModules);
+ knownModules = ImmutableSet.copyOf(newModules);
dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
- private final int txnClientVersion;
+ private final short clientTxVersion;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID, int txnClientVersion) {
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
- this.txnClientVersion = txnClientVersion;
+ this.clientTxVersion = clientTxVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID, int txnClientVersion) {
+ String transactionID, short txnClientVersion) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
datastoreContext, shardStats, transactionID, txnClientVersion));
}
return schemaContext;
}
- protected int getTxnClientVersion() {
- return txnClientVersion;
+ protected short getClientTxVersion() {
+ return clientTxVersion;
}
@Override
getDOMStoreTransaction().close();
if(sendReply) {
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+ getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
}
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
+ protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+ final boolean returnSerialized) {
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final YangInstanceIdentifier path = message.getPath();
final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
transaction.read(path);
-
future.addListener(new Runnable() {
@Override
public void run() {
try {
Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
- sender.tell((returnSerialized ? readDataReply.toSerializable():
+ sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
readDataReply), self);
} catch (Exception e) {
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
- final int txnClientVersion;
+ final short txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID, int txnClientVersion) {
+ ShardStats shardStats, String transactionID, short txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
createTransaction(createTransaction);
} else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
chain.close();
- getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+ getSender().tell(CloseTransactionChainReply.INSTANCE.toSerializable(), getSelf());
}else{
unknownMessage(message);
}
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, ShardStats shardStats, String transactionID,
- int txnClientVersion) {
- super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+ short clientTxVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
deleteData(transaction, (DeleteData) message, !SERIALIZED_REPLY);
} else if (message instanceof ReadyTransaction) {
- readyTransaction(transaction, new ReadyTransaction(), !SERIALIZED_REPLY);
+ readyTransaction(transaction, !SERIALIZED_REPLY);
- } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- writeData(transaction, WriteData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+ } else if(WriteData.isSerializedType(message)) {
+ writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
- } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- mergeData(transaction, MergeData.fromSerializable(message, getSchemaContext()), SERIALIZED_REPLY);
+ } else if(MergeData.isSerializedType(message)) {
+ mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
} else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, new ReadyTransaction(), SERIALIZED_REPLY);
+ readyTransaction(transaction, SERIALIZED_REPLY);
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
try {
transaction.write(message.getPath(), message.getData());
- WriteDataReply writeDataReply = new WriteDataReply();
- getSender().tell(returnSerialized ? writeDataReply.toSerializable() : writeDataReply,
- getSelf());
+ WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
+ getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
+ writeDataReply, getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
try {
transaction.merge(message.getPath(), message.getData());
- MergeDataReply mergeDataReply = new MergeDataReply();
- getSender().tell(returnSerialized ? mergeDataReply.toSerializable() : mergeDataReply ,
- getSelf());
+ MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
+ getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
+ mergeDataReply, getSelf());
}catch(Exception e){
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
- boolean returnSerialized) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized) {
String transactionID = getTransactionID();
LOG.debug("readyTransaction : {}", transactionID);
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
cohort, modification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedSerializableMessage;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
if(remoteTransactionActorsMB.get()) {
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
- actorContext.sendOperationAsync(actor,
- new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
}
}
}
private final String transactionChainId;
private final SchemaContext schemaContext;
private boolean inReadyState;
+ private final Semaphore operationLimiter;
+ private final OperationCompleter operationCompleter;
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
this(actorContext, transactionType, "");
phantomReferenceCache.put(cleanup, cleanup);
}
+ // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+ this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+ this.operationCompleter = new OperationCompleter(operationLimiter);
+
LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
}
LOG.debug("Tx {} read {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
@Override
LOG.debug("Tx {} exists {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
@Override
"Transaction is sealed - further modifications are not allowed");
}
+ // Acquires a single back-pressure permit before an operation is enqueued.
+ private void throttleOperation() {
+ throttleOperation(1);
+ }
+
+ /**
+ * Best-effort back-pressure: blocks up to the operation timeout for the requested
+ * number of permits. On timeout or interrupt the operation proceeds anyway (only a
+ * warning is logged) so throttling can never dead-lock a transaction.
+ */
+ private void throttleOperation(int acquirePermits) {
+ try {
+ if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+ LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers further up the stack can observe it.
+ Thread.currentThread().interrupt();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
+ } else {
+ LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+ }
+ }
+ }
+
+
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
LOG.debug("Tx {} write {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} delete {}", identifier, path);
+ throttleOperation();
+
TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
@Override
checkModificationState();
+ throttleOperation(txFutureCallbackMap.size());
+
inReadyState = true;
LOG.debug("Tx {} Readying {} transactions for commit", identifier,
}
}
-
-
-
-
/**
* Performs a CreateTransaction try async.
*/
LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
failure.getMessage());
- localTransactionContext = new NoOpTransactionContext(failure, identifier);
+ localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, identifier);
+ localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
}
for(TransactionOperation oper: txOperationsOnComplete) {
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
}
}
private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
private final ActorContext actorContext;
- private final SchemaContext schemaContext;
private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
- private final int remoteTransactionVersion;
+ private final short remoteTransactionVersion;
+ private final OperationCompleter operationCompleter;
+
private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, int remoteTransactionVersion) {
+ boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
- this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
+ this.operationCompleter = operationCompleter;
+ }
+
+ private Future<Object> completeOperation(Future<Object> operationFuture){
+ operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+ return operationFuture;
}
+
private ActorSelection getActor() {
return actor;
}
private Future<Object> executeOperationAsync(SerializableMessage msg) {
- return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
+ }
+
+ private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
+ return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+ msg.toSerializable(remoteTransactionVersion)));
}
@Override
public void closeTransaction() {
LOG.debug("Tx {} closeTransaction called", identifier);
- actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
}
@Override
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = executeOperationAsync(new ReadyTransaction());
+ final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// At some point in the future when upgrades from Helium are not supported
// we could remove this code to resolvePath and just use the cohortPath as the
// resolved cohortPath
- if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ if(TransactionContextImpl.this.remoteTransactionVersion <
+ DataStoreVersions.HELIUM_1_VERSION) {
cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data, schemaContext)));
+ recordedOperationFutures.add(executeOperationAsync(new MergeData(path, data)));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
- recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data, schemaContext)));
+ recordedOperationFutures.add(executeOperationAsync(new WriteData(path, data)));
}
@Override
ReadDataReply reply = (ReadDataReply) readResponse;
returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
- } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+ } else if (ReadDataReply.isSerializedType(readResponse)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
} else {
private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
+ private final Semaphore operationLimiter;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
super(identifier);
this.failure = failure;
+ this.operationLimiter = operationLimiter;
}
@Override
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called", identifier);
+ operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ operationLimiter.release();
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.debug("Tx {} readData called path = {}", identifier, path);
+ operationLimiter.release();
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+ operationLimiter.release();
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
+
+ /**
+ * Akka OnComplete callback attached to each outstanding operation Future; releases
+ * one permit back to the shared limiter when the operation finishes. The throwable
+ * and result arguments are intentionally ignored - only completion matters here.
+ */
+ private static class OperationCompleter extends OnComplete<Object> {
+ private final Semaphore operationLimiter;
+ OperationCompleter(Semaphore operationLimiter){
+ this.operationLimiter = operationLimiter;
+ }
+
+ @Override
+ public void onComplete(Throwable throwable, Object o){
+ // Release on success AND failure so permits are never leaked.
+ this.operationLimiter.release();
+ }
+ }
}
public static final Class<ThreePhaseCommitCohortMessages.AbortTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
+ private static final Object SERIALIZED_INSTANCE =
+ ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+
+ public static final AbortTransactionReply INSTANCE = new AbortTransactionReply();
+
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+ return SERIALIZED_INSTANCE;
}
}
public static final Class<ThreePhaseCommitCohortMessages.CanCommitTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
- private final Boolean canCommit;
+ public static final CanCommitTransactionReply YES = new CanCommitTransactionReply(true);
+ public static final CanCommitTransactionReply NO = new CanCommitTransactionReply(false);
- public CanCommitTransactionReply(final Boolean canCommit) {
+ private final boolean canCommit;
+ private final Object serializedMessage;
+
+ private CanCommitTransactionReply(final boolean canCommit) {
this.canCommit = canCommit;
+ this.serializedMessage = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().
+ setCanCommit(canCommit).build();
}
- public Boolean getCanCommit() {
+ public boolean getCanCommit() {
return canCommit;
}
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+ return serializedMessage;
}
public static CanCommitTransactionReply fromSerializable(final Object message) {
- return new CanCommitTransactionReply(
- ((ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message).getCanCommit());
+ ThreePhaseCommitCohortMessages.CanCommitTransactionReply serialized =
+ (ThreePhaseCommitCohortMessages.CanCommitTransactionReply) message;
+ return serialized.getCanCommit() ? YES : NO;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CloseTransaction implements SerializableMessage{
- public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CloseTransaction.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.CloseTransaction.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.CloseTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CloseTransaction.class;
+
+ // The protobuf message is immutable, so one serialized form is built once and reused.
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionMessages.CloseTransaction.newBuilder().build();
+
+ // Message carries no state; a shared singleton avoids per-send allocation.
+ public static final CloseTransaction INSTANCE = new CloseTransaction();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
public class CloseTransactionChainReply implements SerializableMessage {
- public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
- ShardTransactionChainMessages.CloseTransactionChainReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionChainMessages.CloseTransactionChainReply> SERIALIZABLE_CLASS =
+ ShardTransactionChainMessages.CloseTransactionChainReply.class;
+ // Field-less protobuf message: build once and reuse rather than per-call.
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+
+ // Stateless reply, safe to share a single instance.
+ public static final CloseTransactionChainReply INSTANCE = new CloseTransactionChainReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CloseTransactionReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CloseTransactionReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.CloseTransactionReply> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CloseTransactionReply.class;
+
+ // Field-less protobuf message: build once and reuse rather than per-call.
+ private static final Object SERIALIZED_INSTANCE =
+ ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+
+ // Stateless reply, safe to share a single instance.
+ public static final CloseTransactionReply INSTANCE = new CloseTransactionReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
public static final Class<ThreePhaseCommitCohortMessages.CommitTransactionReply> SERIALIZABLE_CLASS =
ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
+ private static final Object SERIALIZED_INSTANCE =
+ ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+
+ public static final CommitTransactionReply INSTANCE = new CommitTransactionReply();
+
@Override
public Object toSerializable() {
- return ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+ return SERIALIZED_INSTANCE;
}
}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
ShardTransactionMessages.CreateTransaction.class;
- public static final int HELIUM_1_VERSION = 1;
- public static final int CURRENT_VERSION = HELIUM_1_VERSION;
-
private final String transactionId;
private final int transactionType;
private final String transactionChainId;
- private final int version;
+ private final short version;
public CreateTransaction(String transactionId, int transactionType) {
this(transactionId, transactionType, "");
}
public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
- this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+ this(transactionId, transactionType, transactionChainId, DataStoreVersions.CURRENT_VERSION);
}
private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
- int version) {
+ short version) {
this.transactionId = transactionId;
this.transactionType = transactionType;
this.transactionChainId = transactionChainId;
return transactionType;
}
- public int getVersion() {
+ public short getVersion() {
return version;
}
(ShardTransactionMessages.CreateTransaction) message;
return new CreateTransaction(createTransaction.getTransactionId(),
createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
- createTransaction.getMessageVersion());
+ (short)createTransaction.getMessageVersion());
}
public String getTransactionChainId() {
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class CreateTransactionReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.CreateTransactionReply.class;
+ // Keep the generic type parameter: a raw Class loses type information and
+ // triggers unchecked warnings at use sites for no benefit.
+ public static final Class<ShardTransactionMessages.CreateTransactionReply> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CreateTransactionReply.class;
private final String transactionPath;
private final String transactionId;
- private final int version;
+ private final short version;
- public CreateTransactionReply(final String transactionPath,
- final String transactionId) {
- this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+ public CreateTransactionReply(String transactionPath, String transactionId) {
+ this(transactionPath, transactionId, DataStoreVersions.CURRENT_VERSION);
}
public CreateTransactionReply(final String transactionPath,
- final String transactionId, final int version) {
+ final String transactionId, final short version) {
this.transactionPath = transactionPath;
this.transactionId = transactionId;
this.version = version;
return transactionId;
}
- public int getVersion() {
+ public short getVersion() {
return version;
}
.build();
}
- public static CreateTransactionReply fromSerializable(final Object serializable){
+ public static CreateTransactionReply fromSerializable(Object serializable){
ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
- return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
+ return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(),
+ (short)o.getMessageVersion());
}
}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
- public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
- DataChangeListenerMessages.DataChanged.class;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
- final private SchemaContext schemaContext;
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change;
+public class DataChanged implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+ public DataChanged() {
+ }
- public DataChanged(SchemaContext schemaContext,
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
this.change = change;
- this.schemaContext = schemaContext;
}
-
public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
return change;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ in.readShort(); // Read the version
- private NormalizedNodeMessages.Node convertToNodeTree(
- NormalizedNode<?, ?> normalizedNode) {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
- return new NormalizedNodeToNodeCodec(schemaContext)
- .encode(normalizedNode)
- .getNormalizedNode();
+ // Note: the scope passed to builder is not actually used.
+ Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
- }
+ // Read created data
- private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
- Set<YangInstanceIdentifier> removedPaths) {
- final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
- for (YangInstanceIdentifier id : removedPaths) {
- removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+ int size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addCreated(path, node);
}
- return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
- @Override
- public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
- return removedPathInstanceIds.iterator();
- }
- };
- }
+ // Read updated data
- private NormalizedNodeMessages.NodeMap convertToNodeMap(
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
- NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
- new NormalizedNodeToNodeCodec(schemaContext);
- NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
- NormalizedNodeMessages.NodeMap.newBuilder();
- NormalizedNodeMessages.NodeMapEntry.Builder builder =
- NormalizedNodeMessages.NodeMapEntry.newBuilder();
- for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
- .entrySet()) {
-
-
- NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
- InstanceIdentifierUtils.toSerializable(entry.getKey());
-
- builder.setInstanceIdentifierPath(instanceIdentifier)
- .setNormalizedNode(normalizedNodeToNodeCodec
- .encode(entry.getValue())
- .getNormalizedNode());
- nodeMapBuilder.addMapEntries(builder.build());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+ NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+ builder.addUpdated(path, before, after);
}
- return nodeMapBuilder.build();
- }
-
- @Override
- public Object toSerializable() {
- return DataChangeListenerMessages.DataChanged.newBuilder()
- .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
- .setCreatedData(convertToNodeMap(change.getCreatedData()))
- .setOriginalData(convertToNodeMap(change.getOriginalData()))
- .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
- .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
- .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
- .build();
- }
+ // Read removed data
- public static DataChanged fromSerialize(SchemaContext sc, Object message,
- YangInstanceIdentifier pathId) {
- DataChangeListenerMessages.DataChanged dataChanged =
- (DataChangeListenerMessages.DataChanged) message;
- DataChangedEvent event = new DataChangedEvent(sc);
- if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
- .isInitialized()) {
- event.setCreatedData(dataChanged.getCreatedData());
- }
- if (dataChanged.getOriginalData() != null && dataChanged
- .getOriginalData().isInitialized()) {
- event.setOriginalData(dataChanged.getOriginalData());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addRemoved(path, node);
}
- if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
- .isInitialized()) {
- event.setUpdateData(dataChanged.getUpdatedData());
- }
+ // Read original subtree
- if (dataChanged.getOriginalSubTree() != null && dataChanged
- .getOriginalSubTree().isInitialized()) {
- event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+ boolean present = in.readBoolean();
+ if(present) {
+ builder.setBefore(streamReader.readNormalizedNode());
}
- if (dataChanged.getUpdatedSubTree() != null && dataChanged
- .getUpdatedSubTree().isInitialized()) {
- event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
- }
+ // Read updated subtree
- if (dataChanged.getRemovedPathsList() != null && !dataChanged
- .getRemovedPathsList().isEmpty()) {
- event.setRemovedPaths(dataChanged.getRemovedPathsList());
+ present = in.readBoolean();
+ if(present) {
+ builder.setAfter(streamReader.readNormalizedNode());
}
- return new DataChanged(sc, event);
-
+ change = builder.build();
}
- static class DataChangedEvent implements
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
- private final NormalizedNodeToNodeCodec nodeCodec;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
- private NormalizedNode<?, ?> originalSubTree;
- private NormalizedNode<?, ?> updatedSubTree;
- private Set<YangInstanceIdentifier> removedPathIds;
-
- DataChangedEvent(SchemaContext schemaContext) {
- nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- if(createdData == null){
- return Collections.emptyMap();
- }
- return createdData;
- }
-
- DataChangedEvent setCreatedData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.createdData = convertNodeMapToMap(nodeMap);
- return this;
- }
-
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
- NormalizedNodeMessages.NodeMap nodeMap) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
- new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
- for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
- .getMapEntriesList()) {
- YangInstanceIdentifier id = InstanceIdentifierUtils
- .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
- mapEntries.put(id,
- nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
- }
- return mapEntries;
- }
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(DataStoreVersions.CURRENT_VERSION);
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- if(updatedData == null){
- return Collections.emptyMap();
- }
- return updatedData;
- }
+ // Write created data
- DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
- this.updatedData = convertNodeMapToMap(nodeMap);
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+ out.writeInt(createdData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ nodeWriter.write(e.getValue());
}
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- if (removedPathIds == null) {
- return Collections.emptySet();
- }
- return removedPathIds;
- }
+ // Write updated data
- public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
- Set<YangInstanceIdentifier> removedIds = new HashSet<>();
- for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
- removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
- }
- this.removedPathIds = removedIds;
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+ out.writeInt(updatedData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ nodeWriter.write(originalData.get(e.getKey()));
+ nodeWriter.write(e.getValue());
}
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- if (originalData == null) {
- Collections.emptyMap();
- }
- return originalData;
- }
+ // Write removed data
+ // NOTE(review): assumes originalData contains an entry for every removed
+ // path (per the AsyncDataChangeEvent contract) — confirm, otherwise the
+ // node write below would receive null.
- DataChangedEvent setOriginalData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.originalData = convertNodeMapToMap(nodeMap);
- return this;
+ Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+ out.writeInt(removed.size());
+ for(YangInstanceIdentifier path: removed) {
+ streamWriter.writeYangInstanceIdentifier(path);
+ nodeWriter.write(originalData.get(path));
}
- @Override
- public NormalizedNode<?, ?> getOriginalSubtree() {
- return originalSubTree;
- }
+ // Write original subtree
- DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- originalSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+ out.writeBoolean(originalSubtree != null);
+ if(originalSubtree != null) {
+ nodeWriter.write(originalSubtree);
}
- @Override
- public NormalizedNode<?, ?> getUpdatedSubtree() {
- return updatedSubTree;
- }
+ // Write updated subtree
- DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- updatedSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+ out.writeBoolean(updatedSubtree != null);
+ if(updatedSubtree != null) {
+ nodeWriter.write(updatedSubtree);
}
-
-
}
-
-
-
}
import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
public class DataChangedReply implements SerializableMessage {
- public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
- DataChangeListenerMessages.DataChangedReply.class;
- @Override
- public Object toSerializable() {
- return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
- }
+ public static final Class<DataChangeListenerMessages.DataChangedReply> SERIALIZABLE_CLASS =
+ DataChangeListenerMessages.DataChangedReply.class;
+
+ // Field-less protobuf message: build once and reuse rather than per-call.
+ private static final Object SERIALIZED_INSTANCE =
+ DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+
+ // Stateless reply, safe to share a single instance.
+ public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class DeleteDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.DeleteDataReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
- }
+ public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.DeleteDataReply.class;
+
+ // Field-less protobuf message: build once and reuse rather than per-call.
+ private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+
+ // Stateless reply, safe to share a single instance.
+ public static final DeleteDataReply INSTANCE = new DeleteDataReply();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Externalizable with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyExternalizable implements Externalizable {
+
+ // Intentionally a no-op: this message carries no payload to read.
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ }
+
+ // Intentionally a no-op: this message carries no payload to write.
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+
+/**
+ * A reply with no data.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class EmptyReply extends EmptyExternalizable implements VersionedSerializableMessage {
+
+ // Pre-Lithium (protobuf) form of this reply, sent to older remote nodes.
+ private final Object legacySerializedInstance;
+
+ protected EmptyReply(Object legacySerializedInstance) {
+ super();
+ this.legacySerializedInstance = legacySerializedInstance;
+ }
+
+ // Lithium and later nodes understand this Externalizable directly; older
+ // versions are handed the legacy protobuf instance instead.
+ @Override
+ public Object toSerializable(short toVersion) {
+ return toVersion >= DataStoreVersions.LITHIUM_VERSION ? this : legacySerializedInstance;
+ }
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class MergeData extends ModifyData{
+public class MergeData extends ModifyData implements VersionedSerializableMessage {
+ private static final long serialVersionUID = 1L;
- public static final Class<ShardTransactionMessages.MergeData> SERIALIZABLE_CLASS =
- ShardTransactionMessages.MergeData.class;
+ public static final Class<MergeData> SERIALIZABLE_CLASS = MergeData.class;
- public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
- SchemaContext context) {
- super(path, data, context);
+ public MergeData() {
+ }
+
+ public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
}
@Override
- public Object toSerializable() {
- Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
- return ShardTransactionMessages.MergeData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ setVersion(toVersion);
+ return this;
+ } else {
+ // To base or R1 Helium version
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+ return ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ }
+ }
+
+ public static MergeData fromSerializable(Object serializable){
+ if(serializable instanceof MergeData) {
+ return (MergeData) serializable;
+ } else {
+ // From base or R1 Helium version
+ ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
+ Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+ o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+ return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ }
}
- public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
- ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
- Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new MergeData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.MergeData;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class MergeDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.MergeDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.MergeDataReply.class;
+public class MergeDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.MergeDataReply.newBuilder().build();
- }
+ // Protobuf form used when replying to pre-Lithium nodes (see EmptyReply).
+ private static final Object LEGACY_SERIALIZED_INSTANCE =
+ ShardTransactionMessages.MergeDataReply.newBuilder().build();
+
+ // Stateless reply, safe to share a single instance.
+ public static final MergeDataReply INSTANCE = new MergeDataReply();
+
+ public MergeDataReply() {
+ super(LEGACY_SERIALIZED_INSTANCE);
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public abstract class ModifyData implements SerializableMessage {
- protected final YangInstanceIdentifier path;
- protected final NormalizedNode<?, ?> data;
- protected final SchemaContext schemaContext;
+public abstract class ModifyData implements Externalizable {
+ private static final long serialVersionUID = 1L;
- public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
- SchemaContext context) {
- Preconditions.checkNotNull(context,
- "Cannot serialize an object which does not have a schema schemaContext");
+ private YangInstanceIdentifier path;
+ private NormalizedNode<?, ?> data;
+ private short version;
+ protected ModifyData() {
+ }
+ protected ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
this.path = path;
this.data = data;
- this.schemaContext = context;
}
public YangInstanceIdentifier getPath() {
return data;
}
+ public short getVersion() {
+ return version;
+ }
+
+ protected void setVersion(short version) {
+ this.version = version;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ SerializationUtils.deserializePathAndNode(in, this, APPLIER);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ SerializationUtils.serializePathAndNode(path, data, out);
+ }
+
+ private static final Applier<ModifyData> APPLIER = new Applier<ModifyData>() {
+ @Override
+ public void apply(ModifyData instance, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ instance.path = path;
+ instance.data = data;
+ }
+ };
}
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.protobuf.ByteString;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class ReadDataReply implements SerializableMessage {
- public static final Class<ShardTransactionMessages.ReadDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.ReadDataReply.class;
+public class ReadDataReply implements VersionedSerializableMessage, Externalizable {
+ private static final long serialVersionUID = 1L;
- private final NormalizedNode<?, ?> normalizedNode;
- private final SchemaContext schemaContext;
+ public static final Class<ReadDataReply> SERIALIZABLE_CLASS = ReadDataReply.class;
- public ReadDataReply(SchemaContext context,NormalizedNode<?, ?> normalizedNode){
+ private NormalizedNode<?, ?> normalizedNode;
+ private short version;
+ public ReadDataReply() {
+ }
+
+ public ReadDataReply(NormalizedNode<?, ?> normalizedNode) {
this.normalizedNode = normalizedNode;
- this.schemaContext = context;
}
public NormalizedNode<?, ?> getNormalizedNode() {
}
@Override
- public Object toSerializable(){
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ version = in.readShort();
+ normalizedNode = SerializationUtils.deserializeNormalizedNode(in);
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(version);
+ SerializationUtils.serializeNormalizedNode(normalizedNode, out);
+ }
+
+ @Override
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ version = toVersion;
+ return this;
+ } else {
+ return toSerializableReadDataReply(normalizedNode);
+ }
+ }
+
+ private static ShardTransactionMessages.ReadDataReply toSerializableReadDataReply(
+ NormalizedNode<?, ?> normalizedNode) {
if(normalizedNode != null) {
return ShardTransactionMessages.ReadDataReply.newBuilder()
- .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
- .encode(normalizedNode).getNormalizedNode()).build();
+ .setNormalizedNode(new NormalizedNodeToNodeCodec(null)
+ .encode(normalizedNode).getNormalizedNode()).build();
} else {
return ShardTransactionMessages.ReadDataReply.newBuilder().build();
}
}
- public static ReadDataReply fromSerializable(SchemaContext schemaContext,
- YangInstanceIdentifier id, Object serializable) {
- ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
- return new ReadDataReply(schemaContext, new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getNormalizedNode()));
+ public static ReadDataReply fromSerializable(Object serializable) {
+ if(serializable instanceof ReadDataReply) {
+ return (ReadDataReply) serializable;
+ } else {
+ ShardTransactionMessages.ReadDataReply o =
+ (ShardTransactionMessages.ReadDataReply) serializable;
+ return new ReadDataReply(new NormalizedNodeToNodeCodec(null).decode(o.getNormalizedNode()));
+ }
+ }
+
+ public static ByteString fromSerializableAsByteString(Object serializable) {
+ if(serializable instanceof ReadDataReply) {
+ ReadDataReply r = (ReadDataReply)serializable;
+ return toSerializableReadDataReply(r.getNormalizedNode()).toByteString();
+ } else {
+ ShardTransactionMessages.ReadDataReply o =
+ (ShardTransactionMessages.ReadDataReply) serializable;
+ return o.getNormalizedNode().toByteString();
+ }
}
- public static ByteString getNormalizedNodeByteString(Object serializable){
- ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
- return ((ShardTransactionMessages.ReadDataReply) serializable).getNormalizedNode().toByteString();
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.ReadDataReply;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class ReadyTransaction implements SerializableMessage{
- public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
- ShardTransactionMessages.ReadyTransaction.class;
+ public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.ReadyTransaction.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
- }
+ // The protobuf message carries no fields, so one cached instance can be returned
+ // from every toSerializable() call instead of building a new message each time.
+ private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+ // Shared stateless singleton so senders can avoid per-message allocation.
+ public static final ReadyTransaction INSTANCE = new ReadyTransaction();
+
+ @Override
+ public Object toSerializable() {
+ return SERIALIZED_INSTANCE;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Interface for a Serializable message with versioning.
+ *
+ * @author Thomas Pantelis
+ */
+public interface VersionedSerializableMessage {
+ /**
+ * Converts this message to a form the recipient at the given version can
+ * understand (e.g. a legacy protobuf message for pre-Lithium versions).
+ *
+ * @param toVersion the data store version of the intended recipient
+ * @return the serializable form appropriate for {@code toVersion}
+ */
+ Object toSerializable(short toVersion);
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class WriteData extends ModifyData {
+public class WriteData extends ModifyData implements VersionedSerializableMessage {
+ private static final long serialVersionUID = 1L;
- public static final Class<ShardTransactionMessages.WriteData> SERIALIZABLE_CLASS =
- ShardTransactionMessages.WriteData.class;
+ public static final Class<WriteData> SERIALIZABLE_CLASS = WriteData.class;
- public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
- super(path, data, schemaContext);
+ // No-arg constructor - presumably required by the de-serialization mechanism;
+ // TODO confirm against ModifyData's serialization contract.
+ public WriteData() {
+ }
+
+ public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
 }
 @Override
- public Object toSerializable() {
- Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
- return ShardTransactionMessages.WriteData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ // Lithium and later recipients understand this class directly; older (Helium)
+ // recipients are sent the legacy protobuf form instead.
+ public Object toSerializable(short toVersion) {
+ if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+ setVersion(toVersion);
+ return this;
+ } else {
+ // To base or R1 Helium version
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+ return ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+ }
+ }
+
+ // Accepts either a WriteData instance or the legacy Helium protobuf message.
+ public static WriteData fromSerializable(Object serializable) {
+ if(serializable instanceof WriteData) {
+ return (WriteData) serializable;
+ } else {
+ // From base or R1 Helium version
+ ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
+ Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(
+ o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
+ return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode());
+ }
+ }
- public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
- ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
- Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(
- o.getInstanceIdentifierPathArguments(), o.getNormalizedNode());
- return new WriteData(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+ // True for both the current class and the legacy protobuf message type.
+ public static boolean isSerializedType(Object message) {
+ return SERIALIZABLE_CLASS.isAssignableFrom(message.getClass()) ||
+ message instanceof ShardTransactionMessages.WriteData;
}
}
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-public class WriteDataReply implements SerializableMessage{
- public static final Class<ShardTransactionMessages.WriteDataReply> SERIALIZABLE_CLASS =
- ShardTransactionMessages.WriteDataReply.class;
- @Override
- public Object toSerializable() {
- return ShardTransactionMessages.WriteDataReply.newBuilder().build();
- }
+public class WriteDataReply extends EmptyReply {
+ private static final long serialVersionUID = 1L;
+
+ // Cached protobuf form for legacy (Helium) recipients; the message has no
+ // fields, so a single shared instance suffices.
+ private static final Object LEGACY_SERIALIZED_INSTANCE =
+ ShardTransactionMessages.WriteDataReply.newBuilder().build();
+
+ // Shared stateless singleton - the reply carries no data.
+ public static final WriteDataReply INSTANCE = new WriteDataReply();
+
+ public WriteDataReply() {
+ super(LEGACY_SERIALIZED_INSTANCE);
+ }
}
package org.opendaylight.controller.cluster.datastore.utils;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import static akka.pattern.Patterns.ask;
-
/**
* The ActorContext class contains utility methods which could be used by
* non-actors (like DistributedDataStore) to work with actors a little more
private final FiniteDuration operationDuration;
private final Timeout operationTimeout;
private final String selfAddressHostPort;
+ private final int transactionOutstandingOperationLimit;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
} else {
selfAddressHostPort = null;
}
+
+ transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
}
public DatastoreContext getDatastoreContext() {
return builder.toString();
}
+
+ /**
+ * Gets the maximum number of operations permitted within a transaction before
+ * the transaction should begin throttling its operations.
+ *
+ * This configuration is read here (rather than elsewhere) because it comes from
+ * the actor system settings, which this class has access to.
+ *
+ * @return the outstanding operation limit for a transaction
+ */
+ public int getTransactionOutstandingOperationLimit(){
+ return transactionOutstandingOperationLimit;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Provides various utility methods for serialization and de-serialization.
+ *
+ * @author Thomas Pantelis
+ */
+public final class SerializationUtils {
+
+ private SerializationUtils() {
+ // Utility class - not meant to be instantiated.
+ }
+
+ /**
+ * Callback invoked with the path and node read by deserializePathAndNode.
+ */
+ public static interface Applier<T> {
+ void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
+ }
+
+ /**
+ * Writes the given node followed by the given path to the output. The
+ * counterpart deserializePathAndNode reads them back in the same order.
+ *
+ * @throws IllegalArgumentException if an I/O error occurs while writing
+ */
+ public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
+ DataOutput out) {
+ Preconditions.checkNotNull(path);
+ Preconditions.checkNotNull(node);
+ try {
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+ streamWriter.writeYangInstanceIdentifier(path);
+ } catch (IOException e) {
+ // %s, not SLF4J-style {} - String.format would otherwise leave the
+ // placeholders literal and silently drop the path/node arguments.
+ throw new IllegalArgumentException(String.format("Error serializing path %s and Node %s",
+ path, node), e);
+ }
+ }
+
+ /**
+ * Reads a node and path previously written by serializePathAndNode and passes
+ * them, along with the given instance, to the given Applier.
+ *
+ * @throws IllegalArgumentException if an I/O error occurs while reading
+ */
+ public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
+ try {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ applier.apply(instance, path, node);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing path and Node", e);
+ }
+ }
+
+ /**
+ * Writes a possibly-null node to the output, preceded by a presence flag.
+ *
+ * @throws IllegalArgumentException if an I/O error occurs while writing
+ */
+ public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+ try {
+ out.writeBoolean(node != null);
+ if(node != null) {
+ NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+ NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+ }
+ } catch (IOException e) {
+ // %s, not {} - see serializePathAndNode.
+ throw new IllegalArgumentException(String.format("Error serializing NormalizedNode %s",
+ node), e);
+ }
+ }
+
+ /**
+ * Reads a node written by serializeNormalizedNode; returns null if the node
+ * was absent (presence flag false).
+ *
+ * @throws IllegalArgumentException if an I/O error occurs while reading
+ */
+ public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
+ try {
+ boolean present = in.readBoolean();
+ if(present) {
+ NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+ return streamReader.readNormalizedNode();
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+ }
+
+ return null;
+ }
+}
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
public class DataChangeListenerProxyTest extends AbstractActorTest {
private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
final ActorRef actorRef = getSystem().actorOf(props);
DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
- TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
+ getSystem().actorSelection(actorRef.path()));
dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
// Let the DataChangeListener know that notifications should be enabled
subject.tell(new EnableNotification(true), getRef());
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ subject.tell(new DataChanged(mockChangeEvent),
getRef());
expectMsgClass(DataChangedReply.class);
final ActorRef subject =
getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ subject.tell(new DataChanged(mockChangeEvent),
getRef());
new Within(duration("1 seconds")) {
getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
- subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
- ActorRef.noSender());
+ subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
// Make sure no DataChangedReply is sent to DeadLetters.
while(true) {
SchemaContext schemaContext = CompositeModel.createTestContext();
- subject.tell(new DataChanged(schemaContext, mockChangeEvent1),getRef());
+ subject.tell(new DataChanged(mockChangeEvent1),getRef());
expectMsgClass(DataChangedReply.class);
- subject.tell(new DataChanged(schemaContext, mockChangeEvent2),getRef());
+ subject.tell(new DataChanged(mockChangeEvent2),getRef());
expectMsgClass(DataChangedReply.class);
- subject.tell(new DataChanged(schemaContext, mockChangeEvent3),getRef());
+ subject.tell(new DataChanged(mockChangeEvent3),getRef());
expectMsgClass(DataChangedReply.class);
Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
import java.util.Collections;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
/**
* Tests backwards compatibility support from Helium-1 to Helium.
*/
public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+ @SuppressWarnings("unchecked")
@Test
public void testTransactionCommit() throws Exception {
new ShardTestKit(getSystem()) {{
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.BASE_HELIUM_VERSION), getRef());
- expectMsgClass(duration, WriteDataReply.class);
+ expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
// Ready the Tx
// Write data to the Tx
txActor.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
expectMsgClass(duration, WriteDataReply.class);
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
- ShardTransactionMessages.ReadDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+ Object replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
- assertNotNull(ReadDataReply.fromSerializable(
- testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
- .getNormalizedNode());
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// unserialized read
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
// serialized read
transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
- ShardTransactionMessages.ReadDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
+ Object replySerialized =
+ expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
- assertTrue(ReadDataReply.fromSerializable(
- testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
+ assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
// unserialized read
transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
}};
}
+ @Test
+ public void testOnReceiveReadDataHeliumR1() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+
+ ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
+
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ getRef());
+
+ ShardTransactionMessages.ReadDataReply replySerialized =
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
+
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+ }};
+ }
+
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
- final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
+ DataStoreVersions.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
- getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.HELIUM_2_VERSION), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
assertModification(transaction, WriteModification.class);
- //unserialized write
+ // unserialized write
transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- TestModel.createTestContext()),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
getRef());
expectMsgClass(duration("5 seconds"), WriteDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveHeliumR1WriteData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
+
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+ transaction.tell(serialized, getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+
+ assertModification(transaction, WriteModification.class);
+ }};
+ }
+
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
- getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
+ DataStoreVersions.HELIUM_2_VERSION), getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
//unserialized merge
transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
getRef());
expectMsgClass(duration("5 seconds"), MergeDataReply.class);
}};
}
+ @Test
+ public void testOnReceiveHeliumR1MergeData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shard = createShard();
+ final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ DataStoreVersions.HELIUM_1_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
+
+ Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
+ .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
+
+ transaction.tell(serialized, getRef());
+
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+
+ assertModification(transaction, MergeModification.class);
+ }};
+ }
+
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
watch(transaction);
}
+ @SuppressWarnings("unchecked")
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
watch(transaction);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
@SuppressWarnings("serial")
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(false));
+ CanCommitTransactionReply.NO);
future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(3);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
- new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
ListenableFuture<Boolean> future = proxy.canCommit();
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
- new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+ CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply(), new PreCommitTransactionReply());
package org.opendaylight.controller.cluster.datastore;
import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-public class TransactionChainProxyTest {
- ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+ ActorContext actorContext = null;
SchemaContext schemaContext = mock(SchemaContext.class);
@Before
public void setUp() {
- doReturn(schemaContext).when(actorContext).getSchemaContext();
+ actorContext = new MockActorContext(getSystem());
+ actorContext.setSchemaContext(schemaContext);
}
@SuppressWarnings("resource")
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.google.common.util.concurrent.CheckedFuture;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
public class TransactionProxyTest {
schemaContext = TestModel.createTestContext();
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+ doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
ShardStrategyFactory.setConfiguration(configuration);
}
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
@Override
public boolean matches(Object argument) {
- CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName) &&
- obj.getTransactionType() == type.ordinal();
+ if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ CreateTransaction obj = CreateTransaction.fromSerializable(argument);
+ return obj.getTransactionId().startsWith(memberName) &&
+ obj.getTransactionType() == type.ordinal();
+ }
+
+ return false;
}
};
}
private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedWriteData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private WriteData eqSerializedWriteData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
- if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.WriteData.class.equals(argument.getClass()))) {
+
+ WriteData obj = WriteData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- WriteData obj = WriteData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
}
private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite) {
+ return eqSerializedMergeData(nodeToWrite, DataStoreVersions.CURRENT_VERSION);
+ }
+
+ private MergeData eqSerializedMergeData(final NormalizedNode<?, ?> nodeToWrite,
+ final int transactionVersion) {
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
- if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
- return false;
+ if((transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) ||
+ (transactionVersion < DataStoreVersions.LITHIUM_VERSION &&
+ ShardTransactionMessages.MergeData.class.equals(argument.getClass()))) {
+
+ MergeData obj = MergeData.fromSerializable(argument);
+ return obj.getPath().equals(TestModel.TEST_PATH) &&
+ obj.getData().equals(nodeToWrite);
}
- MergeData obj = MergeData.fromSerializable(argument, schemaContext);
- return obj.getPath().equals(TestModel.TEST_PATH) &&
- obj.getData().equals(nodeToWrite);
+ return false;
}
};
return Futures.successful((Object)new ReadyTransactionReply(path));
}
+ // Builds a successful Future holding a ReadDataReply serialized at the given
+ // protocol version — used by the cross-version (Helium) compatibility tests.
+ private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
+ short transactionVersion) {
+ return Futures.successful(new ReadDataReply(data).toSerializable(transactionVersion));
+ }
private Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
+ return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
}
private Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data));
+ return Futures.successful(new ReadDataReply(data));
}
private Future<Object> dataExistsSerializedReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists));
}
+ // Successful WriteDataReply serialized at the given protocol version.
+ private Future<Object> writeSerializedDataReply(short version) {
+ return Futures.successful(new WriteDataReply().toSerializable(version));
+ }
+
private Future<Object> writeSerializedDataReply() {
- return Futures.successful(new WriteDataReply().toSerializable());
+ return writeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
}
private Future<WriteDataReply> writeDataReply() {
return Futures.successful(new WriteDataReply());
}
+ // Successful MergeDataReply serialized at the given protocol version.
+ private Future<Object> mergeSerializedDataReply(short version) {
+ return Futures.successful(new MergeDataReply().toSerializable(version));
+ }
+
private Future<Object> mergeSerializedDataReply() {
- return Futures.successful(new MergeDataReply().toSerializable());
+ return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
+ }
+
+ // A mock Future that never completes — simulates a permanently in-flight
+ // backend operation so the outstanding-operation throttling can be exercised.
+ private Future<Object> incompleteFuture(){
+ return mock(Future.class);
+ }
private Future<MergeDataReply> mergeDataReply() {
.build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+ TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+ doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
return actorRef;
}
private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
}
eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
}
@Test(expected=IllegalStateException.class)
eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ WriteDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
- @Test
- public void testReadyForwardCompatibility() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+ private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(),
+ READ_WRITE, version);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqSerializedReadData());
- doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+ doReturn(writeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(testNode, version));
+
+ doReturn(mergeSerializedDataReply(version)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedMergeData(testNode, version));
doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
- transactionProxy.read(TestModel.TEST_PATH);
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
+ get(5, TimeUnit.SECONDS);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+ assertEquals("Response NormalizedNode", testNode, readOptional.get());
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ transactionProxy.merge(TestModel.TEST_PATH, testNode);
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- WriteDataReply.SERIALIZABLE_CLASS);
+ ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class);
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ return actorRef;
+ }
+
+ // Base Helium protocol version: the remote tx actor path must be resolved
+ // (legacy behavior), so resolvePath is expected to be invoked.
+ @Test
+ public void testCompatibilityWithBaseHeliumVersion() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION);
+
verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+ // Helium R1 protocol version: path resolution must NOT occur.
+ @Test
+ public void testCompatibilityWithHeliumR1Version() throws Exception {
+ ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION);
+
+ verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
verifyCohortFutures(proxy, TestException.class);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+ MergeDataReply.class, TestException.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- MergeDataReply.SERIALIZABLE_CLASS);
+ MergeDataReply.class);
verifyCohortFutures(proxy, TestException.class);
}
verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
}
+
+ /**
+ * A single operation (or sequence of calls) to execute against the
+ * TransactionProxy under test; passed to throttleOperation/completeOperation.
+ * Note: nested interfaces are implicitly static, so the modifier is omitted.
+ */
+ private interface TransactionProxyOperation {
+ void run(TransactionProxy transactionProxy);
+ }
+
+ // Convenience overload: outstanding-operation limit of 1, shard lookup succeeds.
+ private void throttleOperation(TransactionProxyOperation operation) {
+ throttleOperation(operation, 1, true);
+ }
+
+ /**
+ * Runs {@code operation} against a TransactionProxy whose backend futures never
+ * complete, then asserts the operation blocked for longer than the configured
+ * operation timeout — i.e. it was throttled by the outstanding-operation limit.
+ *
+ * NOTE(review): this asserts on wall-clock elapsed time (System.currentTimeMillis),
+ * which can be flaky on loaded CI machines — confirm the 2s timeout margin is safe.
+ *
+ * @param operation the transaction calls to execute
+ * @param outstandingOpsLimit max outstanding operations before throttling kicks in
+ * @param shardFound whether the primary shard lookup should succeed
+ */
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ if(shardFound) {
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } else {
+ doReturn(Futures.failed(new Exception("not found")))
+ .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ // Time the operation; throttling should make it wait out the operation timeout.
+ long start = System.currentTimeMillis();
+
+ operation.run(transactionProxy);
+
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue(String.format("took less time than expected %s was %s",
+ mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+ (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+ }
+
+ // Convenience overload: shard lookup succeeds.
+ private void completeOperation(TransactionProxyOperation operation) {
+ completeOperation(operation, true);
+ }
+
+ /**
+ * Runs {@code operation} and asserts it completed within the configured
+ * operation timeout — i.e. it was NOT throttled.
+ *
+ * NOTE(review): this duplicates nearly all of the mock setup in
+ * throttleOperation(); consider extracting the shared setup into a helper.
+ */
+ private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ if(shardFound) {
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } else {
+ doReturn(Futures.failed(new Exception("not found")))
+ .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ // Time the operation; absence of throttling means it finishes within the timeout.
+ long start = System.currentTimeMillis();
+
+ operation.run(transactionProxy);
+
+ long end = System.currentTimeMillis();
+
+ Assert.assertTrue(String.format("took more time than expected %s was %s",
+ mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+ (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+ }
+
+ // NOTE(review): this method has no @Test annotation and does not appear to be
+ // invoked anywhere; it duplicates testWriteThrottlingWhenShardFound. Looks like
+ // a leftover from a refactoring — confirm and remove if truly unused.
+ public void testWriteThrottling(boolean shardFound){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ }, 1, shardFound);
+ }
+
+ // Second write must be throttled: limit is 1 and the first write never completes.
+ @Test
+ public void testWriteThrottlingWhenShardFound(){
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ });
+
+ }
+
+ // When the primary shard cannot be found, writes must NOT be throttled.
+ @Test
+ public void testWriteThrottlingWhenShardNotFound(){
+ // Confirm that there is no throttling when the Shard is not found
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ }, false);
+
+ }
+
+
+ // Backend replies immediately, so back-to-back writes finish within the timeout.
+ @Test
+ public void testWriteCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ }
+ });
+
+ }
+
+ // Second merge must be throttled while the first remains outstanding.
+ @Test
+ public void testMergeThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ });
+ }
+
+ // No throttling expected when the primary shard lookup fails.
+ @Test
+ public void testMergeThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ }, false);
+ }
+
+ // Backend replies immediately, so back-to-back merges finish within the timeout.
+ @Test
+ public void testMergeCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+ }
+ });
+
+ }
+
+ // Second delete must be throttled while the first remains outstanding.
+ @Test
+ public void testDeleteThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+
+ // No throttling expected when the primary shard lookup fails.
+ @Test
+ public void testDeleteThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+ // Backend replies immediately, so back-to-back deletes finish within the timeout.
+ @Test
+ public void testDeleteCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ // Second read must be throttled while the first remains outstanding.
+ @Test
+ public void testReadThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+ // No throttling expected when the primary shard lookup fails.
+ @Test
+ public void testReadThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+
+ // Backend replies immediately, so back-to-back reads finish within the timeout.
+ @Test
+ public void testReadCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ // Second exists check must be throttled while the first remains outstanding.
+ @Test
+ public void testExistsThrottlingWhenShardFound(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ });
+ }
+
+ // No throttling expected when the primary shard lookup fails.
+ @Test
+ public void testExistsThrottlingWhenShardNotFound(){
+
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ }, false);
+ }
+
+
+ // Backend replies immediately, so back-to-back exists checks finish in time.
+ @Test
+ public void testExistsCompletion(){
+ completeOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ });
+
+ }
+
+ // ready() must wait for the outstanding write before sending ReadyTransaction,
+ // so with a never-completing write it gets throttled.
+ @Test
+ public void testReadyThrottling(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any(ReadyTransaction.class));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.ready();
+ }
+ });
+ }
+
+ // ready() with two pending operation streams: the first write completes, the
+ // second never does, so ready() is throttled (limit raised to 2).
+ // NOTE(review): carsNode is written to TestModel.TEST_PATH rather than a
+ // Cars-model path, so both writes may resolve to the same shard/transaction
+ // context — confirm this actually exercises two transaction contexts.
+ @Test
+ public void testReadyThrottlingWithTwoTransactionContexts(){
+
+ throttleOperation(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(carsNode));
+
+ doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), any(ReadyTransaction.class));
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+ transactionProxy.ready();
+ }
+ }, 2, true);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for DataChanged.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangedTest {
+
+    /**
+     * Verifies that DataChanged survives a Java-serialization round trip with
+     * every change-event component (created/updated/removed data and the
+     * before/after subtrees) intact.
+     */
+    @Test
+    public void testSerialization() {
+        // Build a change event exercising all builder facets. The stray lone "."
+        // continuation line from the original (a formatter artifact) is folded
+        // back into the normal method chain.
+        DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE).
+            addCreated(TestModel.TEST_PATH, ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build()).
+            addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).
+            addRemoved(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()).
+            setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME)).
+            setAfter(ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).
+                withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build()).build();
+
+        DataChanged expected = new DataChanged(change);
+
+        // Round-trip through Java serialization.
+        DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
+
+        assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
+        assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
+        assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
+        assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
+        assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
+        assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
+    }
+}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
public class MergeDataTest {
@Test
public void testSerialization() {
- SchemaContext schemaContext = TestModel.createTestContext();
- MergeData expected = new MergeData(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME), schemaContext);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- MergeData actual = MergeData.fromSerializable(expected.toSerializable(), schemaContext);
- Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
- Assert.assertEquals("getData", expected.getData(), actual.getData());
+ MergeData expected = new MergeData(path, data);
+
+ // With the current version, the message serializes as the MergeData
+ // class itself and must carry the version through a clone round trip.
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", MergeData.class, serialized.getClass());
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)serialized).getVersion());
+
+ Object clone = SerializationUtils.clone((Serializable) serialized);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((MergeData)clone).getVersion());
+ MergeData actual = MergeData.fromSerializable(clone);
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ // Both the legacy protobuff message and the MergeData class itself must
+ // be recognized as serialized forms; arbitrary objects must not.
+ assertEquals("isSerializedType", true, MergeData.isSerializedType(
+ ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+ .setNormalizedNode(Node.getDefaultInstance()).build()));
+ assertEquals("isSerializedType", true,
+ MergeData.isSerializedType(new MergeData()));
+ assertEquals("isSerializedType", false, MergeData.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a MergeData message with the
+ * base and R1 Helium versions, which used the protobuff MergeData message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ MergeData expected = new MergeData(path, data);
+
+ // Helium R1 and earlier serialized via the protobuff MergeData message.
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.MergeData.class, serialized.getClass());
+
+ MergeData actual = MergeData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ReadDataReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadDataReplyTest {
+
+ @Test
+ public void testSerialization() {
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ ReadDataReply expected = new ReadDataReply(data);
+
+ // With the current version, the message serializes as the ReadDataReply
+ // class itself; the normalized node must survive a clone round trip.
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", ReadDataReply.class, serialized.getClass());
+
+ ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ // Both the legacy protobuff message and the ReadDataReply class itself
+ // must be recognized as serialized forms; arbitrary objects must not.
+ assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(
+ ShardTransactionMessages.ReadDataReply.newBuilder().build()));
+ assertEquals("isSerializedType", true, ReadDataReply.isSerializedType(new ReadDataReply()));
+ assertEquals("isSerializedType", false, ReadDataReply.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a ReadDataReply message with the
+ * base and R1 Helium versions, which used the protobuff ReadDataReply message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ ReadDataReply expected = new ReadDataReply(data);
+
+ // Helium R1 and earlier serialized via the protobuff ReadDataReply message.
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.ReadDataReply.class, serialized.getClass());
+
+ ReadDataReply actual = ReadDataReply.fromSerializable(SerializationUtils.clone(
+ (Serializable) serialized));
+ assertEquals("getNormalizedNode", expected.getNormalizedNode(), actual.getNormalizedNode());
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore.messages;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
/**
* Unit tests for WriteData.
@Test
public void testSerialization() {
- SchemaContext schemaContext = TestModel.createTestContext();
- WriteData expected = new WriteData(TestModel.TEST_PATH, ImmutableNodes
- .containerNode(TestModel.TEST_QNAME), schemaContext);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
- WriteData actual = WriteData.fromSerializable(expected.toSerializable(), schemaContext);
- Assert.assertEquals("getPath", expected.getPath(), actual.getPath());
- Assert.assertEquals("getData", expected.getData(), actual.getData());
+ WriteData expected = new WriteData(path, data);
+
+ // With the current version, the message serializes as the WriteData
+ // class itself and must carry the version through a clone round trip.
+ Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+ assertEquals("Serialized type", WriteData.class, serialized.getClass());
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)serialized).getVersion());
+
+ Object clone = SerializationUtils.clone((Serializable) serialized);
+ assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((WriteData)clone).getVersion());
+ WriteData actual = WriteData.fromSerializable(clone);
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
+ }
+
+ @Test
+ public void testIsSerializedType() {
+ // Both the legacy protobuff message and the WriteData class itself must
+ // be recognized as serialized forms; arbitrary objects must not.
+ assertEquals("isSerializedType", true, WriteData.isSerializedType(
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance())
+ .setNormalizedNode(Node.getDefaultInstance()).build()));
+ assertEquals("isSerializedType", true, WriteData.isSerializedType(new WriteData()));
+ assertEquals("isSerializedType", false, WriteData.isSerializedType(new Object()));
+ }
+
+ /**
+ * Tests backwards compatible serialization/deserialization of a WriteData message with the
+ * base and R1 Helium versions, which used the protobuff WriteData message.
+ */
+ @Test
+ public void testSerializationWithHeliumR1Version() throws Exception {
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+ WriteData expected = new WriteData(path, data);
+
+ // Helium R1 and earlier serialized via the protobuff WriteData message.
+ Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+ assertEquals("Serialized type", ShardTransactionMessages.WriteData.class, serialized.getClass());
+
+ WriteData actual = WriteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+ assertEquals("getPath", expected.getPath(), actual.getPath());
+ assertEquals("getData", expected.getData(), actual.getData());
}
}
package org.opendaylight.controller.cluster.datastore.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
public class ActorContextTest extends AbstractActorTest{
private static class MockShardManager extends UntypedActor {
@Test
public void testResolvePathForRemoteActor() {
ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(
ClusterWrapper.class),
mock(Configuration.class));
*/
package org.opendaylight.controller.md.cluster.datastore.model;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Set;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Set;
-
public class TestModel {
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+ public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";