// We need to create ProxyDOMDataBroker so accessing mountpoint
// on leader node would be same as on follower node
final ProxyDOMDataBroker proxyDataBroker =
- new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
+ new ProxyDOMDataBroker(id, masterActorRef, actorSystem.dispatcher(), actorResponseWaitTime);
salProvider.getMountInstance()
.onTopologyDeviceConnected(currentSchemaContext, proxyDataBroker, deviceRpc, notificationService);
}
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Verify;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nonnull;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
-import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
public class ProxyDOMDataBroker implements DOMDataBroker {
private final Timeout askTimeout;
private final RemoteDeviceId id;
private final ActorRef masterNode;
- private final ActorSystem actorSystem;
+ private final ExecutionContext executionContext;
/**
* Constructor for {@code ProxyDOMDataBroker}.
*
- * @param actorSystem system
* @param id id
* @param masterNode {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
+ * @param executionContext ExecutionContext
* @param askTimeout ask timeout
*/
- public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
- final ActorRef masterNode, final Timeout askTimeout) {
+ public ProxyDOMDataBroker(final RemoteDeviceId id, final ActorRef masterNode,
+ final ExecutionContext executionContext, final Timeout askTimeout) {
this.id = id;
this.masterNode = masterNode;
- this.actorSystem = actorSystem;
+ this.executionContext = executionContext;
this.askTimeout = askTimeout;
}
@Override
public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout);
- final Object msg;
- try {
- msg = Await.result(txActorFuture, askTimeout.duration());
- } catch (Exception e) {
- throw new IllegalStateException("Can't create ProxyReadTransaction", e);
- }
-
- if (msg instanceof Exception) {
- throw new IllegalStateException("Can't create ProxyReadTransaction", (Exception) msg);
- }
-
- Verify.verify(msg instanceof NewReadTransactionReply);
- final NewReadTransactionReply reply = (NewReadTransactionReply) msg;
- return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ return new ProxyReadTransaction(id, txActorFuture, executionContext, askTimeout);
}
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
public DOMDataReadWriteTransaction newReadWriteTransaction() {
final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout);
- final Object msg;
- try {
- msg = Await.result(txActorFuture, askTimeout.duration());
- } catch (Exception e) {
- throw new IllegalStateException("Can't create ProxyReadWriteTransaction", e);
- }
-
- if (msg instanceof Exception) {
- throw new IllegalStateException("Can't create ProxyReadWriteTransaction", (Exception) msg);
- }
-
- Verify.verify(msg instanceof NewReadWriteTransactionReply);
- final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg;
- return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout);
}
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
public DOMDataWriteTransaction newWriteOnlyTransaction() {
final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout);
- final Object msg;
- try {
- msg = Await.result(txActorFuture, askTimeout.duration());
- } catch (Exception e) {
- throw new IllegalStateException("Can't create ProxyWriteTransaction", e);
- }
-
- if (msg instanceof Exception) {
- throw new IllegalStateException("Can't create ProxyWriteTransaction", (Exception) msg);
- }
-
- Verify.verify(msg instanceof NewWriteTransactionReply);
- final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg;
- return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ return new ProxyReadWriteTransaction(id, txActorFuture, executionContext, askTimeout);
}
@Override
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
final ProxyDOMDataBroker netconfDeviceDataBroker =
- new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
+ new ProxyDOMDataBroker(id, masterActorRef, actorSystem.dispatcher(), actorResponseWaitTime);
salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
deviceRpc, notificationService);
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
} else if (message instanceof NewReadTransactionRequest) { // master
-
- sender().tell(new NewReadTransactionReply(readTxActor), self());
-
+ sender().tell(new Success(readTxActor), self());
} else if (message instanceof NewWriteTransactionRequest) { // master
try {
final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
- sender().tell(new NewWriteTransactionReply(txActor), self());
+ sender().tell(new Success(txActor), self());
} catch (final Exception t) {
- sender().tell(t, self());
+ sender().tell(new Failure(t), self());
}
} else if (message instanceof NewReadWriteTransactionRequest) {
try {
final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
- sender().tell(new NewReadWriteTransactionReply(txActor), self());
+ sender().tell(new Success(txActor), self());
} catch (final Exception t) {
- sender().tell(t, self());
+ sender().tell(new Failure(t), self());
}
} else if (message instanceof InvokeRpcMessage) { // master
final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
@Override
public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
try {
+ LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
} catch (IOException e) {
sender.tell(new Failure(e), getSelf());
@Override
public void onFailure(@Nonnull final Throwable throwable) {
+ LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
sender.tell(new Failure(throwable), getSelf());
}
}, MoreExecutors.directExecutor());
package org.opendaylight.netconf.topology.singleton.impl.actors;
import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- sender.tell(throwable, self);
+ sender.tell(new Failure(throwable), self);
}
}, MoreExecutors.directExecutor());
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- sender.tell(throwable, self);
+ sender.tell(new Failure(throwable), self);
}
}, MoreExecutors.directExecutor());
}
import akka.actor.ActorContext;
import akka.actor.ActorRef;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- requester.tell(new SubmitReply(), self);
+ requester.tell(new Success(null), self);
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- requester.tell(new SubmitFailedReply(throwable), self);
+ requester.tell(new Failure(throwable), self);
}
}, MoreExecutors.directExecutor());
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Objects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+/**
+ * ProxyTransactionFacade implementation that interfaces with an actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ActorProxyTransactionFacade implements ProxyTransactionFacade {
+ private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
+
+ private final ActorRef masterTxActor;
+ private final RemoteDeviceId id;
+ private final ExecutionContext executionContext;
+ private final Timeout askTimeout;
+
+ ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext,
+ Timeout askTimeout) {
+ this.masterTxActor = Objects.requireNonNull(masterTxActor);
+ this.id = Objects.requireNonNull(id);
+ this.executionContext = Objects.requireNonNull(executionContext);
+ this.askTimeout = Objects.requireNonNull(askTimeout);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public boolean cancel() {
+ LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ LOG.warn("{}: Cancel failed", id, failure);
+ return;
+ }
+
+ LOG.debug("{}: Cancel succeeded", id);
+ }
+ }, executionContext);
+
+ return true;
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
+ YangInstanceIdentifier path) {
+ LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ LOG.debug("{}: Read {} {} failed", id, store, path, failure);
+ settableFuture.setException(processFailure(failure));
+ return;
+ }
+
+ LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
+
+ if (response instanceof EmptyReadResponse) {
+ settableFuture.set(Optional.absent());
+ return;
+ }
+
+ if (response instanceof NormalizedNodeMessage) {
+ final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
+ settableFuture.set(Optional.of(data.getNode()));
+ }
+ }
+ }, executionContext);
+
+ return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+ final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
+ settableFuture.setException(processFailure(failure));
+ return;
+ }
+
+ LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
+
+ settableFuture.set((Boolean) response);
+ }
+ }, executionContext);
+
+ return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
+ masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
+ }
+
+ @Override
+ public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
+ masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
+ }
+
+ @Override
+ public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
+ masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ LOG.debug("{}: Commit via actor {}", id, masterTxActor);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+ final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ LOG.debug("{}: Commit failed", id, failure);
+ settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
+ return;
+ }
+
+ LOG.debug("{}: Commit succeeded", id);
+
+ settableFuture.set(CommitInfo.empty());
+ }
+ }, executionContext);
+
+ return settableFuture;
+ }
+
+ private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
+ return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()),
+ failure);
+ }
+
+ private Throwable processFailure(Throwable failure) {
+ if (failure instanceof AskTimeoutException) {
+ return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);
+ }
+
+ return failure;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.Objects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of ProxyTransactionFacade that fails each request.
+ *
+ * @author Thomas Pantelis
+ */
+class FailedProxyTransactionFacade implements ProxyTransactionFacade {
+ private static final Logger LOG = LoggerFactory.getLogger(FailedProxyTransactionFacade.class);
+
+ private final RemoteDeviceId id;
+ private final Throwable failure;
+
+ FailedProxyTransactionFacade(RemoteDeviceId id, Throwable failure) {
+ this.id = Objects.requireNonNull(id);
+ this.failure = Objects.requireNonNull(failure);
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return id;
+ }
+
+ @Override
+ public boolean cancel() {
+ return true;
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
+ YangInstanceIdentifier path) {
+ LOG.debug("{}: Read {} {} - failure {}", id, store, path, failure);
+ return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply(
+ failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure)));
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ LOG.debug("{}: Exists {} {} - failure {}", id, store, path, failure);
+ return Futures.immediateFailedCheckedFuture(ReadFailedException.MAPPER.apply(
+ failure instanceof Exception ? (Exception)failure : new ReadFailedException("read", failure)));
+ }
+
+ @Override
+ public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
+ LOG.debug("{}: Delete {} {} - failure {}", id, store, path, failure);
+ }
+
+ @Override
+ public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("{}: Put {} {} - failure {}", id, store, path, failure);
+ }
+
+ @Override
+ public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("{}: Merge {} {} - failure {}", id, store, path, failure);
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ LOG.debug("{}: Commit {} {} - failure {}", id, failure);
+ return FluentFuture.from(Futures.immediateFailedFuture(failure instanceof Exception
+ ? AsyncWriteTransaction.SUBMIT_EXCEPTION_MAPPER.apply((Exception)failure)
+ : new TransactionCommitFailedException("commit", failure)));
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-class ProxyReadAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class);
-
- private final ActorRef masterTxActor;
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final Timeout askTimeout;
-
- ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
- final Timeout askTimeout) {
- this.masterTxActor = masterTxActor;
- this.id = id;
- this.actorSystem = actorSystem;
- this.askTimeout = askTimeout;
- }
-
- public void close() {
- //noop
- }
-
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
- final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
- final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure,
- final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) { // Error sended by master
- settableFuture.setException((Throwable) success);
- return;
- }
- if (success instanceof EmptyReadResponse) {
- settableFuture.set(Optional.absent());
- return;
- }
- if (success instanceof NormalizedNodeMessage) {
- final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
- settableFuture.set(Optional.of(data.getNode()));
- }
- }
- }, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
- }
-
- public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- final Future<Object> existsScalaFuture =
- Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
-
- LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
- final SettableFuture<Boolean> settableFuture = SettableFuture.create();
- existsScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
- return;
- }
- settableFuture.set((Boolean) success);
- }
- }, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
- }
-
-}
package org.opendaylight.netconf.topology.singleton.impl.tx;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
/**
* ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
* {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}.
*/
-public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
+public class ProxyReadTransaction extends ProxyReadWriteTransaction implements DOMDataReadOnlyTransaction {
- private final ProxyReadAdapter delegate;
-
- /**
- * Constructor for {@code ProxyReadTransaction}.
- *
- * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
- * @param id device id
- * @param actorSystem system
- * @param askTimeout timeout
- */
- public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
- final Timeout askTimeout) {
- delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
+ public ProxyReadTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
+ final ExecutionContext executionContext, final Timeout askTimeout) {
+ super(id, masterTxActorFuture, executionContext, askTimeout);
}
@Override
public void close() {
- delegate.close();
- }
-
- @Override
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- return delegate.read(store, path);
- }
-
- @Override
- public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- return delegate.exists(store, path);
- }
-
-
- @Override
- public Object getIdentifier() {
- return this;
+ // noop
}
}
package org.opendaylight.netconf.topology.singleton.impl.tx;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
/**
* ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
* {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
*/
public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
- private final ProxyReadAdapter delegateRead;
- private final ProxyWriteAdapter delegateWrite;
-
- /**
- * Constructor for {@code ProxyReadWriteTransaction}.
- *
- * @param masterTxActor
- * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref
- * @param id device id
- * @param actorSystem system
- * @param askTimeout timeout
- */
- public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id,
- final ActorSystem actorSystem, final Timeout askTimeout) {
- delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
- delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
+ private final RemoteDeviceId id;
+ private final AtomicBoolean opened = new AtomicBoolean(true);
+
+ @GuardedBy("queuedTxOperations")
+ private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
+
+ private volatile ProxyTransactionFacade transactionFacade;
+
+ public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
+ final ExecutionContext executionContext, final Timeout askTimeout) {
+ this.id = id;
+
+ masterTxActorFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object masterTxActor) {
+ final ProxyTransactionFacade newTransactionFacade;
+ if (failure != null) {
+ LOG.debug("{}: Failed to obtain master actor", id, failure);
+ newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
+ } else {
+ LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
+ newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
+ executionContext, askTimeout);
+ }
+
+ executePriorTransactionOperations(newTransactionFacade);
+ }
+ }, executionContext);
}
@Override
public boolean cancel() {
- return delegateWrite.cancel();
+ if (!opened.compareAndSet(true, false)) {
+ return false;
+ }
+
+ processTransactionOperation(facade -> facade.cancel());
+ return true;
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- return delegateRead.read(store, path);
- }
+ final YangInstanceIdentifier path) {
+ LOG.debug("{}: Read {} {}", id, store, path);
- @Override
- public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- return delegateRead.exists(store, path);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+ processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
@Override
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- delegateWrite.delete(store, path);
- }
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ LOG.debug("{}: Exists {} {}", id, store, path);
- @Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
- new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
- @Override
- protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
- return new TransactionCommitFailedException(message, cause);
- }
- });
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+ processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
}
@Override
- public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
- return delegateWrite.commit(getIdentifier());
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ checkOpen();
+ LOG.debug("{}: Delete {} {}", id, store, path);
+ processTransactionOperation(facade -> facade.delete(store, path));
}
@Override
public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
final NormalizedNode<?, ?> data) {
- delegateWrite.put(store, path, data, getIdentifier());
+ checkOpen();
+ LOG.debug("{}: Put {} {}", id, store, path);
+ processTransactionOperation(facade -> facade.put(store, path, data));
}
@Override
public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
final NormalizedNode<?, ?> data) {
- delegateWrite.merge(store, path, data, getIdentifier());
+ checkOpen();
+ LOG.debug("{}: Merge {} {}", id, store, path);
+ processTransactionOperation(facade -> facade.merge(store, path, data));
+ }
+
+ @Override
+ public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+ Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
+ LOG.debug("{}: Commit", id);
+
+ final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
+ processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
+ return returnFuture;
}
@Override
public Object getIdentifier() {
- return this;
+ return id;
+ }
+
+ private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
+ final ProxyTransactionFacade facadeOnEntry;
+ synchronized (queuedTxOperations) {
+ if (transactionFacade == null) {
+ LOG.debug("{}: Queuing transaction operation", id);
+
+ queuedTxOperations.add(operation);
+ facadeOnEntry = null;
+ } else {
+ facadeOnEntry = transactionFacade;
+ }
+ }
+
+ if (facadeOnEntry != null) {
+ operation.accept(facadeOnEntry);
+ }
+ }
+
+ private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
+ while (true) {
+ // Access to queuedTxOperations and transactionFacade must be protected and atomic
+ // (ie synchronized) with respect to #processTransactionOperation to handle timing
+ // issues and ensure no ProxyTransactionFacade is missed and that they are processed
+ // in the order they occurred.
+
+ // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
+ // in case a transaction operation results in another transaction operation being
+ // queued (eg a put operation from a client read Future callback that is notified
+ // synchronously).
+ final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
+ synchronized (queuedTxOperations) {
+ if (queuedTxOperations.isEmpty()) {
+ // We're done invoking the transaction operations so we can now publish the
+ // ProxyTransactionFacade.
+ transactionFacade = newTransactionFacade;
+ break;
+ }
+
+ operationsBatch = new ArrayList<>(queuedTxOperations);
+ queuedTxOperations.clear();
+ }
+
+ // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
+ for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
+ oper.accept(newTransactionFacade);
+ }
+ }
+ }
+
+ private void checkOpen() {
+ Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+
+/**
+ * Interfaces with a transaction back-end.
+ *
+ * @author Thomas Pantelis
+ */
+interface ProxyTransactionFacade extends DOMDataReadWriteTransaction {
+}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-
-public class ProxyWriteAdapter {
-
- private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class);
-
- private final ActorRef masterTxActor;
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final AtomicBoolean opened = new AtomicBoolean(true);
- private final Timeout askTimeout;
-
- public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
- final Timeout askTimeout) {
- this.masterTxActor = masterTxActor;
- this.id = id;
- this.actorSystem = actorSystem;
- this.askTimeout = askTimeout;
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- public boolean cancel() {
- if (!opened.compareAndSet(true, false)) {
- return false;
- }
- final Future<Object> cancelScalaFuture =
- Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
-
- LOG.trace("{}: Cancel {} via NETCONF", id);
-
- try {
- // here must be Await because AsyncWriteTransaction do not return future
- return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
- } catch (final Exception e) {
- return false;
- }
- }
-
- public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit(final Object identifier) {
- if (!opened.compareAndSet(true, false)) {
- throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
- }
- final Future<Object> submitScalaFuture =
- Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
-
- LOG.trace("{}: Commit {} via NETCONF", id);
-
- final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
- submitScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- settableFuture.setException(newTransactionCommitFailedException(
- NetconfTopologyUtils.createMasterIsDownException(id), identifier));
- return;
- }
- if (success instanceof Throwable) {
- settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier));
- } else {
- if (success instanceof SubmitFailedReply) {
- LOG.error("{}: Transaction was not submitted because already closed.", id);
- settableFuture.setException(newTransactionCommitFailedException(
- ((SubmitFailedReply) success).getThrowable(), identifier));
- return;
- }
-
- settableFuture.set(CommitInfo.empty());
- }
- }
- }, actorSystem.dispatcher());
-
- return FluentFuture.from(settableFuture);
- }
-
- private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure,
- final Object identifier) {
- return new TransactionCommitFailedException(
- String.format("Commit of transaction %s failed", identifier), failure);
- }
-
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
- LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
- masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
- }
-
- public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final Object identifier) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
- LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data);
- masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
- }
-
- public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data, final Object identifier) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
- LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
- masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
- * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}.
- */
-public class ProxyWriteTransaction implements DOMDataWriteTransaction {
-
- private final ProxyWriteAdapter proxyWriteAdapter;
-
- /**
- * Constructor for {@code ProxyWriteTransaction}.
- *
- * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
- * @param id device id
- * @param actorSystem system
- * @param askTimeout timeout
- */
- public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
- final Timeout askTimeout) {
- proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- @Override
- public boolean cancel() {
- return proxyWriteAdapter.cancel();
- }
-
- @Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
- new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
- @Override
- protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
- return new TransactionCommitFailedException(message, cause);
- }
- });
- }
-
- @Override
- public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
- return proxyWriteAdapter.commit(getIdentifier());
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
- proxyWriteAdapter.delete(store, identifier);
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
- final NormalizedNode<?, ?> data) {
- proxyWriteAdapter.put(store, identifier, data, getIdentifier());
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
- final NormalizedNode<?, ?> data) {
- proxyWriteAdapter.merge(store, identifier, data, getIdentifier());
- }
-
- @Override
- public Object getIdentifier() {
- return this;
- }
-}
return createTopologyListPath(topologyId).child(Node.class);
}
- public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) {
- return new DocumentedException(id + ":Master is down. Please try again.",
+ public static DocumentedException createMasterIsDownException(final RemoteDeviceId id, final Exception cause) {
+ return new DocumentedException(id + ":Master is down. Please try again.", cause,
DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
DocumentedException.ErrorSeverity.WARNING);
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewReadTransactionReply implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ActorRef txActor;
-
- public NewReadTransactionReply(final ActorRef txActor) {
- this.txActor = txActor;
- }
-
- public ActorRef getTxActor() {
- return txActor;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewReadWriteTransactionReply implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ActorRef txActor;
-
- public NewReadWriteTransactionReply(final ActorRef txActor) {
- this.txActor = txActor;
- }
-
- public ActorRef getTxActor() {
- return txActor;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import akka.actor.ActorRef;
-import java.io.Serializable;
-
-public class NewWriteTransactionReply implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final ActorRef txActor;
-
- public NewWriteTransactionReply(final ActorRef txActor) {
- this.txActor = txActor;
- }
-
- public ActorRef getTxActor() {
- return txActor;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import java.io.Serializable;
-
-/**
- * Message sent from master back to the slave when submit fails, with the offending exception attached.
- */
-public class SubmitFailedReply implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Throwable throwable;
-
- public SubmitFailedReply(final Throwable throwable) {
- this.throwable = throwable;
- }
-
- public Throwable getThrowable() {
- return throwable;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.messages.transactions;
-
-import java.io.Serializable;
-
-/**
- * Message sent from master back to the slave when submit is successfully performed.
- */
-public class SubmitReply implements Serializable {
- private static final long serialVersionUID = 1L;
-}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorSystem;
+import akka.actor.Status.Success;
+import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for ProxyDOMDataBroker.
+ *
+ * @author Thomas Pantelis
+ */
+public class ProxyDOMDataBrokerTest {
+ private static final RemoteDeviceId DEVICE_ID =
+ new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+
+ private static ActorSystem system = ActorSystem.apply();
+
+ private final TestProbe masterActor = new TestProbe(system);
+ private final ProxyDOMDataBroker proxy = new ProxyDOMDataBroker(DEVICE_ID, masterActor.ref(), system.dispatcher(),
+ Timeout.apply(5, TimeUnit.SECONDS));
+
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
+ }
+
+ @Test
+ public void testNewReadOnlyTransaction() {
+ final DOMDataReadOnlyTransaction tx = proxy.newReadOnlyTransaction();
+ masterActor.expectMsgClass(NewReadTransactionRequest.class);
+ masterActor.reply(new Success(masterActor.ref()));
+
+ assertEquals(DEVICE_ID, tx.getIdentifier());
+
+ tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ masterActor.expectMsgClass(ReadRequest.class);
+ }
+
+ @Test
+ public void testNewWriteOnlyTransaction() {
+ final DOMDataWriteTransaction tx = proxy.newWriteOnlyTransaction();
+ masterActor.expectMsgClass(NewWriteTransactionRequest.class);
+ masterActor.reply(new Success(masterActor.ref()));
+
+ assertEquals(DEVICE_ID, tx.getIdentifier());
+
+ tx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ masterActor.expectMsgClass(DeleteRequest.class);
+ }
+
+ @Test
+ public void testNewReadWriteTransaction() {
+ final DOMDataReadWriteTransaction tx = proxy.newReadWriteTransaction();
+ masterActor.expectMsgClass(NewReadWriteTransactionRequest.class);
+ masterActor.reply(new Success(masterActor.ref()));
+
+ assertEquals(DEVICE_ID, tx.getIdentifier());
+
+ tx.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+ masterActor.expectMsgClass(ReadRequest.class);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCreateTransactionChain() {
+ proxy.createTransactionChain(null);
+ }
+
+ @Test
+ public void testGetSupportedExtensions() {
+ assertTrue(proxy.getSupportedExtensions().isEmpty());
+ }
+}
package org.opendaylight.netconf.topology.singleton.impl.actors;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-import org.junit.After;
+import akka.testkit.javadsl.TestKit;
+import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ReadTransactionActorTest {
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
- private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+public class ReadTransactionActorTest extends ReadTransactionActorTestAdapter {
+ private static ActorSystem system = ActorSystem.apply();
@Mock
- private DOMDataReadOnlyTransaction deviceReadTx;
- private TestProbe probe;
- private ActorSystem system;
- private TestActorRef<ReadTransactionActor> actorRef;
+ private DOMDataReadOnlyTransaction mockReadTx;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
MockitoAnnotations.initMocks(this);
- system = ActorSystem.apply();
- probe = TestProbe.apply(system);
- actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA");
- }
-
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
- }
-
- @Test
- public void testRead() throws Exception {
- final ContainerNode node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
- .build();
- when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadTx).read(STORE, PATH);
- probe.expectMsgClass(NormalizedNodeMessage.class);
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadTx).read(STORE, PATH);
- probe.expectMsgClass(EmptyReadResponse.class);
- }
-
- @Test
- public void testReadFailure() throws Exception {
- final ReadFailedException cause = new ReadFailedException("fail");
- when(deviceReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadTx).read(STORE, PATH);
- probe.expectMsg(cause);
- }
-
- @Test
- public void testExists() throws Exception {
- when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
- actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
- verify(deviceReadTx).exists(STORE, PATH);
- probe.expectMsg(true);
+ init(mockReadTx, system, TestActorRef.create(system, ReadTransactionActor.props(mockReadTx)));
}
- @Test
- public void testExistsFailure() throws Exception {
- final ReadFailedException cause = new ReadFailedException("fail");
- when(deviceReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
- verify(deviceReadTx).exists(STORE, PATH);
- probe.expectMsg(cause);
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status.Failure;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+/**
+ * Adapter for read transaction tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class ReadTransactionActorTestAdapter {
+ static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+ static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+ static final NormalizedNode<?, ?> NODE = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))).build();
+
+ private DOMDataReadTransaction mockReadTx;
+ private TestProbe probe;
+ private ActorRef actorRef;
+
+ public void init(DOMDataReadTransaction inMockReadTx, ActorSystem system, ActorRef inActorRef) {
+ this.mockReadTx = inMockReadTx;
+ this.probe = TestProbe.apply(system);
+ this.actorRef = inActorRef;
+ }
+
+ @Test
+ public void testRead() {
+ when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(NODE)));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+ verify(mockReadTx).read(STORE, PATH);
+ final NormalizedNodeMessage response = probe.expectMsgClass(NormalizedNodeMessage.class);
+ assertEquals(NODE, response.getNode());
+ }
+
+ @Test
+ public void testReadEmpty() {
+ when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+ verify(mockReadTx).read(STORE, PATH);
+ probe.expectMsgClass(EmptyReadResponse.class);
+ }
+
+ @Test
+ public void testReadFailure() {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(mockReadTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+
+ verify(mockReadTx).read(STORE, PATH);
+ final Failure response = probe.expectMsgClass(Failure.class);
+ assertEquals(cause, response.cause());
+ }
+
+ @Test
+ public void testExists() {
+ when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
+ actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+
+ verify(mockReadTx).exists(STORE, PATH);
+ probe.expectMsg(true);
+ }
+
+ @Test
+ public void testExistsFailure() {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(mockReadTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+
+ verify(mockReadTx).exists(STORE, PATH);
+ final Failure response = probe.expectMsgClass(Failure.class);
+ assertEquals(cause, response.cause());
+ }
+}
package org.opendaylight.netconf.topology.singleton.impl.actors;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
+import akka.testkit.javadsl.TestKit;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class ReadWriteTransactionActorTest {
-
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
- private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
- private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+ private static ActorSystem system = ActorSystem.apply();
@Mock
- private DOMDataReadWriteTransaction deviceReadWriteTx;
- private TestProbe probe;
- private ActorSystem system;
- private TestActorRef<WriteTransactionActor> actorRef;
- private ContainerNode node;
+ private DOMDataReadWriteTransaction mockReadWriteTx;
+
+ private final ReadTransactionActorTestAdapter readTestAdapter = new ReadTransactionActorTestAdapter() {};
+ private final WriteTransactionActorTestAdapter writeTestAdapter = new WriteTransactionActorTestAdapter() {};
@Before
- public void setUp() throws Exception {
+ public void setUp() {
MockitoAnnotations.initMocks(this);
- system = ActorSystem.apply();
- probe = TestProbe.apply(system);
- node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
- .build();
- actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx,
- Duration.apply(2, TimeUnit.SECONDS)), "testA");
+ TestActorRef<?> actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(mockReadWriteTx,
+ Duration.apply(2, TimeUnit.SECONDS)));
+ readTestAdapter.init(mockReadWriteTx, system, actorRef);
+ writeTestAdapter.init(mockReadWriteTx, system, actorRef);
}
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
}
@Test
- public void testRead() throws Exception {
- when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).read(STORE, PATH);
- probe.expectMsgClass(NormalizedNodeMessage.class);
+ public void testRead() {
+ readTestAdapter.testRead();
}
@Test
- public void testReadEmpty() throws Exception {
- when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).read(STORE, PATH);
- probe.expectMsgClass(EmptyReadResponse.class);
+ public void testReadEmpty() {
+ readTestAdapter.testReadEmpty();
}
@Test
- public void testReadFailure() throws Exception {
- final ReadFailedException cause = new ReadFailedException("fail");
- when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).read(STORE, PATH);
- probe.expectMsg(cause);
+ public void testReadFailure() {
+ readTestAdapter.testReadFailure();
}
@Test
- public void testExists() throws Exception {
- when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
- actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).exists(STORE, PATH);
- probe.expectMsg(true);
+ public void testExists() {
+ readTestAdapter.testExists();
}
@Test
- public void testExistsFailure() throws Exception {
- final ReadFailedException cause = new ReadFailedException("fail");
- when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).exists(STORE, PATH);
- probe.expectMsg(cause);
+ public void testExistsFailure() {
+ readTestAdapter.testExistsFailure();
}
@Test
- public void testPut() throws Exception {
- final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
- actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
- verify(deviceReadWriteTx).put(STORE, PATH, node);
+ public void testPut() {
+ writeTestAdapter.testPut();
}
@Test
- public void testMerge() throws Exception {
- final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
- actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
- verify(deviceReadWriteTx).merge(STORE, PATH, node);
+ public void testMerge() {
+ writeTestAdapter.testMerge();
}
@Test
- public void testDelete() throws Exception {
- actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
- verify(deviceReadWriteTx).delete(STORE, PATH);
+ public void testDelete() {
+ writeTestAdapter.testDelete();
}
@Test
public void testCancel() throws Exception {
- when(deviceReadWriteTx.cancel()).thenReturn(true);
- final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
- final Object result = Await.result(cancelFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof Boolean);
- verify(deviceReadWriteTx).cancel();
- Assert.assertTrue((Boolean) result);
+ writeTestAdapter.testCancel();
}
@Test
public void testSubmit() throws Exception {
- when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
- final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
- final Object result = Await.result(submitFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof SubmitReply);
- verify(deviceReadWriteTx).submit();
+ writeTestAdapter.testSubmit();
}
@Test
public void testSubmitFail() throws Exception {
- final RpcError rpcError =
- RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
- final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
- when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
- final Object result = Await.result(submitFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof SubmitFailedReply);
- Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable());
- verify(deviceReadWriteTx).submit();
+ writeTestAdapter.testSubmitFail();
}
@Test
public void testIdleTimeout() throws Exception {
- final TestProbe testProbe = new TestProbe(system);
- testProbe.watch(actorRef);
- verify(deviceReadWriteTx, timeout(3000)).cancel();
- testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+ writeTestAdapter.testIdleTimeout();
}
}
package org.opendaylight.netconf.topology.singleton.impl.actors;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.Futures;
+import akka.testkit.javadsl.TestKit;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-public class WriteTransactionActorTest {
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
- private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
- private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+public class WriteTransactionActorTest extends WriteTransactionActorTestAdapter {
+ private static ActorSystem system = ActorSystem.apply();
@Mock
- private DOMDataWriteTransaction deviceWriteTx;
- private TestProbe probe;
- private ActorSystem system;
- private TestActorRef<WriteTransactionActor> actorRef;
- private NormalizedNode<?, ?> node;
+ private DOMDataWriteTransaction mockWriteTx;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
MockitoAnnotations.initMocks(this);
- system = ActorSystem.apply();
- probe = TestProbe.apply(system);
- node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
- .build();
- actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx,
- Duration.apply(2, TimeUnit.SECONDS)), "testA");
- }
-
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
- }
-
- @Test
- public void testPut() throws Exception {
- final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
- actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
- verify(deviceWriteTx).put(STORE, PATH, node);
- }
-
- @Test
- public void testMerge() throws Exception {
- final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
- actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
- verify(deviceWriteTx).merge(STORE, PATH, node);
- }
-
- @Test
- public void testDelete() throws Exception {
- actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
- verify(deviceWriteTx).delete(STORE, PATH);
- }
-
- @Test
- public void testCancel() throws Exception {
- when(deviceWriteTx.cancel()).thenReturn(true);
- final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
- final Object result = Await.result(cancelFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof Boolean);
- verify(deviceWriteTx).cancel();
- Assert.assertTrue((Boolean) result);
- }
-
- @Test
- public void testSubmit() throws Exception {
- when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
- final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
- final Object result = Await.result(submitFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof SubmitReply);
- verify(deviceWriteTx).submit();
- }
-
- @Test
- public void testSubmitFail() throws Exception {
- final RpcError rpcError =
- RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
- final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
- when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
- final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
- final Object result = Await.result(submitFuture, TIMEOUT.duration());
- Assert.assertTrue(result instanceof SubmitFailedReply);
- Assert.assertEquals(cause, ((SubmitFailedReply)result).getThrowable());
- verify(deviceWriteTx).submit();
+ init(mockWriteTx, system, TestActorRef.create(system,
+ WriteTransactionActor.props(mockWriteTx, Duration.apply(2, TimeUnit.SECONDS))));
}
- @Test
- public void testIdleTimeout() throws Exception {
- final TestProbe testProbe = new TestProbe(system);
- testProbe.watch(actorRef);
- verify(deviceWriteTx, timeout(3000)).cancel();
- testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
}
-
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.NODE;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.PATH;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.STORE;
+import static org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActorTestAdapter.TIMEOUT;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.testkit.TestProbe;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+
+/**
+ * Adapter for write transaction tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class WriteTransactionActorTestAdapter {
+ private DOMDataWriteTransaction mockWriteTx;
+ private TestProbe probe;
+ private ActorRef actorRef;
+ private ActorSystem system;
+
+ public void init(DOMDataWriteTransaction inMockWriteTx, ActorSystem inSystem, ActorRef inActorRef) {
+ this.mockWriteTx = inMockWriteTx;
+ this.probe = TestProbe.apply(inSystem);
+ this.actorRef = inActorRef;
+ this.system = inSystem;
+ }
+
+ @Test
+ public void testPut() {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE);
+ actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(mockWriteTx).put(STORE, PATH, NODE);
+ }
+
+ @Test
+ public void testMerge() {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, NODE);
+ actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(mockWriteTx).merge(STORE, PATH, NODE);
+ }
+
+ @Test
+ public void testDelete() {
+ actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+ verify(mockWriteTx).delete(STORE, PATH);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ when(mockWriteTx.cancel()).thenReturn(true);
+ actorRef.tell(new CancelRequest(), probe.ref());
+
+ verify(mockWriteTx).cancel();
+ probe.expectMsg(true);
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ when(mockWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+ actorRef.tell(new SubmitRequest(), probe.ref());
+
+ verify(mockWriteTx).submit();
+ probe.expectMsgClass(Success.class);
+ }
+
+ @Test
+ public void testSubmitFail() throws Exception {
+ final RpcError rpcError =
+ RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+ when(mockWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new SubmitRequest(), probe.ref());
+
+ verify(mockWriteTx).submit();
+ final Failure response = probe.expectMsgClass(Failure.class);
+ assertEquals(cause, response.cause());
+ }
+
+ @Test
+ public void testIdleTimeout() throws Exception {
+ final TestProbe testProbe = new TestProbe(system);
+ testProbe.watch(actorRef);
+ verify(mockWriteTx, timeout(3000)).cancel();
+ testProbe.expectTerminated(actorRef, TIMEOUT.duration());
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.netconf.api.DocumentedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ProxyReadTransactionTest {
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
- private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-
- private ActorSystem system;
- private TestProbe masterActor;
- private ContainerNode node;
- private ProxyReadTransaction tx;
-
- @Before
- public void setUp() throws Exception {
- system = ActorSystem.apply();
- masterActor = new TestProbe(system);
- final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
- node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
- .build();
- tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
- }
-
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
- }
-
- @Test
- public void testRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
- masterActor.reply(new NormalizedNodeMessage(PATH, node));
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertTrue(result.isPresent());
- Assert.assertEquals(node, result.get());
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
- masterActor.reply(new EmptyReadResponse());
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertFalse(result.isPresent());
- }
-
- @Test(expected = ReadFailedException.class)
- public void testReadFail() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
- }
-
- @Test
- public void testExists() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
- masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(true);
- final Boolean result = read.checkedGet();
- Assert.assertTrue(result);
- }
-
- @Test(expected = ReadFailedException.class)
- public void testExistsFail() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
- masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
- }
-
- @Test
- public void testMasterDownRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
- //master doesn't reply
- try {
- read.checkedGet();
- Assert.fail("Exception should be thrown");
- } catch (final ReadFailedException e) {
- final Throwable cause = e.getCause();
- Assert.assertTrue(cause instanceof DocumentedException);
- final DocumentedException de = (DocumentedException) cause;
- Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
- Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
- Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
- }
- }
-
-}
package org.opendaylight.netconf.topology.singleton.impl.tx;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.dispatch.Futures;
+import akka.pattern.AskTimeoutException;
import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
+import java.util.concurrent.TimeoutException;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
public class ProxyReadWriteTransactionTest {
+ private static final FiniteDuration EXP_NO_MESSAGE_TIMEOUT = Duration.apply(300, TimeUnit.MILLISECONDS);
+ private static final RemoteDeviceId DEVICE_ID =
+ new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
- private ActorSystem system;
+ private static ActorSystem system = ActorSystem.apply();
private TestProbe masterActor;
private ContainerNode node;
- private ProxyReadWriteTransaction tx;
@Before
- public void setUp() throws Exception {
- system = ActorSystem.apply();
+ public void setUp() {
masterActor = new TestProbe(system);
- final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
node = Builders.containerBuilder()
.withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
.build();
- tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
}
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
+ @AfterClass
+ public static void staticTearDown() {
+ TestKit.shutdownActorSystem(system, Boolean.TRUE);
+ }
+
+ private ProxyReadWriteTransaction newSuccessfulProxyTx() {
+ return newSuccessfulProxyTx(Timeout.apply(5, TimeUnit.SECONDS));
+ }
+
+ private ProxyReadWriteTransaction newSuccessfulProxyTx(Timeout timeout) {
+ return new ProxyReadWriteTransaction(DEVICE_ID, Futures.successful(masterActor.ref()),
+ system.dispatcher(), timeout);
}
@Test
- public void testCancel() throws Exception {
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ public void testCancel() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ tx.cancel();
masterActor.expectMsgClass(CancelRequest.class);
- masterActor.reply(true);
- Assert.assertTrue(submit.get());
+ masterActor.reply(Boolean.TRUE);
}
@Test
- public void testCancelSubmitted() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
- masterActor.expectNoMsg();
- Assert.assertFalse(submit.get());
+ public void testCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+ commit(tx);
}
@Test
- public void testSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
+ public void testCommitAfterCancel() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+ commit(tx);
+ assertFalse(tx.cancel());
}
@Test
- public void testDoubleSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.checkedGet();
+ public void testDoubleCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
- tx.submit().checkedGet();
- Assert.fail("Should throw IllegalStateException");
+ tx.commit();
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testDelete() throws Exception {
+ public void testDelete() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
tx.delete(STORE, PATH);
- masterActor.expectMsgClass(DeleteRequest.class);
+ final DeleteRequest deleteRequest = masterActor.expectMsgClass(DeleteRequest.class);
+ assertEquals(STORE, deleteRequest.getStore());
+ assertEquals(PATH, deleteRequest.getPath());
}
@Test
- public void testDeleteClosed() throws Exception {
- submit();
+ public void testDeleteAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.delete(STORE, PATH);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testPut() throws Exception {
+ public void testPut() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
tx.put(STORE, PATH, node);
- masterActor.expectMsgClass(PutRequest.class);
+ final PutRequest putRequest = masterActor.expectMsgClass(PutRequest.class);
+ assertEquals(STORE, putRequest.getStore());
+ assertEquals(PATH, putRequest.getNormalizedNodeMessage().getIdentifier());
+ assertEquals(node, putRequest.getNormalizedNodeMessage().getNode());
}
@Test
- public void testPutClosed() throws Exception {
- submit();
+ public void testPutAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.put(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
@Test
- public void testMerge() throws Exception {
+ public void testMerge() {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
tx.merge(STORE, PATH, node);
- masterActor.expectMsgClass(MergeRequest.class);
+ final MergeRequest mergeRequest = masterActor.expectMsgClass(MergeRequest.class);
+ assertEquals(STORE, mergeRequest.getStore());
+ assertEquals(PATH, mergeRequest.getNormalizedNodeMessage().getIdentifier());
+ assertEquals(node, mergeRequest.getNormalizedNodeMessage().getNode());
}
@Test
- public void testMergeClosed() throws Exception {
- submit();
+ public void testMergeAfterCommit() throws InterruptedException, ExecutionException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ commit(tx);
try {
tx.merge(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
+ fail("Should throw IllegalStateException");
} catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
+ masterActor.expectNoMessage(EXP_NO_MESSAGE_TIMEOUT);
}
}
- @Test
- public void testGetIdentifier() throws Exception {
- Assert.assertEquals(tx, tx.getIdentifier());
- }
-
- private void submit() throws TransactionCommitFailedException {
- final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+ private void commit(ProxyReadWriteTransaction tx)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ final ListenableFuture<?> submit = tx.commit();
masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submit.checkedGet();
+ masterActor.reply(new Success(null));
+ submit.get(5, TimeUnit.SECONDS);
}
@Test
public void testRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
- masterActor.expectMsgClass(ReadRequest.class);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
+ final ReadRequest readRequest = masterActor.expectMsgClass(ReadRequest.class);
+ assertEquals(STORE, readRequest.getStore());
+ assertEquals(PATH, readRequest.getPath());
+
masterActor.reply(new NormalizedNodeMessage(PATH, node));
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertTrue(result.isPresent());
- Assert.assertEquals(node, result.get());
+ final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
+ assertTrue(result.isPresent());
+ assertEquals(node, result.get());
}
@Test
public void testReadEmpty() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
masterActor.expectMsgClass(ReadRequest.class);
masterActor.reply(new EmptyReadResponse());
- final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
- Assert.assertFalse(result.isPresent());
+ final Optional<NormalizedNode<?, ?>> result = read.get(5, TimeUnit.SECONDS);
+ assertFalse(result.isPresent());
}
- @Test(expected = ReadFailedException.class)
- public void testReadFail() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ @Test
+ public void testReadFailure() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
masterActor.expectMsgClass(ReadRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
+ final RuntimeException mockEx = new RuntimeException("fail");
+ masterActor.reply(new Failure(mockEx));
+
+ try {
+ read.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
}
@Test
public void testExists() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
+ final ExistsRequest existsRequest = masterActor.expectMsgClass(ExistsRequest.class);
+ assertEquals(STORE, existsRequest.getStore());
+ assertEquals(PATH, existsRequest.getPath());
+
+ masterActor.reply(Boolean.TRUE);
+ final Boolean result = read.get(5, TimeUnit.SECONDS);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testExistsFailure() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx();
+
+ final ListenableFuture<Boolean> read = tx.exists(STORE, PATH);
masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(true);
- final Boolean result = read.checkedGet();
- Assert.assertTrue(result);
+ final RuntimeException mockEx = new RuntimeException("fail");
+ masterActor.reply(new Failure(mockEx));
+
+ try {
+ read.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
}
- @Test(expected = ReadFailedException.class)
- public void testExistsFail() throws Exception {
- final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ @Test
+ public void testFutureOperationsWithMasterDown() throws InterruptedException, TimeoutException {
+ ProxyReadWriteTransaction tx = newSuccessfulProxyTx(Timeout.apply(500, TimeUnit.MILLISECONDS));
+
+ ListenableFuture<?> future = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
+
+ future = tx.exists(STORE, PATH);
masterActor.expectMsgClass(ExistsRequest.class);
- masterActor.reply(new RuntimeException("fail"));
- read.checkedGet();
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
+
+ future = tx.commit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+
+ // master doesn't reply
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+ verifyDocumentedException(cause.getCause());
+ }
}
@Test
- public void testMasterDownRead() throws Exception {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ public void testDelayedMasterActorFuture() throws InterruptedException, TimeoutException, ExecutionException {
+ final Promise<Object> promise = Futures.promise();
+ ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, promise.future(),
+ system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> read = tx.read(STORE, PATH);
+ final ListenableFuture<Boolean> exists = tx.exists(STORE, PATH);
+
+ tx.put(STORE, PATH, node);
+ tx.merge(STORE, PATH, node);
+ tx.delete(STORE, PATH);
+
+ final ListenableFuture<?> commit = tx.commit();
+
+ promise.success(masterActor.ref());
+
masterActor.expectMsgClass(ReadRequest.class);
- //master doesn't reply
+ masterActor.reply(new NormalizedNodeMessage(PATH, node));
+
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(Boolean.TRUE);
+
+ masterActor.expectMsgClass(PutRequest.class);
+ masterActor.expectMsgClass(MergeRequest.class);
+ masterActor.expectMsgClass(DeleteRequest.class);
+
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new Success(null));
+
+ read.get(5, TimeUnit.SECONDS).isPresent();
+ assertTrue(exists.get(5, TimeUnit.SECONDS));
+ commit.get(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testFailedMasterActorFuture() throws InterruptedException, TimeoutException {
+ final AskTimeoutException mockEx = new AskTimeoutException("mock");
+ ProxyReadWriteTransaction tx = new ProxyReadWriteTransaction(DEVICE_ID, Futures.failed(mockEx),
+ system.dispatcher(), Timeout.apply(5, TimeUnit.SECONDS));
+
+ ListenableFuture<?> future = tx.read(STORE, PATH);
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
+
+ future = tx.exists(STORE, PATH);
try {
- read.checkedGet();
- Assert.fail("Exception should be thrown");
- } catch (final ReadFailedException e) {
- final Throwable cause = e.getCause();
- Assert.assertTrue(cause instanceof DocumentedException);
- final DocumentedException de = (DocumentedException) cause;
- Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
- Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
- Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof ReadFailedException);
+ assertEquals(mockEx, cause.getCause());
}
+
+ tx.put(STORE, PATH, node);
+ tx.merge(STORE, PATH, node);
+ tx.delete(STORE, PATH);
+
+ future = tx.commit();
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ fail("Exception should be thrown");
+ } catch (final ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue("Unexpected cause " + cause, cause instanceof TransactionCommitFailedException);
+ assertEquals(mockEx, cause.getCause());
+ }
+ }
+
+ private void verifyDocumentedException(Throwable cause) {
+ assertTrue("Unexpected cause " + cause, cause instanceof DocumentedException);
+ final DocumentedException de = (DocumentedException) cause;
+ assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+ assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+ assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
}
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-public class ProxyWriteTransactionTest {
- private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
- private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
-
- private ActorSystem system;
- private TestProbe masterActor;
- private ContainerNode node;
- private ProxyWriteTransaction tx;
-
- @Before
- public void setUp() throws Exception {
- system = ActorSystem.apply();
- masterActor = new TestProbe(system);
- final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
- node = Builders.containerBuilder()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
- .build();
- tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
- }
-
- @After
- public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system, null, true);
- }
-
- @Test
- public void testCancel() throws Exception {
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
- masterActor.expectMsgClass(CancelRequest.class);
- masterActor.reply(true);
- Assert.assertTrue(submit.get());
- }
-
- @Test
- public void testCancelSubmitted() throws Exception {
- final ListenableFuture<Void> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.get();
- final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
- masterActor.expectNoMsg();
- Assert.assertFalse(submit.get());
- }
-
- @Test
- public void testSubmit() throws Exception {
- final ListenableFuture<Void> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.get();
- }
-
- @Test
- public void testDoubleSubmit() throws Exception {
- final ListenableFuture<Void> submitFuture = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submitFuture.get();
- try {
- tx.submit().checkedGet();
- Assert.fail("Should throw IllegalStateException");
- } catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
- }
- }
-
- @Test
- public void testDelete() throws Exception {
- tx.delete(STORE, PATH);
- masterActor.expectMsgClass(DeleteRequest.class);
- }
-
- @Test
- public void testDeleteClosed() throws Exception {
- submit();
- try {
- tx.delete(STORE, PATH);
- Assert.fail("Should throw IllegalStateException");
- } catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
- }
- }
-
- @Test
- public void testPut() throws Exception {
- tx.put(STORE, PATH, node);
- masterActor.expectMsgClass(PutRequest.class);
- }
-
- @Test
- public void testPutClosed() throws Exception {
- submit();
- try {
- tx.put(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
- } catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
- }
- }
-
- @Test
- public void testMerge() throws Exception {
- tx.merge(STORE, PATH, node);
- masterActor.expectMsgClass(MergeRequest.class);
- }
-
- @Test
- public void testMergeClosed() throws Exception {
- submit();
- try {
- tx.merge(STORE, PATH, node);
- Assert.fail("Should throw IllegalStateException");
- } catch (final IllegalStateException e) {
- masterActor.expectNoMsg();
- }
- }
-
- @Test
- public void testGetIdentifier() throws Exception {
- Assert.assertEquals(tx, tx.getIdentifier());
- }
-
- private void submit() throws TransactionCommitFailedException {
- final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
- masterActor.expectMsgClass(SubmitRequest.class);
- masterActor.reply(new SubmitReply());
- submit.checkedGet();
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class ReadOnlyTransactionTest {
- private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
- private static final int TIMEOUT_SEC = 5;
- private static ActorSystem system;
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- private ActorRef masterRef;
- private ProxyDOMDataBroker slaveDataBroker;
- private List<SourceIdentifier> sourceIdentifiers;
- private YangInstanceIdentifier instanceIdentifier;
- private LogicalDatastoreType storeType;
- @Mock
- private DOMDataBroker deviceDataBroker;
- @Mock
- private DOMDataReadOnlyTransaction readTx;
- @Mock
- private DOMRpcService domRpcService;
- @Mock
- private DOMMountPointService mountPointService;
-
-
- @Before
- public void setup() throws Exception {
- initMocks(this);
-
- system = ActorSystem.create();
-
- final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
- new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
-
- final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
- final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
- masterRef = TestActorRef.create(system, props, "master_read");
-
- sourceIdentifiers = Lists.newArrayList();
-
- //device read tx
- doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
-
- // Create slave data broker for testing proxy
- slaveDataBroker =
- new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
- initializeDataTest();
- instanceIdentifier = YangInstanceIdentifier.EMPTY;
- storeType = LogicalDatastoreType.CONFIGURATION;
- }
-
- @After
- public void teardown() {
- JavaTestKit.shutdownActorSystem(system, null, true);
- system = null;
- }
-
- @Test
- public void testRead() throws Exception {
- // Message: NormalizedNodeMessage
- final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
- .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
- Futures.immediateCheckedFuture(Optional.of(outputNode));
- doReturn(resultNormalizedNodeMessage).when(readTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
- slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
-
- final Optional<NormalizedNode<?, ?>> resultNodeMessage =
- resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertTrue(resultNodeMessage.isPresent());
- assertEquals(resultNodeMessage.get(), outputNode);
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- // Message: EmptyReadResponse
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
- Futures.immediateCheckedFuture(Optional.absent());
- doReturn(resultEmpty).when(readTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
- slaveDataBroker.newReadOnlyTransaction().read(storeType,
- instanceIdentifier);
-
- final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
- resultEmptyResponse.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(resultEmptyMessage, Optional.absent());
- }
-
- @Test
- public void testReadFail() throws Exception {
- // Message: Throwable
- final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(readFailedException);
-
- doReturn(resultThrowable).when(readTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
- slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
-
- exception.expect(ReadFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- @Test
- public void testExist() throws Exception {
- // Message: True
- final CheckedFuture<Boolean, ReadFailedException> resultTrue =
- Futures.immediateCheckedFuture(true);
- doReturn(resultTrue).when(readTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> trueResponse =
- slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
-
- final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(true, trueMessage);
- }
-
- @Test
- public void testExistsNull() throws Exception {
- // Message: False, result null
- final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
- doReturn(resultNull).when(readTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> nullResponse =
- slaveDataBroker.newReadOnlyTransaction().exists(storeType,
- instanceIdentifier);
-
- final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(false, nullFalseMessage);
- }
-
- @Test
- public void testExistsFalse() throws Exception {
- // Message: False
- final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
- doReturn(resultFalse).when(readTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> falseResponse =
- slaveDataBroker.newReadOnlyTransaction().exists(storeType,
- instanceIdentifier);
-
- final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(false, falseMessage);
- }
-
- @Test
- public void testExistsFail() throws Exception {
- // Message: Throwable
- final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
- final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(readFailedException);
- doReturn(resultThrowable).when(readTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
- slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
-
- exception.expect(ReadFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- private void initializeDataTest() throws Exception {
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
-
- final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
- assertTrue(success instanceof MasterActorDataInitialized);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class ReadWriteTransactionTest {
- private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
- private static final int TIMEOUT_SEC = 5;
- private static ActorSystem system;
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- @Mock
- private DOMDataBroker deviceDataBroker;
- @Mock
- private DOMDataReadWriteTransaction readWriteTx;
- @Mock
- private DOMRpcService domRpcService;
- @Mock
- private DOMMountPointService mountPointService;
- private ActorRef masterRef;
- private ProxyDOMDataBroker slaveDataBroker;
- private List<SourceIdentifier> sourceIdentifiers;
- private NormalizedNode<?, ?> testNode;
- private YangInstanceIdentifier instanceIdentifier;
- private LogicalDatastoreType storeType;
-
- @Before
- public void setup() throws Exception {
- initMocks(this);
-
- system = ActorSystem.create();
-
- final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
- new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
-
- final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
- doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
- final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
- masterRef = TestActorRef.create(system, props, "master_read");
-
- sourceIdentifiers = Lists.newArrayList();
-
- doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction();
- doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode);
- doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode);
- doNothing().when(readWriteTx).delete(storeType, instanceIdentifier);
-
- // Create slave data broker for testing proxy
- slaveDataBroker =
- new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
- initializeDataTest();
- testNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
- .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
- instanceIdentifier = YangInstanceIdentifier.EMPTY;
- storeType = LogicalDatastoreType.CONFIGURATION;
- }
-
- @After
- public void teardown() {
- JavaTestKit.shutdownActorSystem(system, null, true);
- system = null;
- }
-
- @Test
- public void testPut() throws Exception {
- // Test of invoking put on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.put(storeType, instanceIdentifier, testNode);
-
- verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
-
- wTx.cancel();
- }
-
- @Test
- public void testMerge() throws Exception {
- // Test of invoking merge on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.merge(storeType, instanceIdentifier, testNode);
-
- verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
-
- wTx.cancel();
- }
-
- @Test
- public void testDelete() throws Exception {
- // Test of invoking delete on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
- wTx.cancel();
-
- verify(readWriteTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
- }
-
- @Test
- public void testSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
- doReturn(resultSubmit).when(readWriteTx).submit();
-
- // Without Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
-
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
-
- final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertNull(result);
- }
-
- @Test
- public void testSubmitWithOperation() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx =
- Futures.immediateCheckedFuture(null);
- doReturn(resultSubmitTx).when(readWriteTx).submit();
- // With Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
-
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
-
- final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertNull(resultTx);
- }
-
- @Test
- public void testSubmitFail() throws Exception {
- final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
- final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(throwable);
- doReturn(resultThrowable).when(readWriteTx).submit();
-
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
- final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
- wTx.submit();
- exception.expect(TransactionCommitFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- @Test
- public void testCancel() throws Exception {
- doReturn(true).when(readWriteTx).cancel();
-
- // Without Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- final Boolean resultFalseNoTx = wTx.cancel();
- assertEquals(true, resultFalseNoTx);
- }
-
- @Test
- public void testCancelWithOperation() throws Exception {
- doReturn(true).when(readWriteTx).cancel();
-
- // With Tx, readWriteTx test
- final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
-
- final Boolean resultTrue = wTx.cancel();
- assertEquals(true, resultTrue);
-
- final Boolean resultFalse = wTx.cancel();
- assertEquals(false, resultFalse);
- }
-
- @Test
- public void testRead() throws Exception {
- // Message: NormalizedNodeMessage
- final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
- .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
- Futures.immediateCheckedFuture(Optional.of(outputNode));
- doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
- slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
-
- final Optional<NormalizedNode<?, ?>> resultNodeMessage =
- resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertTrue(resultNodeMessage.isPresent());
- assertEquals(resultNodeMessage.get(), outputNode);
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- // Message: EmptyReadResponse
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
- Futures.immediateCheckedFuture(Optional.absent());
- doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
- slaveDataBroker.newReadWriteTransaction().read(storeType,
- instanceIdentifier);
-
- final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
- resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(resultEmptyMessage, Optional.absent());
- }
-
- @Test
- public void testReadFail() throws Exception {
- // Message: Throwable
- final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(readFailedException);
-
- doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
- slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
-
- exception.expect(ReadFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- @Test
- public void testExist() throws Exception {
- // Message: True
- final CheckedFuture<Boolean, ReadFailedException> resultTrue =
- Futures.immediateCheckedFuture(true);
- doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> trueResponse =
- slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
-
- final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(true, trueMessage);
- }
-
- @Test
- public void testExistsNull() throws Exception {
- // Message: False, result null
- final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
- doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> nullResponse =
- slaveDataBroker.newReadWriteTransaction().exists(storeType,
- instanceIdentifier);
-
- final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(false, nullFalseMessage);
- }
-
- @Test
- public void testExistsFalse() throws Exception {
- // Message: False
- final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
- doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> falseResponse =
- slaveDataBroker.newReadWriteTransaction().exists(storeType,
- instanceIdentifier);
-
- final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertEquals(false, falseMessage);
- }
-
- @Test
- public void testExistsFail() throws Exception {
- // Message: Throwable
- final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
- final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(readFailedException);
- doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier);
-
- final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
- slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
-
- exception.expect(ReadFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- private void initializeDataTest() throws Exception {
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
-
- final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
- assertTrue(success instanceof MasterActorDataInitialized);
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.collect.Lists;
-import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mock;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
-import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
-import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
-import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-
-public class WriteOnlyTransactionTest {
- private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
- private static final int TIMEOUT_SEC = 5;
- private static ActorSystem system;
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- @Mock
- private DOMDataBroker deviceDataBroker;
- @Mock
- private DOMDataWriteTransaction writeTx;
- @Mock
- private DOMRpcService domRpcService;
- @Mock
- private DOMMountPointService mountPointService;
- private ActorRef masterRef;
- private ProxyDOMDataBroker slaveDataBroker;
- private List<SourceIdentifier> sourceIdentifiers;
- private NormalizedNode<?, ?> testNode;
- private YangInstanceIdentifier instanceIdentifier;
- private LogicalDatastoreType storeType;
-
- @Before
- public void setup() throws Exception {
- initMocks(this);
-
- system = ActorSystem.create();
-
- final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
- new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
-
- final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
- doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
- final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
- DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
-
- masterRef = TestActorRef.create(system, props, "master_read");
-
- sourceIdentifiers = Lists.newArrayList();
-
- final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
-
- doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction();
- doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
- doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
- doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
- doNothing().when(writeTx).delete(storeType, instanceIdentifier);
-
- // Create slave data broker for testing proxy
- slaveDataBroker =
- new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
- initializeDataTest();
- testNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "TestQname")))
- .withChild(ImmutableNodes.leafNode(QName.create("", "NodeQname"), "foo")).build();
- instanceIdentifier = YangInstanceIdentifier.EMPTY;
- storeType = LogicalDatastoreType.CONFIGURATION;
- }
-
- @After
- public void teardown() {
- JavaTestKit.shutdownActorSystem(system, null, true);
- system = null;
- }
-
- @Test
- public void testPut() throws Exception {
- // Test of invoking put on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.put(storeType, instanceIdentifier, testNode);
-
- verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
-
- wTx.cancel();
- }
-
- @Test
- public void testMerge() throws Exception {
- // Test of invoking merge on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.merge(storeType, instanceIdentifier, testNode);
-
- verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
-
- wTx.cancel();
- }
-
- @Test
- public void testDelete() throws Exception {
- // Test of invoking delete on master through slave proxy
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
- wTx.cancel();
-
- verify(writeTx, timeout(2000)).delete(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
- }
-
- @Test
- public void testSubmit() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
- doReturn(resultSubmit).when(writeTx).submit();
-
- // Without Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
-
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
-
- final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertNull(result);
- }
-
- @Test
- public void testSubmitWithOperation() throws Exception {
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx =
- Futures.immediateCheckedFuture(null);
- doReturn(resultSubmitTx).when(writeTx).submit();
- // With Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
-
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
-
- final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
-
- assertNull(resultTx);
- }
-
- @Test
- public void testSubmitFail() throws Exception {
- final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
- final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
- Futures.immediateFailedCheckedFuture(throwable);
- doReturn(resultThrowable).when(writeTx).submit();
-
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
- final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
- wTx.submit();
- exception.expect(TransactionCommitFailedException.class);
- resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
- }
-
- @Test
- public void testCancel() throws Exception {
- doReturn(true).when(writeTx).cancel();
-
- // Without Tx
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- final Boolean resultFalseNoTx = wTx.cancel();
- assertEquals(true, resultFalseNoTx);
- }
-
- @Test
- public void testCancelWithOperation() throws Exception {
- doReturn(true).when(writeTx).cancel();
-
- // With Tx, readWriteTx test
- final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.EMPTY);
-
- final Boolean resultTrue = wTx.cancel();
- assertEquals(true, resultTrue);
-
- final Boolean resultFalse = wTx.cancel();
- assertEquals(false, resultFalse);
- }
-
- private void initializeDataTest() throws Exception {
- final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
- domRpcService), TIMEOUT);
-
- final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-
- assertTrue(success instanceof MasterActorDataInitialized);
- }
-
-}