uses meter:meter;
}
}
-
-
- grouping flow-node {
+ grouping ip-address-grouping {
+ leaf ip-address {
+ description "IP address of a flow capable node.";
+ type inet:ip-address;
+ }
+ }
+
+ grouping flow-node {
leaf manufacturer {
type string;
}
uses tables;
uses group:groups;
uses meters;
+ uses ip-address-grouping;
// TODO: ports
container supported-match-types {
}
}
-
+
+ rpc get-node-ip-address {
+ input {
+ uses "inv:node-context-ref";
+ }
+ output {
+ uses ip-address-grouping;
+ }
+ }
+
grouping flow-node-connector {
uses port:flow-capable-port;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import java.util.ArrayList;
+
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
final Map<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized) {
Map<InstanceIdentifier<?>, DataObject> newMap = new HashMap<>();
- for (Map.Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : sortedEntries(normalized)) {
+ for (Map.Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : normalized.entrySet()) {
try {
Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> potential = getCodec().toBinding(entry);
if (potential.isPresent()) {
return newMap;
}
- private static final Comparator<Entry<YangInstanceIdentifier, ?>> MAP_ENTRY_COMPARATOR = new Comparator<Entry<YangInstanceIdentifier, ?>>() {
- @Override
- public int compare(final Entry<YangInstanceIdentifier, ?> left, final Entry<YangInstanceIdentifier, ?> right) {
- final Iterator<?> li = left.getKey().getPathArguments().iterator();
- final Iterator<?> ri = right.getKey().getPathArguments().iterator();
-
- // Iterate until left is exhausted...
- while (li.hasNext()) {
- if (!ri.hasNext()) {
- // Left is deeper
- return 1;
- }
-
- li.next();
- ri.next();
- }
-
- // Check if right is exhausted
- return ri.hasNext() ? -1 : 0;
- }
- };
-
- private static <T> Iterable<Entry<YangInstanceIdentifier, T>> sortedEntries(final Map<YangInstanceIdentifier, T> map) {
- if (!map.isEmpty()) {
- ArrayList<Entry<YangInstanceIdentifier, T>> entries = new ArrayList<>(map.entrySet());
- Collections.sort(entries, MAP_ENTRY_COMPARATOR);
- return entries;
- } else {
- return Collections.emptySet();
- }
- }
-
protected Set<InstanceIdentifier<?>> toBinding(final InstanceIdentifier<?> path,
final Set<YangInstanceIdentifier> normalized) {
Set<InstanceIdentifier<?>> hashSet = new HashSet<>();
import java.util.Iterator;
import java.util.Map.Entry;
+import javax.annotation.Nonnull;
+
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
}
- public org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier toNormalized(
- final InstanceIdentifier<? extends DataObject> binding) {
+ public YangInstanceIdentifier toNormalized(final InstanceIdentifier<? extends DataObject> binding) {
return codecRegistry.toYangInstanceIdentifier(binding);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+ public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
return codecRegistry.toNormalizedNode((InstanceIdentifier) bindingPath, bindingObject);
}
- public Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
- final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> binding) {
+ public Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
+ final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding) {
return toNormalizedNode(binding.getKey(),binding.getValue());
}
* augmentation.
*
*/
- public Optional<InstanceIdentifier<? extends DataObject>> toBinding(
- final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier normalized)
+ public Optional<InstanceIdentifier<? extends DataObject>> toBinding(final YangInstanceIdentifier normalized)
throws DeserializationException {
try {
- return Optional.<InstanceIdentifier<? extends DataObject>>of(codecRegistry.fromYangInstanceIdentifier(normalized));
+ return Optional.<InstanceIdentifier<? extends DataObject>>fromNullable(codecRegistry.fromYangInstanceIdentifier(normalized));
} catch (IllegalArgumentException e) {
return Optional.absent();
}
return legacyToNormalized;
}
- @SuppressWarnings("unchecked")
- public Optional<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
- final Entry<org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
+ public Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> toBinding(
+ final @Nonnull Entry<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized)
throws DeserializationException {
try {
- @SuppressWarnings("rawtypes")
- Entry binding = codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
- return Optional.<Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>>fromNullable(binding);
+ /*
+ * This cast is required, due to generics behaviour in openjdk / oracle javac
+ *
+ * InstanceIdentifier has definition InstanceIdentifier<T extends DataObject>,
+ * this means '?' is always  <? extends DataObject>. Eclipse compiler
+ * is able to determine this relationship and treats
+ * Entry<InstanceIdentifier<?>,DataObject> and Entry<InstanceIdentifier<? extends DataObject,DataObject>
+ * as assignable. However openjdk / oracle javac treats this two types
+ * as incompatible and issues a compile error.
+ *
+ * It is safe to loose generic information and cast it to other generic signature.
+ *
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final Entry<InstanceIdentifier<? extends DataObject>, DataObject> binding = (Entry) codecRegistry.fromNormalizedNode(normalized.getKey(), normalized.getValue());
+ return Optional.fromNullable(binding);
} catch (IllegalArgumentException e) {
return Optional.absent();
}
* @param path DOM Path
* @return Node with defaults set on.
*/
- public NormalizedNode<?, ?> getDefaultNodeFor(final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path) {
+ public NormalizedNode<?, ?> getDefaultNodeFor(final YangInstanceIdentifier path) {
Iterator<PathArgument> iterator = path.getPathArguments().iterator();
DataNormalizationOperation<?> currentOp = legacyToNormalized.getRootOperation();
while (iterator.hasNext()) {
}
@Override
- public void close() throws Exception {
+ public void close() {
// NOOP Intentionally
}
}
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
import java.util.Collections;
import java.util.List;
*/
public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
- private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
private final ActorContext actorContext;
- private final List<ActorPath> cohortPaths;
+ private final List<Future<ActorPath>> cohortPathFutures;
+ private volatile List<ActorPath> cohortPaths;
private final String transactionId;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
- String transactionId) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+ List<Future<ActorPath>> cohortPathFutures, String transactionId) {
this.actorContext = actorContext;
- this.cohortPaths = cohortPaths;
+ this.cohortPathFutures = cohortPathFutures;
this.transactionId = transactionId;
}
+ private Future<Void> buildCohortPathsList() {
+
+ Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+ actorContext.getActorSystem().dispatcher());
+
+ return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+ @Override
+ public Void apply(Iterable<ActorPath> paths) {
+ cohortPaths = Lists.newArrayList(paths);
+
+ LOG.debug("Tx {} successfully built cohort path list: {}",
+ transactionId, cohortPaths);
+ return null;
+ }
+ }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+ }
+
@Override
public ListenableFuture<Boolean> canCommit() {
- LOG.debug("txn {} canCommit", transactionId);
+ LOG.debug("Tx {} canCommit", transactionId);
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ // The first phase of canCommit is to gather the list of cohort actor paths that will
+ // participate in the commit. buildCohortPathsList combines the cohort path Futures into
+ // one Future which we wait on asynchronously here. The cohort actor paths are
+ // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
+ // and passed to us from upstream processing. If any one fails then we'll fail canCommit.
+
+ buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ returnFuture.setException(failure);
+ } else {
+ finishCanCommit(returnFuture);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+
+ return returnFuture;
+ }
+
+ private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishCanCommit", transactionId);
+
+ // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
+ // their canCommit processing. If any one fails then we'll fail canCommit.
Future<Iterable<Object>> combinedFuture =
invokeCohorts(new CanCommitTransaction().toSerializable());
- final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
returnFuture.setException(failure);
return;
}
}
}
+ LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
returnFuture.set(Boolean.valueOf(result));
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
for(ActorPath actorPath : cohortPaths) {
- LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
ActorSelection cohort = actorContext.actorSelection(actorPath);
@Override
public ListenableFuture<Void> preCommit() {
- LOG.debug("txn {} preCommit", transactionId);
- return voidOperation(new PreCommitTransaction().toSerializable(),
+ return voidOperation("preCommit", new PreCommitTransaction().toSerializable(),
PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
}
@Override
public ListenableFuture<Void> abort() {
- LOG.debug("txn {} abort", transactionId);
-
// Note - we pass false for propagateException. In the front-end data broker, this method
// is called when one of the 3 phases fails with an exception. We'd rather have that
// original exception propagated to the client. If our abort fails and we propagate the
// exception then that exception will supersede and suppress the original exception. But
// it's the original exception that is the root cause and of more interest to the client.
- return voidOperation(new AbortTransaction().toSerializable(),
+ return voidOperation("abort", new AbortTransaction().toSerializable(),
AbortTransactionReply.SERIALIZABLE_CLASS, false);
}
@Override
public ListenableFuture<Void> commit() {
- LOG.debug("txn {} commit", transactionId);
- return voidOperation(new CommitTransaction().toSerializable(),
+ return voidOperation("commit", new CommitTransaction().toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true);
}
- private ListenableFuture<Void> voidOperation(final Object message,
+ private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+ LOG.debug("Tx {} {}", transactionId, operationName);
final SettableFuture<Void> returnFuture = SettableFuture.create();
+ // The cohort actor list should already be built at this point by the canCommit phase but,
+ // if not for some reason, we'll try to build it here.
+
+ if(cohortPaths != null) {
+ finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+ returnFuture);
+ } else {
+ buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+ operationName, failure);
+
+ if(propagateException) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(null);
+ }
+ } else {
+ finishVoidOperation(operationName, message, expectedResponseClass,
+ propagateException, returnFuture);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
+ return returnFuture;
+ }
+
+ private void finishVoidOperation(final String operationName, final Object message,
+ final Class<?> expectedResponseClass, final boolean propagateException,
+ final SettableFuture<Void> returnFuture) {
+
+ LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+ Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
}
if(exceptionToPropagate != null) {
+ LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+ operationName, exceptionToPropagate);
+
if(propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
returnFuture.set(null);
}
} else {
+ LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
returnFuture.set(null);
}
}
}, actorContext.getActorSystem().dispatcher());
-
- return returnFuture;
}
- public List<ActorPath> getCohortPaths() {
- return Collections.unmodifiableList(this.cohortPaths);
+ @VisibleForTesting
+ List<Future<ActorPath>> getCohortPathFutures() {
+ return Collections.unmodifiableList(cohortPathFutures);
}
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
-import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.Props;
import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Function1;
import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
READ_WRITE
}
+ static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+ Throwable, Throwable>() {
+ @Override
+ public Throwable apply(Throwable failure) {
+ return failure;
+ }
+ };
+
private static final AtomicLong counter = new AtomicLong();
private static final Logger
}
+ @VisibleForTesting
+ List<Future<Object>> getRecordedOperationFutures() {
+ List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+ for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+ recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+ }
+
+ return recordedOperationFutures;
+ }
+
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Read operation on write-only transaction is not allowed");
- LOG.debug("txn {} read {}", identifier, path);
+ LOG.debug("Tx {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
"Exists operation on write-only transaction is not allowed");
- LOG.debug("txn {} exists {}", identifier, path);
+ LOG.debug("Tx {} exists {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} write {}", identifier, path);
+ LOG.debug("Tx {} write {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} merge {}", identifier, path);
+ LOG.debug("Tx {} merge {}", identifier, path);
createTransactionIfMissing(actorContext, path);
checkModificationState();
- LOG.debug("txn {} delete {}", identifier, path);
+ LOG.debug("Tx {} delete {}", identifier, path);
createTransactionIfMissing(actorContext, path);
inReadyState = true;
- List<ActorPath> cohortPaths = new ArrayList<>();
-
- LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+ LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- LOG.debug("txn {} Readying transaction for shard {}", identifier,
+ LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
- Object result = transactionContext.readyTransaction();
-
- if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(),result);
- String resolvedCohortPath = transactionContext.getResolvedCohortPath(
- reply.getCohortPath().toString());
- cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
- } else {
- LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
- result.getClass());
- }
+ cohortPathFutures.add(transactionContext.readyTransaction());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ identifier.toString());
}
@Override
String transactionPath = reply.getTransactionPath();
- LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+ LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
} else {
- LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
- response.getClass());
+ throw new IllegalArgumentException(String.format(
+ "Invalid reply type {} for CreateTransaction", response.getClass()));
}
} catch(Exception e){
- LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
}
}
private interface TransactionContext {
String getShardName();
- String getResolvedCohortPath(String cohortPath);
+ void closeTransaction();
- public void closeTransaction();
+ Future<ActorPath> readyTransaction();
- public Object readyTransaction();
+ void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
void deleteData(YangInstanceIdentifier path);
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
final YangInstanceIdentifier path);
- void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
- }
+ List<Future<Object>> getRecordedOperationFutures();
+ }
- private class TransactionContextImpl implements TransactionContext {
- private final String shardName;
- private final String actorPath;
- private final ActorSelection actor;
+ private abstract class AbstractTransactionContext implements TransactionContext {
+ protected final String shardName;
+ protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
- private TransactionContextImpl(String shardName, String actorPath,
- ActorSelection actor) {
+ AbstractTransactionContext(String shardName) {
this.shardName = shardName;
- this.actorPath = actorPath;
- this.actor = actor;
}
@Override
return shardName;
}
+ @Override
+ public List<Future<Object>> getRecordedOperationFutures() {
+ return recordedOperationFutures;
+ }
+ }
+
+ private class TransactionContextImpl extends AbstractTransactionContext {
+ private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+ private final String actorPath;
+ private final ActorSelection actor;
+
+ private TransactionContextImpl(String shardName, String actorPath,
+ ActorSelection actor) {
+ super(shardName);
+ this.actorPath = actorPath;
+ this.actor = actor;
+ }
+
private ActorSelection getActor() {
return actor;
}
- @Override
- public String getResolvedCohortPath(String cohortPath) {
+ private String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
@Override
public void closeTransaction() {
+ LOG.debug("Tx {} closeTransaction called", identifier);
actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
- public Object readyTransaction() {
- return actorContext.executeRemoteOperation(getActor(),
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+ // Combine all the previously recorded put/merge/delete operation reply Futures and the
+ // ReadyTransactionReply Future into one Future. If any one fails then the combined
+ // Future will fail. We need all prior operations and the ready operation to succeed
+ // in order to attempt commit.
+
+ List<Future<Object>> futureList =
+ Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ futureList.addAll(recordedOperationFutures);
+ futureList.add(replyFuture);
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+ actorContext.getActorSystem().dispatcher());
+
+ // Transform the combined Future into a Future that returns the cohort actor path from
+ // the ReadyTransactionReply. That's the end result of the ready operation.
+
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ @Override
+ public ActorPath apply(Iterable<Object> notUsed) {
+
+ LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+ identifier);
+
+ // At this point all the Futures succeeded and we need to extract the cohort
+ // actor path from the ReadyTransactionReply. For the recorded operations, they
+ // don't return any data so we're only interested that they completed
+ // successfully. We could be paranoid and verify the correct reply types but
+ // that really should never happen so it's not worth the overhead of
+ // de-serializing each reply.
+
+ // Note the Future get call here won't block as it's complete.
+ Object serializedReadyReply = replyFuture.value().get().get();
+ if(serializedReadyReply.getClass().equals(
+ ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+ actorContext.getActorSystem(), serializedReadyReply);
+
+ String resolvedCohortPath = getResolvedCohortPath(
+ reply.getCohortPath().toString());
+
+ LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+ identifier, resolvedCohortPath);
+
+ return actorContext.actorFor(resolvedCohortPath);
+ } else {
+ // Throwing an exception here will fail the Future.
+
+ throw new IllegalArgumentException(String.format("Invalid reply type {}",
+ serializedReadyReply.getClass()));
+ }
+ }
+ }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
}
@Override
public void deleteData(YangInstanceIdentifier path) {
- actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- actorContext.sendRemoteOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable());
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new MergeData(path, data, schemaContext).toSerializable(),
+ ActorContext.ASK_DURATION));
+ }
+
+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+ recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ new WriteData(path, data, schemaContext).toSerializable(),
+ ActorContext.ASK_DURATION));
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
- final YangInstanceIdentifier path) {
+ final YangInstanceIdentifier path) {
+
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishReadData(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ identifier, failure);
+
+ returnFuture.setException(new ReadFailedException(
+ "The read could not be performed because a previous put, merge,"
+ + "or delete operation failed", failure));
+ } else {
+ finishReadData(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishReadData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) throws Throwable {
+ public void onComplete(Throwable failure, Object readResponse) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
"Error reading data for path " + path, failure));
} else {
- if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+ if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
- path, response);
+ path, readResponse);
if (reply.getNormalizedNode() == null) {
returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
} else {
}
};
- Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
- future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
- }
-
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- actorContext.sendRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable());
+ readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
@Override
public CheckedFuture<Boolean, ReadFailedException> dataExists(
final YangInstanceIdentifier path) {
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail this
+ // request.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishDataExists(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ identifier, failure);
+
+ returnFuture.setException(new ReadFailedException(
+ "The data exists could not be performed because a previous "
+ + "put, merge, or delete operation failed", failure));
+ } else {
+ finishDataExists(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishDataExists(final YangInstanceIdentifier path,
+ final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
returnFuture.setException(new ReadFailedException(
- "Error checking exists for path " + path, failure));
+ "Error checking data exists for path " + path, failure));
} else {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.
fromSerializable(response).exists()));
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
- return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
}
- private class NoOpTransactionContext implements TransactionContext {
+ private class NoOpTransactionContext extends AbstractTransactionContext {
- private final Logger
- LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+ private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
- private final String shardName;
private final Exception failure;
- private ActorRef cohort;
-
public NoOpTransactionContext(String shardName, Exception failure){
- this.shardName = shardName;
+ super(shardName);
this.failure = failure;
}
@Override
- public String getShardName() {
- return shardName;
-
+ public void closeTransaction() {
+ LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
}
@Override
- public String getResolvedCohortPath(String cohortPath) {
- return cohort.path().toString();
+ public Future<ActorPath> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called", identifier);
+ return akka.dispatch.Futures.failed(failure);
}
@Override
- public void closeTransaction() {
- LOG.warn("txn {} closeTransaction called", identifier);
- }
-
- @Override public Object readyTransaction() {
- LOG.warn("txn {} readyTransaction called", identifier);
- cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
- return new ReadyTransactionReply(cohort.path()).toSerializable();
+ public void deleteData(YangInstanceIdentifier path) {
+ LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.warn("txt {} deleteData called path = {}", identifier, path);
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
@Override
- public void mergeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} mergeData called path = {}", identifier, path);
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.warn("txn {} readData called path = {}", identifier, path);
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error reading data for path " + path, failure));
}
- @Override public void writeData(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
- LOG.warn("txn {} writeData called path = {}", identifier, path);
- }
-
- @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
YangInstanceIdentifier path) {
- LOG.warn("txn {} dataExists called path = {}", identifier, path);
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
return Futures.immediateFailedCheckedFuture(new ReadFailedException(
"Error checking exists for path " + path, failure));
}
}
-
-
-
}
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.isA;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+ @SuppressWarnings("serial")
+ static class TestException extends RuntimeException {
+ }
+
@Mock
private ActorContext actorContext;
doReturn(getSystem()).when(actorContext).getActorSystem();
}
- private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
- List<ActorPath> cohorts = Lists.newArrayList();
+ private Future<ActorPath> newCohortPath() {
+ ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+ doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ return Futures.successful(path);
+ }
+
+ private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
- cohorts.add(path);
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+ cohortPathFutures.add(newCohortPath());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ }
+
+ private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+ throws Exception {
+ List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ cohortPathFutures.add(newCohortPath());
+ cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
}
+ private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Expected ExecutionException");
+ } catch(ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testCanCommitWithOneCohort() throws Exception {
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new CanCommitTransactionReply(false));
future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", true, future.get());
+ assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
}
ListenableFuture<Boolean> future = proxy.canCommit();
- assertEquals("canCommit", false, future.get());
+ assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCanCommitWithExceptionFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCanCommitWithExceptionFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
- setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
- proxy.canCommit().get();
+ propagateExecutionExceptionCause(proxy.canCommit());
}
@Test(expected = ExecutionException.class)
setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.canCommit().get();
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.canCommit());
+ } finally {
+ verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply());
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
new PreCommitTransactionReply(), new RuntimeException("mock"));
- proxy.preCommit().get();
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
}
@Test
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
// The exception should not get propagated.
- proxy.abort().get();
+ proxy.abort().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
}
+ @Test
+ public void testAbortWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ // The exception should not get propagated.
+ proxy.abort().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+ }
+
@Test
public void testCommit() throws Exception {
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
new CommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
- @Test(expected = ExecutionException.class)
- public void testCommitWithFailure() throws Exception {
+ @Test(expected = TestException.class)
+ public void testCommitWithFailure() throws Throwable {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
- new RuntimeException("mock"));
+ new TestException());
- proxy.commit().get();
+ propagateExecutionExceptionCause(proxy.commit());
}
@Test(expected = ExecutionException.class)
- public void teseCommitWithInvalidResponseType() throws Exception {
+ public void testCommitWithInvalidResponseType() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(1);
setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
- proxy.commit().get();
+ proxy.commit().get(5, TimeUnit.SECONDS);
+ }
+
+ @Test(expected = TestException.class)
+ public void testCommitWithFailedCohortPath() throws Throwable {
+
+ ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+ try {
+ propagateExecutionExceptionCause(proxy.commit());
+ } finally {
+ verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+ }
}
@Test
- public void testGetCohortPaths() {
+ public void testAllThreePhasesSuccessful() throws Exception {
ThreePhaseCommitCohortProxy proxy = setupProxy(2);
- List<ActorPath> paths = proxy.getCohortPaths();
- assertNotNull("getCohortPaths returned null", paths);
- assertEquals("getCohortPaths size", 2, paths.size());
+ setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+ new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+ setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+ new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+ setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+ new CommitTransactionReply(), new CommitTransactionReply());
+
+ proxy.canCommit().get(5, TimeUnit.SECONDS);
+ proxy.preCommit().get(5, TimeUnit.SECONDS);
+ proxy.commit().get(5, TimeUnit.SECONDS);
+
+ verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+ verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
}
}
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
+
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
}
static interface Invoker {
- void invoke(TransactionProxy proxy) throws Exception;
+ CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
}
private final Configuration configuration = new MockConfiguration();
schemaContext = TestModel.createTestContext();
doReturn(getSystem()).when(mockActorContext).getActorSystem();
+ doReturn(memberName).when(mockActorContext).getCurrentMemberName();
ShardStrategyFactory.setConfiguration(configuration);
}
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
public boolean matches(Object argument) {
- DataExists obj = DataExists.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
public boolean matches(Object argument) {
- ReadData obj = ReadData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
@Override
public boolean matches(Object argument) {
+ if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
WriteData obj = WriteData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
@Override
public boolean matches(Object argument) {
+ if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+ return false;
+ }
+
MergeData obj = MergeData.fromSerializable(argument, schemaContext);
return obj.getPath().equals(TestModel.TEST_PATH) &&
obj.getData().equals(nodeToWrite);
ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
@Override
public boolean matches(Object argument) {
- DeleteData obj = DeleteData.fromSerializable(argument);
- return obj.getPath().equals(TestModel.TEST_PATH);
+ return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+ DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
}
};
return argThat(matcher);
}
- private Object readyTxReply(ActorPath path) {
- return new ReadyTransactionReply(path).toSerializable();
+ private Future<Object> readyTxReply(ActorPath path) {
+ return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
- return Futures.successful(new ReadDataReply(schemaContext, data)
- .toSerializable());
+ return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
}
private Future<Object> dataExistsReply(boolean exists) {
return Futures.successful(new DataExistsReply(exists).toSerializable());
}
+ private Future<Object> writeDataReply() {
+ return Futures.successful(new WriteDataReply().toSerializable());
+ }
+
+ private Future<Object> mergeDataReply() {
+ return Futures.successful(new MergeDataReply().toSerializable());
+ }
+
+ private Future<Object> deleteDataReply() {
+ return Futures.successful(new DeleteDataReply().toSerializable());
+ }
+
private ActorSelection actorSelection(ActorRef actorRef) {
return getSystem().actorSelection(actorRef.path());
}
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
doReturn(getSystem().actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(memberName).when(mockActorContext).getCurrentMemberName();
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
eqCreateTransaction(memberName, type), anyDuration());
return actorRef;
}
+ private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+ throws Throwable {
+
+ try {
+ future.checkedGet(5, TimeUnit.SECONDS);
+ fail("Expected ReadFailedException");
+ } catch(ReadFailedException e) {
+ throw e.getCause();
+ }
+ }
+
@Test
public void testRead() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
}
@Test(expected = ReadFailedException.class)
- public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testReadWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
}
private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
- try {
- invoker.invoke(transactionProxy);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
- }
+ propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.read(TestModel.TEST_PATH);
}
});
}
testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
+ @Test(expected = TestException.class)
+ public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ try {
+ propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ }
+ }
+
+ @Test
+ public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, expectedNode);
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testReadPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+
@Test
public void testExists() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
@Override
- public void invoke(TransactionProxy proxy) throws Exception {
- proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+ public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+ return proxy.exists(TestModel.TEST_PATH);
}
});
}
@Test(expected = ReadFailedException.class)
- public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+ public void testExistsWithInvalidReplyMessageType() throws Exception {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
setupActorContextWithInitialCreateTransaction(READ_ONLY);
- doThrow(new TestException()).when(mockActorContext).
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY, schemaContext);
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ }
+
+ @Test(expected = TestException.class)
+ public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+ anyDuration());
+
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
try {
- transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch(ReadFailedException e) {
- // Expected - throw cause - expects TestException.
- throw e.getCause();
+ propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+ } finally {
+ verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
}
}
@Test
- public void testWrite() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+ assertEquals("Exists response", true, exists);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testxistsPreConditionCheck() {
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+
+ private void verifyRecordingOperationFutures(List<Future<Object>> futures,
+ Class<?>... expResultTypes) throws Exception {
+ assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
+
+ int i = 0;
+ for( Future<Object> future: futures) {
+ assertNotNull("Recording operation Future is null", future);
+
+ Class<?> expResultType = expResultTypes[i++];
+ if(Throwable.class.isAssignableFrom(expResultType)) {
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from recording operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ } else {
+ assertEquals("Recording operation Future result type", expResultType,
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWritePreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testWriteAfterReadyPreConditionCheck() {
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.ready();
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
}
@Test
public void testMerge() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
}
@Test
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+ doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY, schemaContext);
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).sendRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData());
+ verify(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ DeleteDataReply.SERIALIZABLE_CLASS);
+ }
+
+ private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
+ assertEquals("getReadyOperationFutures size", expReplies.length,
+ proxy.getCohortPathFutures().size());
+
+ int i = 0;
+ for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ assertNotNull("Ready operation Future is null", future);
+
+ Object expReply = expReplies[i++];
+ if(expReply instanceof ActorPath) {
+ ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", expReply, actual);
+ } else {
+ // Expecting exception.
+ try {
+ Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ fail("Expected exception from ready operation Future");
+ } catch(Exception e) {
+ // Expected
+ }
+ }
+ }
}
@SuppressWarnings("unchecked")
public void testReady() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), eqReadData(), anyDuration());
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.read(TestModel.TEST_PATH);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, actorRef.path());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithRecordingOperationFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
+ anyDuration());
+
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithReplyFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortPathFutures(proxy, TestException.class);
+ }
+
+ @Test
+ public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+ doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+ anyString(), any(), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithInvalidReplyMessageType() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY, schemaContext);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
+ verifyCohortPathFutures(proxy, IllegalArgumentException.class);
}
@Test
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+
+ <!-- dependencies to use AbstractDataBrokerTest -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-binding-broker-impl</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <artifactId>junit</artifactId>
+ <groupId>junit</groupId>
+ <scope>test</scope>
+ </dependency>
+ <!-- used to mock up classes -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
--- /dev/null
+/*
+* Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
+package org.opendaylight.controller.sample.toaster.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInputBuilder;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.Toaster;
+import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.WheatBread;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+
+public class OpenDaylightToasterTest extends AbstractDataBrokerTest{
+
+ private static InstanceIdentifier<Toaster> TOASTER_IID =
+ InstanceIdentifier.builder( Toaster.class ).build();
+ OpendaylightToaster toaster;
+
+ @Override
+ protected void setupWithDataBroker(DataBroker dataBroker) {
+ toaster = new OpendaylightToaster();
+ toaster.setDataProvider( dataBroker );
+
+ /**
+ * Doesn't look like we have support for the NotificationProviderService yet, so mock it
+ * for now.
+ */
+ NotificationProviderService mockNotification = mock( NotificationProviderService.class );
+ toaster.setNotificationProvider( mockNotification );
+ }
+
+ @Test
+ public void testToasterInitOnStartUp() throws Exception {
+ DataBroker broker = getDataBroker();
+
+ ReadOnlyTransaction rTx = broker.newReadOnlyTransaction();
+ Optional<Toaster> optional = rTx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID ).get();
+ assertNotNull( optional );
+ assertTrue( "Operational toaster not present", optional.isPresent() );
+
+ Toaster toaster = optional.get();
+
+ assertEquals( Toaster.ToasterStatus.Up, toaster.getToasterStatus() );
+ assertEquals( new DisplayString("Opendaylight"),
+ toaster.getToasterManufacturer() );
+ assertEquals( new DisplayString("Model 1 - Binding Aware"),
+ toaster.getToasterModelNumber() );
+
+ Optional<Toaster> configToaster =
+ rTx.read( LogicalDatastoreType.CONFIGURATION, TOASTER_IID ).get();
+ assertFalse( "Didn't expect config data for toaster.",
+ configToaster.isPresent() );
+ }
+
+ @Test
+ @Ignore //ignored because it is not an e test right now. Illustrative purposes only.
+ public void testSomething() throws Exception{
+ MakeToastInput toastInput = new MakeToastInputBuilder()
+ .setToasterDoneness( 1L )
+ .setToasterToastType( WheatBread.class )
+ .build();
+
+ //NOTE: In a real test we would want to override the Thread.sleep() to prevent our junit test
+ //for sleeping for a second...
+ Future<RpcResult<Void>> makeToast = toaster.makeToast( toastInput );
+
+ RpcResult<Void> rpcResult = makeToast.get();
+
+ assertNotNull( rpcResult );
+ assertTrue( rpcResult.isSuccessful() );
+ //etc
+ }
+
+}
* of some module. Contains default value extracted from yang file.
*/
public class AttributeConfigElement {
- private final Object dafaultValue;
+ private final Object defaultValue;
private final Object value;
private Optional<?> resolvedValue;
private Object resolvedDefaultValue;
private String jmxName;
- public AttributeConfigElement(Object dafaultValue, Object value) {
- this.dafaultValue = dafaultValue;
+ public AttributeConfigElement(Object defaultValue, Object value) {
+ this.defaultValue = defaultValue;
this.value = value;
}
public void resolveValue(AttributeResolvingStrategy<?, ? extends OpenType<?>> attributeResolvingStrategy,
String attrName) throws NetconfDocumentedException {
resolvedValue = attributeResolvingStrategy.parseAttribute(attrName, value);
- Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, dafaultValue);
+ Optional<?> resolvedDefault = attributeResolvingStrategy.parseAttribute(attrName, defaultValue);
resolvedDefaultValue = resolvedDefault.isPresent() ? resolvedDefault.get() : null;
}
return value;
}
+ public Object getDefaultValue() {
+ return defaultValue;
+ }
+
public Optional<?> getResolvedValue() {
return resolvedValue;
}
@Override
public String toString() {
- return "AttributeConfigElement [dafaultValue=" + dafaultValue + ", value=" + value + "]";
+ return "AttributeConfigElement [defaultValue=" + defaultValue + ", value=" + value + "]";
}
}
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
List<XmlElement> recognisedChildren = Lists.newArrayList();
for (Entry<String, AttributeReadingStrategy> innerAttrEntry : innerStrategies.entrySet()) {
- List<XmlElement> childItem = null;
- childItem = complexElement.getChildElementsWithSameNamespace(innerAttrEntry.getKey());
+ List<XmlElement> childItem = complexElement.getChildElementsWithSameNamespace(
+ innerAttrEntry.getKey());
recognisedChildren.addAll(childItem);
AttributeConfigElement resolvedInner = innerAttrEntry.getValue().readElement(childItem);
- innerMap.put(innerAttrEntry.getKey(), resolvedInner.getValue());
+ Object value = resolvedInner.getValue();
+ if(value == null) {
+ value = resolvedInner.getDefaultValue();
+ }
+
+ innerMap.put(innerAttrEntry.getKey(), value);
}
complexElement.checkUnrecognisedElements(recognisedChildren);
@Override
protected AttributeReadingStrategy caseTOAttribute(CompositeType openType) {
- Preconditions.checkState(getLastAttribute() instanceof TOAttribute);
- Map<String, AttributeIfc> inner = ((TOAttribute)getLastAttribute()).getYangPropertiesToTypesMap();
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof TOAttribute);
+ Map<String, AttributeIfc> inner = ((TOAttribute)lastAttribute).getYangPropertiesToTypesMap();
Map<String, AttributeReadingStrategy> innerStrategies = Maps.newHashMap();
innerStrategies.put(innerAttrEntry.getKey(), innerStrat);
}
- return new CompositeAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategies);
+ return new CompositeAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategies);
}
@Override
protected AttributeReadingStrategy caseListAttribute(ArrayType<?> openType) {
- Preconditions.checkState(getLastAttribute() instanceof ListAttribute);
- AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) getLastAttribute()).getInnerAttribute());
- return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof ListAttribute);
+ AttributeReadingStrategy innerStrategy = prepareReadingStrategy(key, ((ListAttribute) lastAttribute).getInnerAttribute());
+ return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
}
@Override
protected AttributeReadingStrategy caseListDependeciesAttribute(ArrayType<?> openType) {
- Preconditions.checkState(getLastAttribute() instanceof ListDependenciesAttribute);
+ AttributeIfc lastAttribute = getLastAttribute();
+ Preconditions.checkState(lastAttribute instanceof ListDependenciesAttribute);
AttributeReadingStrategy innerStrategy = caseDependencyAttribute(SimpleType.OBJECTNAME);
- return new ArrayAttributeReadingStrategy(getLastAttribute().getNullableDefault(), innerStrategy);
+ return new ArrayAttributeReadingStrategy(lastAttribute.getNullableDefault(), innerStrategy);
}
}
package org.opendaylight.controller.netconf.it;
import static java.util.Arrays.asList;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import io.netty.channel.ChannelFuture;
+import com.google.common.collect.Lists;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
super.initConfigTransactionManagerImpl(new HardcodedModuleFactoriesResolver(mockedContext, getModuleFactories().toArray(
new ModuleFactory[0])));
- NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
+ final NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
factoriesListener.onAddNetconfOperationServiceFactory(new NetconfOperationServiceFactoryImpl(getYangStore()));
commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
final NetconfServerDispatcher dispatchS = createDispatcher(factoriesListener);
- ChannelFuture s = dispatchS.createLocalServer(NetconfConfigUtil.getNetconfLocalAddress());
- s.await();
- EventLoopGroup bossGroup = new NioEventLoopGroup();
+ dispatchS.createLocalServer(NetconfConfigUtil.getNetconfLocalAddress()).await();
+ final EventLoopGroup bossGroup = new NioEventLoopGroup();
sshServer = NetconfSSHServer.start(tlsAddress.getPort(), NetconfConfigUtil.getNetconfLocalAddress(), getAuthProvider(), bossGroup);
}
- private NetconfServerDispatcher createDispatcher(NetconfOperationServiceFactoryListenerImpl factoriesListener) {
+ private NetconfServerDispatcher createDispatcher(final NetconfOperationServiceFactoryListenerImpl factoriesListener) {
return super.createDispatcher(factoriesListener, NetconfITTest.getNetconfMonitoringListenerService(), commitNot);
}
@After
public void tearDown() throws Exception {
- sshServer.stop();
+ sshServer.close();
commitNot.close();
+ sshServer.join();
}
private HardcodedYangStoreService getYangStore() throws YangStoreException, IOException {
@Test
public void testSecure() throws Exception {
- NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+ final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
NetconfMessage response = netconfClient.sendMessage(getConfig);
Assert.assertFalse("Unexpected error message " + XmlUtil.toString(response.getDocument()),
NetconfMessageUtil.isErrorMessage(response));
- NetconfMessage gs = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"2\"\n" +
+ final NetconfMessage gs = new NetconfMessage(XmlUtil.readXmlToDocument("<rpc message-id=\"2\"\n" +
" xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
" <get-schema xmlns=\"urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring\">\n" +
" <identifier>config</identifier>\n" +
}
}
+ /**
+ * Test all requests are handled properly and no mismatch occurs in listener
+ */
+ @Test(timeout = 3*60*1000)
+ public void testSecureStress() throws Exception {
+ final NetconfClientDispatcher dispatch = new NetconfClientDispatcherImpl(getNettyThreadgroup(), getNettyThreadgroup(), getHashedWheelTimer());
+ try (TestingNetconfClient netconfClient = new TestingNetconfClient("testing-ssh-client", dispatch, getClientConfiguration())) {
+
+ final AtomicInteger responseCounter = new AtomicInteger(0);
+ final List<Future<?>> futures = Lists.newArrayList();
+
+ final int requests = 1000;
+ for (int i = 0; i < requests; i++) {
+ final Future<NetconfMessage> netconfMessageFuture = netconfClient.sendRequest(getConfig);
+ futures.add(netconfMessageFuture);
+ netconfMessageFuture.addListener(new GenericFutureListener<Future<? super NetconfMessage>>() {
+ @Override
+ public void operationComplete(final Future<? super NetconfMessage> future) throws Exception {
+ assertTrue("Request unsuccessful " + future.cause(), future.isSuccess());
+ responseCounter.incrementAndGet();
+ }
+ });
+ }
+
+ for (final Future<?> future : futures) {
+ future.await();
+ }
+
+ // Give future listeners some time to finish counter incrementation
+ Thread.sleep(5000);
+
+ org.junit.Assert.assertEquals(requests, responseCounter.get());
+ }
+ }
+
public NetconfClientConfiguration getClientConfiguration() throws IOException {
final NetconfClientConfigurationBuilder b = NetconfClientConfigurationBuilder.create();
b.withAddress(tlsAddress);
}
public AuthProvider getAuthProvider() throws Exception {
- AuthProvider mock = mock(AuthProviderImpl.class);
+ final AuthProvider mock = mock(AuthProviderImpl.class);
doReturn(true).when(mock).authenticated(anyString(), anyString());
doReturn(PEMGenerator.generate().toCharArray()).when(mock).getPEMAsCharArray();
return mock;
*/
package org.opendaylight.controller.netconf.monitoring;
-import com.google.common.collect.Maps;
-
+import java.util.Collections;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.monitoring.xml.model.NetconfState;
import org.opendaylight.controller.netconf.util.mapping.AbstractNetconfOperation;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import java.util.Map;
-
public class Get extends AbstractNetconfOperation {
private static final Logger logger = LoggerFactory.getLogger(Get.class);
private final NetconfMonitoringService netconfMonitor;
- public Get(NetconfMonitoringService netconfMonitor) {
+ public Get(final NetconfMonitoringService netconfMonitor) {
super(MonitoringConstants.MODULE_NAME);
this.netconfMonitor = netconfMonitor;
}
- private Element getPlaceholder(Document innerResult) throws NetconfDocumentedException {
- try {
- XmlElement rootElement = null;
- rootElement = XmlElement.fromDomElementWithExpected(innerResult.getDocumentElement(),
- XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
- return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
- } catch (RuntimeException e) {
- throw new IllegalArgumentException(String.format(
- "Input xml in wrong format, Expecting root element %s with child element %s, but was %s",
- XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.DATA_KEY,
- XmlUtil.toString(innerResult.getDocumentElement())), e);
- }
+ private Element getPlaceholder(final Document innerResult)
+ throws NetconfDocumentedException {
+ final XmlElement rootElement = XmlElement.fromDomElementWithExpected(
+ innerResult.getDocumentElement(), XmlNetconfConstants.RPC_REPLY_KEY, XmlNetconfConstants.RFC4741_TARGET_NAMESPACE);
+ return rootElement.getOnlyChildElement(XmlNetconfConstants.DATA_KEY).getDomElement();
}
@Override
}
@Override
- public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation)
+ public Document handle(final Document requestMessage, final NetconfOperationChainedExecution subsequentOperation)
throws NetconfDocumentedException {
if (subsequentOperation.isExecutionTermination()){
throw new NetconfDocumentedException(String.format("Subsequent netconf operation expected by %s", this),
}
try {
- Document innerResult = subsequentOperation.execute(requestMessage);
+ final Document innerResult = subsequentOperation.execute(requestMessage);
- NetconfState netconfMonitoring = new NetconfState(netconfMonitor);
+ final NetconfState netconfMonitoring = new NetconfState(netconfMonitor);
Element monitoringXmlElement = new JaxBSerializer().toXml(netconfMonitoring);
monitoringXmlElement = (Element) innerResult.importNode(monitoringXmlElement, true);
- Element monitoringXmlElementPlaceholder = getPlaceholder(innerResult);
+ final Element monitoringXmlElementPlaceholder = getPlaceholder(innerResult);
monitoringXmlElementPlaceholder.appendChild(monitoringXmlElement);
return innerResult;
- } catch (RuntimeException e) {
- String errorMessage = "Get operation for netconf-state subtree failed";
+ } catch (final RuntimeException e) {
+ final String errorMessage = "Get operation for netconf-state subtree failed";
logger.warn(errorMessage, e);
- Map<String, String> info = Maps.newHashMap();
- info.put(NetconfDocumentedException.ErrorSeverity.error.toString(), e.getMessage());
+
throw new NetconfDocumentedException(errorMessage, NetconfDocumentedException.ErrorType.application,
NetconfDocumentedException.ErrorTag.operation_failed,
- NetconfDocumentedException.ErrorSeverity.error, info);
+ NetconfDocumentedException.ErrorSeverity.error,
+ Collections.singletonMap(NetconfDocumentedException.ErrorSeverity.error.toString(), e.getMessage()));
}
}
@Override
- protected Element handle(Document document, XmlElement message, NetconfOperationChainedExecution subsequentOperation)
+ protected Element handle(final Document document, final XmlElement message, final NetconfOperationChainedExecution subsequentOperation)
throws NetconfDocumentedException {
throw new UnsupportedOperationException("Never gets called");
}
public void start(final BundleContext context) {
monitor = new NetconfMonitoringServiceTracker(context);
monitor.open();
-
}
@Override
package org.opendaylight.controller.netconf.monitoring.osgi;
import com.google.common.base.Preconditions;
+import java.util.Hashtable;
import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
public class NetconfMonitoringServiceTracker extends ServiceTracker<NetconfMonitoringService, NetconfMonitoringService> {
private static final Logger logger = LoggerFactory.getLogger(NetconfMonitoringServiceTracker.class);
private ServiceRegistration<NetconfOperationServiceFactory> reg;
- NetconfMonitoringServiceTracker(BundleContext context) {
+ NetconfMonitoringServiceTracker(final BundleContext context) {
super(context, NetconfMonitoringService.class, null);
}
@Override
- public NetconfMonitoringService addingService(ServiceReference<NetconfMonitoringService> reference) {
+ public NetconfMonitoringService addingService(final ServiceReference<NetconfMonitoringService> reference) {
Preconditions.checkState(reg == null, "Monitoring service was already added");
- NetconfMonitoringService netconfMonitoringService = super.addingService(reference);
+ final NetconfMonitoringService netconfMonitoringService = super.addingService(reference);
final NetconfMonitoringOperationService operationService = new NetconfMonitoringOperationService(
netconfMonitoringService);
- NetconfOperationServiceFactory factory = new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
+ final NetconfOperationServiceFactory factory = new NetconfMonitoringActivator.NetconfMonitoringOperationServiceFactory(
operationService);
- Dictionary<String, ?> props = new Hashtable<>();
- reg = context.registerService(NetconfOperationServiceFactory.class, factory, props);
+ reg = context.registerService(NetconfOperationServiceFactory.class, factory, new Hashtable<String, Object>());
return netconfMonitoringService;
}
@Override
- public void removedService(ServiceReference<NetconfMonitoringService> reference,
- NetconfMonitoringService netconfMonitoringService) {
+ public void removedService(final ServiceReference<NetconfMonitoringService> reference,
+ final NetconfMonitoringService netconfMonitoringService) {
if(reg!=null) {
try {
reg.unregister();
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn("Ignoring exception while unregistering {}", reg, e);
}
}
public class JaxBSerializer {
- public Element toXml(NetconfState monitoringModel) {
- DOMResult res = null;
+ public Element toXml(final NetconfState monitoringModel) {
+ final DOMResult res;
try {
- JAXBContext jaxbContext = JAXBContext.newInstance(NetconfState.class);
- Marshaller marshaller = jaxbContext.createMarshaller();
+ final JAXBContext jaxbContext = JAXBContext.newInstance(NetconfState.class);
+ final Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
res = new DOMResult();
marshaller.marshal(monitoringModel, res);
- } catch (JAXBException e) {
+ } catch (final JAXBException e) {
throw new RuntimeException("Unable to serialize netconf state " + monitoringModel, e);
}
return ((Document)res.getNode()).getDocumentElement();
private Schemas schemas;
private Sessions sessions;
- public NetconfState(NetconfMonitoringService monitoringService) {
+ public NetconfState(final NetconfMonitoringService monitoringService) {
this.sessions = monitoringService.getSessions();
this.schemas = monitoringService.getSchemas();
}
- public NetconfState() {
- }
-
-
+ public NetconfState() {}
@XmlElementWrapper(name="schemas")
@XmlElement(name="schema")
return Collections2.transform(schemas.getSchema(), new Function<Schema, MonitoringSchema>() {
@Nullable
@Override
- public MonitoringSchema apply(@Nullable Schema input) {
+ public MonitoringSchema apply(@Nullable final Schema input) {
return new MonitoringSchema(input);
}
});
return Collections2.transform(sessions.getSession(), new Function<Session, MonitoringSession>() {
@Nullable
@Override
- public MonitoringSession apply(@Nullable Session input) {
+ public MonitoringSession apply(@Nullable final Session input) {
return new MonitoringSession(input);
}
});
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.monitoring;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+import java.util.Collections;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
+import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
+import org.w3c.dom.Document;
+
+public class GetTest {
+
+ @Mock
+ private NetconfMonitoringService monitor;
+ @Mock
+ private Document request;
+ @Mock
+ private NetconfOperationChainedExecution subsequentOperation;
+ private Document incorrectSubsequentResult;
+ private Document correctSubsequentResult;
+
+ private Get get;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ incorrectSubsequentResult = XmlUtil.readXmlToDocument("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>");
+ correctSubsequentResult = XmlUtil.readXmlToDocument("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"><data></data></rpc-reply>");
+
+ doReturn(new SessionsBuilder().setSession(Collections.<Session>emptyList()).build()).when(monitor).getSessions();
+ doReturn(new SchemasBuilder().setSchema(Collections.<Schema>emptyList()).build()).when(monitor).getSchemas();
+ doReturn(false).when(subsequentOperation).isExecutionTermination();
+
+ get = new Get(monitor);
+ }
+
+ @Test
+ public void testHandleNoSubsequent() throws Exception {
+ try {
+ get.handle(null, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+ } catch (final NetconfDocumentedException e) {
+ assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorType.application);
+ return;
+ }
+
+ fail("Get should fail without subsequent operation");
+ }
+
+ @Test
+ public void testHandleWrongPlaceholder() throws Exception {
+ doReturn(incorrectSubsequentResult).when(subsequentOperation).execute(request);
+ try {
+ get.handle(request, subsequentOperation);
+ } catch (final NetconfDocumentedException e) {
+ assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.invalid_value, NetconfDocumentedException.ErrorType.application);
+ return;
+ }
+
+ fail("Get should fail with wrong xml");
+ }
+
+ @Test
+ public void testHandleRuntimeEx() throws Exception {
+ doThrow(RuntimeException.class).when(subsequentOperation).execute(request);
+ try {
+ get.handle(request, subsequentOperation);
+ } catch (final NetconfDocumentedException e) {
+ assertNetconfDocumentedEx(e, NetconfDocumentedException.ErrorSeverity.error, NetconfDocumentedException.ErrorTag.operation_failed, NetconfDocumentedException.ErrorType.application);
+ assertEquals(1, e.getErrorInfo().size());
+ return;
+ }
+
+ fail("Get should fail with wrong xml");
+ }
+
+ @Test
+ public void testSuccessHandle() throws Exception {
+ doReturn(correctSubsequentResult).when(subsequentOperation).execute(request);
+ assertTrue(get.getHandlingPriority().getPriority().get() > HandlingPriority.HANDLE_WITH_DEFAULT_PRIORITY.getPriority().get());
+ final Document result = get.handle(request, subsequentOperation);
+ assertThat(XmlUtil.toString(result), CoreMatchers.containsString("sessions"));
+ assertThat(XmlUtil.toString(result), CoreMatchers.containsString("schemas"));
+
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testHandle() throws Exception {
+ get.handle(null, null, null);
+
+ }
+
+ private void assertNetconfDocumentedEx(final NetconfDocumentedException e, final NetconfDocumentedException.ErrorSeverity severity, final NetconfDocumentedException.ErrorTag errorTag, final NetconfDocumentedException.ErrorType type) {
+ assertEquals(severity, e.getErrorSeverity());
+ assertEquals(errorTag, e.getErrorTag());
+ assertEquals(type, e.getErrorType());
+ }
+}
*/
package org.opendaylight.controller.netconf.monitoring.xml;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import com.google.common.collect.Lists;
+import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.monitoring.NetconfMonitoringService;
import org.opendaylight.controller.netconf.monitoring.xml.model.NetconfState;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.Session1;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.NetconfSsh;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Transport;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.Yang;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Schemas;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SchemasBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.Sessions;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.SessionsBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.Schema;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.schemas.SchemaKey;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.rev101004.netconf.state.sessions.Session;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.ZeroBasedCounter32;
-import org.w3c.dom.Element;
-
-import java.util.Date;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
public class JaxBSerializerTest {
@Test
public void testName() throws Exception {
- NetconfMonitoringService service = new NetconfMonitoringService() {
+ final NetconfMonitoringService service = new NetconfMonitoringService() {
@Override
public Sessions getSessions() {
@Override
public Schemas getSchemas() {
- return new SchemasBuilder().setSchema(Lists.<Schema>newArrayList()).build();
+ return new SchemasBuilder().setSchema(Lists.newArrayList(getMockSchema("id", "v1", Yang.class), getMockSchema("id2", "", Yang.class))).build();
}
};
- NetconfState model = new NetconfState(service);
- Element xml = new JaxBSerializer().toXml(model);
+ final NetconfState model = new NetconfState(service);
+ final String xml = XmlUtil.toString(new JaxBSerializer().toXml(model));
+
+ assertThat(xml, CoreMatchers.containsString(
+ "<schema>\n" +
+ "<format>yang</format>\n" +
+ "<identifier>id</identifier>\n" +
+ "<location>NETCONF</location>\n" +
+ "<namespace>localhost</namespace>\n" +
+ "<version>v1</version>\n" +
+ "</schema>\n"));
+
+ assertThat(xml, CoreMatchers.containsString(
+ "<session>\n" +
+ "<session-id>1</session-id>\n" +
+ "<in-bad-rpcs>0</in-bad-rpcs>\n" +
+ "<in-rpcs>0</in-rpcs>\n" +
+ "<login-time>loginTime</login-time>\n" +
+ "<out-notifications>0</out-notifications>\n" +
+ "<out-rpc-errors>0</out-rpc-errors>\n" +
+ "<ncme:session-identifier>client</ncme:session-identifier>\n" +
+ "<source-host>address/port</source-host>\n" +
+ "<transport>ncme:netconf-tcp</transport>\n" +
+ "<username>username</username>\n" +
+ "</session>"));
+ }
+
+ private Schema getMockSchema(final String id, final String version, final Class<Yang> format) {
+ final Schema mock = mock(Schema.class);
+
+ doReturn(format).when(mock).getFormat();
+ doReturn(id).when(mock).getIdentifier();
+ doReturn(new Uri("localhost")).when(mock).getNamespace();
+ doReturn(version).when(mock).getVersion();
+ doReturn(Lists.newArrayList(new Schema.Location(Schema.Location.Enumeration.NETCONF))).when(mock).getLocation();
+ doReturn(new SchemaKey(format, id, version)).when(mock).getKey();
+ return mock;
}
- private Session getMockSession(Class<? extends Transport> transportType) {
- Session mocked = mock(Session.class);
- Session1 mockedSession1 = mock(Session1.class);
+ private Session getMockSession(final Class<? extends Transport> transportType) {
+ final Session mocked = mock(Session.class);
+ final Session1 mockedSession1 = mock(Session1.class);
doReturn("client").when(mockedSession1).getSessionIdentifier();
doReturn(1L).when(mocked).getSessionId();
- doReturn(new DateAndTime(new Date().toString())).when(mocked).getLoginTime();
+ doReturn(new DateAndTime("loginTime")).when(mocked).getLoginTime();
doReturn(new Host(new DomainName("address/port"))).when(mocked).getSourceHost();
doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInBadRpcs();
doReturn(new ZeroBasedCounter32(0L)).when(mocked).getInRpcs();
<groupId>org.openexi</groupId>
<artifactId>nagasena-rta</artifactId>
</dependency>
-
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<groupId>xmlunit</groupId>
<artifactId>xmlunit</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>mockito-configuration</artifactId>
+ </dependency>
</dependencies>
<build>
Optional<NetconfHelloMessageAdditionalHeader> headerOptional = ((NetconfHelloMessage) msg)
.getAdditionalHeader();
- // If additional header present, serialize it along with netconf hello
- // message
+ // If additional header present, serialize it along with netconf hello message
if (headerOptional.isPresent()) {
out.writeBytes(headerOptional.get().toFormattedString().getBytes(Charsets.UTF_8));
}
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.List;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
@Override
- @VisibleForTesting
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() != 0) {
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
-import ch.ethz.ssh2.Connection;
import java.io.IOException;
import org.apache.sshd.ClientSession;
* Class providing authentication facility to SSH handler.
*/
public abstract class AuthenticationHandler {
- public abstract void authenticate(Connection connection) throws IOException;
-
public abstract String getUsername();
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
import java.io.IOException;
-
import org.apache.sshd.ClientSession;
import org.apache.sshd.client.future.AuthFuture;
-import ch.ethz.ssh2.Connection;
-
/**
* Class Providing username/password authentication option to
- * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.SshHandler}
+ * {@link org.opendaylight.controller.netconf.nettyutil.handler.ssh.client.AsyncSshHandler}
*/
public class LoginPassword extends AuthenticationHandler {
private final String username;
this.password = password;
}
- @Override
- public void authenticate(Connection connection) throws IOException {
- final boolean isAuthenticated = connection.authenticateWithPassword(username, password);
-
- if (!isAuthenticated) {
- throw new IOException("Authentication failed.");
- }
- }
-
@Override
public String getUsername() {
return username;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.util.Buffer;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.slf4j.Logger;
private final SshClient sshClient;
private SshReadAsyncListener sshReadAsyncListener;
+ private SshWriteAsyncHandler sshWriteAsyncHandler;
+
private ClientChannel channel;
private ClientSession session;
private ChannelPromise connectPromise;
+
public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
}
connectPromise.setSuccess();
connectPromise = null;
- ctx.fireChannelActive();
- final IoInputStream asyncOut = channel.getAsyncOut();
- sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut);
+ sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+
+ ctx.fireChannelActive();
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@Override
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
- try {
- if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) {
- handleSshSessionClosed(ctx);
- } else {
- channel.getAsyncIn().write(toBuffer(msg));
- ((ByteBuf) msg).release();
- }
- } catch (final Exception e) {
- logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
- throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e);
- }
+ sshWriteAsyncHandler.write(ctx, msg, promise);
}
private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
ctx.fireChannelInactive();
}
- private Buffer toBuffer(final Object msg) {
- // TODO Buffer vs ByteBuf translate, Can we handle that better ?
- Preconditions.checkState(msg instanceof ByteBuf);
- final ByteBuf byteBuf = (ByteBuf) msg;
- final byte[] temp = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
- return new Buffer(temp);
- }
-
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
}
@Override
- public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
+ public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if(sshReadAsyncListener != null) {
sshReadAsyncListener.close();
}
- session.close(false).addListener(new SshFutureListener<CloseFuture>() {
- @Override
- public void operationComplete(final CloseFuture future) {
- if(future.isClosed() == false) {
- session.close(true);
+ if(sshWriteAsyncHandler != null) {
+ sshWriteAsyncHandler.close();
+ }
+
+ if(session!= null && !session.isClosed() && !session.isClosing()) {
+ session.close(false).addListener(new SshFutureListener<CloseFuture>() {
+ @Override
+ public void operationComplete(final CloseFuture future) {
+ if (future.isClosed() == false) {
+ session.close(true);
+ }
+ session = null;
}
- session = null;
- }
- });
+ });
+ }
channel = null;
+ promise.setSuccess();
+
+ handleSshSessionClosed(ctx);
}
/**
}
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
if(currentReadFuture != null) {
currentReadFuture.removeListener(this);
asyncOut = null;
}
}
+
+ private static final class SshWriteAsyncHandler implements AutoCloseable {
+ public static final int MAX_PENDING_WRITES = 100;
+
+ private final ChannelOutboundHandler channelHandler;
+ private IoOutputStream asyncIn;
+
+ // Counter that holds the amount of pending write messages
+ // Pending write can occur in case remote window is full
+ // In such case, we need to wait for the pending write to finish
+ private int pendingWriteCounter;
+ // Last write future, that can be pending
+ private IoWriteFuture lastWriteFuture;
+
+ public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
+ this.channelHandler = channelHandler;
+ this.asyncIn = asyncIn;
+ }
+
+ public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+ try {
+ if(asyncIn.isClosed() || asyncIn.isClosing()) {
+ handleSshSessionClosed(ctx);
+ } else {
+ lastWriteFuture = asyncIn.write(toBuffer(msg));
+ lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ ((ByteBuf) msg).release();
+
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ }
+ promise.setFailure(future.getException());
+
+ // Reset last pending future
+ synchronized (SshWriteAsyncHandler.this) {
+ lastWriteFuture = null;
+ }
+ }
+ });
+ }
+ } catch (final WritePendingException e) {
+ // Check limit for pending writes
+ pendingWriteCounter++;
+ if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
+ ", remote window is not getting read or is too small"));
+ }
+
+ logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
+
+ // In case of pending, re-invoke write after pending is finished
+ lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ if(future.isWritten()) {
+ synchronized (SshWriteAsyncHandler.this) {
+ // Pending done, decrease counter
+ pendingWriteCounter--;
+ }
+ write(ctx, msg, promise);
+ } else {
+ // Cannot reschedule pending, fail
+ handlePendingFailed(ctx, e);
+ }
+ }
+
+ });
+ }
+ }
+
+ private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
+ logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
+ try {
+ channelHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception ex) {
+ // This should not happen
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void close() {
+ asyncIn = null;
+ }
+
+ private Buffer toBuffer(final Object msg) {
+ // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+ Preconditions.checkState(msg instanceof ByteBuf);
+ final ByteBuf byteBuf = (ByteBuf) msg;
+ final byte[] temp = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
+ return new Buffer(temp);
+ }
+
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Class provides {@link InputStream} functionality to users of virtual socket.
- */
-public class ChannelInputStream extends InputStream implements ChannelInboundHandler {
- private final Object lock = new Object();
- private final ByteBuf bb = Unpooled.buffer();
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if (b == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- int bytesRead = 1;
- synchronized (lock) {
- int c = read();
-
- b[off] = (byte)c;
-
- if(this.bb.readableBytes() == 0) {
- return bytesRead;
- }
-
- int ltr = len-1;
- ltr = (ltr <= bb.readableBytes()) ? ltr : bb.readableBytes();
-
- bb.readBytes(b, 1, ltr);
- bytesRead += ltr;
- }
- return bytesRead;
- }
-
- @Override
- public int read() throws IOException {
- synchronized (lock) {
- while (this.bb.readableBytes() == 0) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
- }
- }
- return this.bb.readByte() & 0xFF;
- }
- }
-
- @Override
- public int available() throws IOException {
- synchronized (lock) {
- return this.bb.readableBytes();
- }
- }
-
- public void channelRegistered(ChannelHandlerContext ctx) {
- ctx.fireChannelRegistered();
- }
-
- public void channelUnregistered(ChannelHandlerContext ctx) {
- ctx.fireChannelUnregistered();
- }
-
- public void channelActive(ChannelHandlerContext ctx) {
- ctx.fireChannelActive();
- }
-
- public void channelInactive(ChannelHandlerContext ctx) {
- ctx.fireChannelInactive();
- }
-
- public void channelRead(ChannelHandlerContext ctx, Object o) {
- synchronized(lock) {
- this.bb.discardReadBytes();
- this.bb.writeBytes((ByteBuf) o);
- ((ByteBuf) o).release();
- lock.notifyAll();
- }
- }
-
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.fireChannelReadComplete();
- }
-
- public void userEventTriggered(ChannelHandlerContext ctx, Object o) {
- ctx.fireUserEventTriggered(o);
- }
-
- public void channelWritabilityChanged(ChannelHandlerContext ctx) {
- ctx.fireChannelWritabilityChanged();
- }
-
- public void handlerAdded(ChannelHandlerContext ctx) {
- }
-
- public void handlerRemoved(ChannelHandlerContext ctx) {
- }
-
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
- ctx.fireExceptionCaught(throwable);
- }
-}
-
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
-import io.netty.channel.ChannelPromise;
-
-import java.io.OutputStream;
-import java.net.SocketAddress;
-
-/**
- * Class provides {@link OutputStream) functionality to users of virtual socket.
- */
-public class ChannelOutputStream extends OutputStream implements ChannelOutboundHandler {
- private final Object lock = new Object();
- private ByteBuf buff = Unpooled.buffer();
- private ChannelHandlerContext ctx;
-
- @Override
- public void flush() {
- synchronized(lock) {
- ctx.writeAndFlush(buff).awaitUninterruptibly();
- buff = Unpooled.buffer();
- }
- }
-
- @Override
- public void write(int b) {
- synchronized(lock) {
- buff.writeByte(b);
- }
- }
-
- public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
- ChannelPromise promise) {
- ctx.bind(localAddress, promise);
- }
-
- public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
- SocketAddress localAddress, ChannelPromise promise) {
- this.ctx = ctx;
- ctx.connect(remoteAddress, localAddress, promise);
- }
-
- public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
- ctx.disconnect(promise);
- }
-
- public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
- ctx.close(promise);
- }
-
- public void deregister(ChannelHandlerContext ctx, ChannelPromise channelPromise) {
- ctx.deregister(channelPromise);
- }
-
- public void read(ChannelHandlerContext ctx) {
- ctx.read();
- }
-
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- // pass
- }
-
- public void flush(ChannelHandlerContext ctx) {
- // pass
- }
-
- public void handlerAdded(ChannelHandlerContext ctx)
- throws Exception {
- }
-
- public void handlerRemoved(ChannelHandlerContext ctx)
- throws Exception {
- }
-
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- ctx.fireExceptionCaught(cause);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SocketChannel;
-
-/**
- * Handler class providing Socket functionality to OIO client application. By using VirtualSocket user can
- * use OIO application in asynchronous environment and NIO EventLoop. Using VirtualSocket OIO applications
- * are able to use full potential of NIO environment.
- */
-//TODO: refactor - socket should be created when connection is established
-public class VirtualSocket extends Socket implements ChannelHandler {
- private static final String INPUT_STREAM = "inputStream";
- private static final String OUTPUT_STREAM = "outputStream";
-
- private final ChannelInputStream chais = new ChannelInputStream();
- private final ChannelOutputStream chaos = new ChannelOutputStream();
- private ChannelHandlerContext ctx;
-
-
- public InputStream getInputStream() {
- return this.chais;
- }
-
- public OutputStream getOutputStream() {
- return this.chaos;
- }
-
- public void handlerAdded(ChannelHandlerContext ctx) {
- this.ctx = ctx;
-
- if (ctx.channel().pipeline().get(OUTPUT_STREAM) == null) {
- ctx.channel().pipeline().addFirst(OUTPUT_STREAM, chaos);
- }
-
- if (ctx.channel().pipeline().get(INPUT_STREAM) == null) {
- ctx.channel().pipeline().addFirst(INPUT_STREAM, chais);
- }
- }
-
- public void handlerRemoved(ChannelHandlerContext ctx) {
- if (ctx.channel().pipeline().get(OUTPUT_STREAM) != null) {
- ctx.channel().pipeline().remove(OUTPUT_STREAM);
- }
-
- if (ctx.channel().pipeline().get(INPUT_STREAM) != null) {
- ctx.channel().pipeline().remove(INPUT_STREAM);
- }
- }
-
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
- // TODO exceptionCaught is deprecated transform this handler
- ctx.fireExceptionCaught(throwable);
- }
-
-
- @Override
- public void connect(SocketAddress endpoint) throws IOException {}
-
- @Override
- public void connect(SocketAddress endpoint, int timeout) throws IOException {}
-
- @Override
- public void bind(SocketAddress bindpoint) throws IOException {}
-
- @Override
- public InetAddress getInetAddress() {
- InetSocketAddress isa = getInetSocketAddress();
- return isa.getAddress();
- }
-
- @Override
- public InetAddress getLocalAddress() {return null;}
-
- @Override
- public int getPort() {
- return getInetSocketAddress().getPort();
- }
-
- private InetSocketAddress getInetSocketAddress() {
- return (InetSocketAddress)getRemoteSocketAddress();
- }
-
- @Override
- public int getLocalPort() {return -1;}
-
- @Override
- public SocketAddress getRemoteSocketAddress() {
- return this.ctx.channel().remoteAddress();
- }
-
- @Override
- public SocketAddress getLocalSocketAddress() {
- return this.ctx.channel().localAddress();
- }
-
- @Override
- public SocketChannel getChannel() {return null;}
-
- @Override
- public void setTcpNoDelay(boolean on) throws SocketException {}
-
- @Override
- public boolean getTcpNoDelay() throws SocketException {return false;}
-
- @Override
- public void setSoLinger(boolean on, int linger) throws SocketException {}
-
- @Override
- public int getSoLinger() throws SocketException {return -1;}
-
- @Override
- public void sendUrgentData(int data) throws IOException {}
-
- @Override
- public void setOOBInline(boolean on) throws SocketException {}
-
- @Override
- public boolean getOOBInline() throws SocketException {return false;}
-
- @Override
- public synchronized void setSoTimeout(int timeout) throws SocketException {}
-
- @Override
- public synchronized int getSoTimeout() throws SocketException {return -1;}
-
- @Override
- public synchronized void setSendBufferSize(int size) throws SocketException {}
-
- @Override
- public synchronized int getSendBufferSize() throws SocketException {return -1;}
-
- @Override
- public synchronized void setReceiveBufferSize(int size) throws SocketException {}
-
- @Override
- public synchronized int getReceiveBufferSize() throws SocketException {return -1;}
-
- @Override
- public void setKeepAlive(boolean on) throws SocketException {}
-
- @Override
- public boolean getKeepAlive() throws SocketException {return false;}
-
- @Override
- public void setTrafficClass(int tc) throws SocketException {}
-
- @Override
- public int getTrafficClass() throws SocketException {return -1;}
-
- @Override
- public void setReuseAddress(boolean on) throws SocketException {}
-
- @Override
- public boolean getReuseAddress() throws SocketException {return false;}
-
- @Override
- public synchronized void close() throws IOException {}
-
- @Override
- public void shutdownInput() throws IOException {}
-
- @Override
- public void shutdownOutput() throws IOException {}
-
- @Override
- public String toString() {
- return "VirtualSocket{" + getInetAddress() + ":" + getPort() + "}";
- }
-
- @Override
- public boolean isConnected() {return false;}
-
- @Override
- public boolean isBound() {return false;}
-
- @Override
- public boolean isClosed() {return false;}
-
- @Override
- public boolean isInputShutdown() {return false;}
-
- @Override
- public boolean isOutputShutdown() {return false;}
-
- @Override
- public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) {}
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+
+public class ChunkedFramingMechanismEncoderTest {
+
+ private int chunkSize;
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ chunkSize = 256;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalSize() throws Exception {
+ new ChunkedFramingMechanismEncoder(10);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalSizeMax() throws Exception {
+ new ChunkedFramingMechanismEncoder(Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testEncode() throws Exception {
+ final List<ByteBuf> chunks = Lists.newArrayList();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ chunks.add((ByteBuf) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(ctx).write(anyObject());
+
+ final ChunkedFramingMechanismEncoder encoder = new ChunkedFramingMechanismEncoder(chunkSize);
+ final int lastChunkSize = 20;
+ final ByteBuf src = Unpooled.wrappedBuffer(getByteArray(chunkSize * 4 + lastChunkSize));
+ final ByteBuf destination = Unpooled.buffer();
+ encoder.encode(ctx, src, destination);
+ assertEquals(4, chunks.size());
+
+ final int framingSize = "#256\n".getBytes().length + 1/* new line at end */;
+
+ for (final ByteBuf chunk : chunks) {
+ assertEquals(chunkSize + framingSize, chunk.readableBytes());
+ }
+
+ final int lastFramingSize = "#20\n".length() + NetconfMessageConstants.END_OF_CHUNK.length + 1/* new line at end */;
+ assertEquals(lastChunkSize + lastFramingSize, destination.readableBytes());
+ }
+
+ private byte[] getByteArray(final int size) {
+ final byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++) {
+ bytes[i] = 'a';
+ }
+ return bytes;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+
+public class EOMFramingMechanismEncoderTest {
+
+ @Test
+ public void testEncode() throws Exception {
+ final byte[] content = new byte[50];
+ final ByteBuf source = Unpooled.wrappedBuffer(content);
+ final ByteBuf destination = Unpooled.buffer();
+ new EOMFramingMechanismEncoder().encode(null, source, destination);
+
+ assertEquals(Unpooled.wrappedBuffer(source.array(), NetconfMessageConstants.END_OF_MESSAGE), destination);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+
+public class FramingMechanismHandlerFactoryTest {
+
+ @Test
+ public void testCreate() throws Exception {
+ MatcherAssert.assertThat(FramingMechanismHandlerFactory
+ .createHandler(FramingMechanism.CHUNK), CoreMatchers
+ .instanceOf(ChunkedFramingMechanismEncoder.class));
+ MatcherAssert.assertThat(FramingMechanismHandlerFactory
+ .createHandler(FramingMechanism.EOM), CoreMatchers
+ .instanceOf(EOMFramingMechanismEncoder.class));
+ }
+}
\ No newline at end of file
*/
package org.opendaylight.controller.netconf.nettyutil.handler;
+import static org.junit.Assert.assertEquals;
+
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import junit.framework.Assert;
+import java.util.List;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.List;
-
public class NetconfChunkAggregatorTest {
private static final String CHUNKED_MESSAGE = "\n#4\n" +
@Test
public void testMultipleChunks() throws Exception {
- List<Object> output = Lists.newArrayList();
- ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
+ final List<Object> output = Lists.newArrayList();
+ final ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
agr.decode(null, input, output);
- Assert.assertEquals(1, output.size());
- ByteBuf chunk = (ByteBuf) output.get(0);
+ assertEquals(1, output.size());
+ final ByteBuf chunk = (ByteBuf) output.get(0);
- Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+ assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
}
@Test
public void testOneChunks() throws Exception {
- List<Object> output = Lists.newArrayList();
- ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
+ final List<Object> output = Lists.newArrayList();
+ final ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
agr.decode(null, input, output);
- Assert.assertEquals(1, output.size());
- ByteBuf chunk = (ByteBuf) output.get(0);
+ assertEquals(1, output.size());
+ final ByteBuf chunk = (ByteBuf) output.get(0);
- Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+ assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+public class NetconfHelloMessageToXMLEncoderTest {
+
+ @Mock
+ private ChannelHandlerContext ctx;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testEncode() throws Exception {
+ final NetconfMessage msg = new NetconfHelloMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"),
+ NetconfHelloMessageAdditionalHeader.fromString("[tomas;10.0.0.0:10000;tcp;client;]"));
+ final ByteBuf destination = Unpooled.buffer();
+ new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, destination);
+
+ final String encoded = new String(destination.array());
+ assertThat(encoded, containsString("[tomas;10.0.0.0:10000;tcp;client;]"));
+ assertThat(encoded, containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+ }
+
+ @Test
+ public void testEncodeNoHeader() throws Exception {
+ final NetconfMessage msg = new NetconfHelloMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+ final ByteBuf destination = Unpooled.buffer();
+ new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, destination);
+
+ final String encoded = new String(destination.array());
+ assertThat(encoded, not(containsString("[tomas;10.0.0.0:10000;tcp;client;]")));
+ assertThat(encoded, containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testEncodeNotHello() throws Exception {
+ final NetconfMessage msg = new NetconfMessage(XmlUtil.readXmlToDocument("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>"));
+ new NetconfHelloMessageToXMLEncoder().encode(ctx, msg, null);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+
+public class NetconfXMLToHelloMessageDecoderTest {
+
+ @Test
+ public void testDecodeWithHeader() throws Exception {
+ final ByteBuf src = Unpooled.wrappedBuffer(String.format("%s\n%s",
+ "[tomas;10.0.0.0:10000;tcp;client;]", "<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>").getBytes());
+ final List<Object> out = Lists.newArrayList();
+ new NetconfXMLToHelloMessageDecoder().decode(null, src, out);
+
+ assertEquals(1, out.size());
+ assertThat(out.get(0), CoreMatchers.instanceOf(NetconfHelloMessage.class));
+ final NetconfHelloMessage hello = (NetconfHelloMessage) out.get(0);
+ assertTrue(hello.getAdditionalHeader().isPresent());
+ assertEquals("[tomas;10.0.0.0:10000;tcp;client;]\n", hello.getAdditionalHeader().get().toFormattedString());
+ assertThat(XmlUtil.toString(hello.getDocument()), CoreMatchers.containsString("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\""));
+ }
+
+ @Test
+ public void testDecodeNoHeader() throws Exception {
+ final ByteBuf src = Unpooled.wrappedBuffer("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+ final List<Object> out = Lists.newArrayList();
+ new NetconfXMLToHelloMessageDecoder().decode(null, src, out);
+
+ assertEquals(1, out.size());
+ assertThat(out.get(0), CoreMatchers.instanceOf(NetconfHelloMessage.class));
+ final NetconfHelloMessage hello = (NetconfHelloMessage) out.get(0);
+ assertFalse(hello.getAdditionalHeader().isPresent());
+ }
+
+ @Test
+ public void testDecodeCaching() throws Exception {
+ final ByteBuf msg1 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+ final ByteBuf msg2 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+ final ByteBuf src = Unpooled.wrappedBuffer("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+ final List<Object> out = Lists.newArrayList();
+ final NetconfXMLToHelloMessageDecoder decoder = new NetconfXMLToHelloMessageDecoder();
+ decoder.decode(null, src, out);
+ decoder.decode(null, msg1, out);
+ decoder.decode(null, msg2, out);
+
+ assertEquals(1, out.size());
+
+ assertEquals(2, Iterables.size(decoder.getPostHelloNetconfMessages()));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDecodeNotHelloReceived() throws Exception {
+ final ByteBuf msg1 = Unpooled.wrappedBuffer("<rpc-reply xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\"/>".getBytes());
+ final List<Object> out = Lists.newArrayList();
+ NetconfXMLToHelloMessageDecoder decoder = new NetconfXMLToHelloMessageDecoder();
+ decoder.decode(null, msg1, out);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import org.junit.Test;
+
+public class NetconfXMLToMessageDecoderTest {
+
+ @Test
+ public void testDecodeNoMoreContent() throws Exception {
+ final ArrayList<Object> out = Lists.newArrayList();
+ new NetconfXMLToMessageDecoder().decode(null, Unpooled.buffer(), out);
+ assertEquals(0, out.size());
+ }
+
+ @Test
+ public void testDecode() throws Exception {
+ final ArrayList<Object> out = Lists.newArrayList();
+ new NetconfXMLToMessageDecoder().decode(null, Unpooled.wrappedBuffer("<msg/>".getBytes()), out);
+ assertEquals(1, out.size());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.client.future.AuthFuture;
+import org.junit.Test;
+
+public class LoginPasswordTest {
+
+ @Test
+ public void testLoginPassword() throws Exception {
+ final LoginPassword loginPassword = new LoginPassword("user", "pwd");
+ assertEquals("user", loginPassword.getUsername());
+
+ final ClientSession session = mock(ClientSession.class);
+ doNothing().when(session).addPasswordIdentity("pwd");
+ doReturn(mock(AuthFuture.class)).when(session).auth();
+ loginPassword.authenticate(session);
+
+ verify(session).addPasswordIdentity("pwd");
+ verify(session).auth();
+ }
+}
\ No newline at end of file
Thread.sleep(100);
}
assertFalse(echoClientHandler.isConnected());
- assertEquals(State.FAILED_TO_CONNECT, echoClientHandler.getState());
+ assertEquals(State.CONNECTION_CLOSED, echoClientHandler.getState());
}
}
* instance of deleted LoadBalancerPool object
* @return void
*/
- public void NeutronLoadBalancerPoolMemberDeleted(NeutronLoadBalancerPoolMember loadBalancerPoolMember);
+ public void neutronLoadBalancerPoolMemberDeleted(NeutronLoadBalancerPoolMember loadBalancerPoolMember);
}