import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadWriteTransaction;
import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
* @param actorSystem system
* @param id id
* @param masterNode {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
- * @param askTimeout ask timeout
+ * @param askTimeout ask timeout
*/
public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
final ActorRef masterNode, final Timeout askTimeout) {
@Override
public DOMDataReadWriteTransaction newReadWriteTransaction() {
- return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+ final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadWriteTransactionRequest(), askTimeout);
+ try {
+ final Object msg = Await.result(txActorFuture, askTimeout.duration());
+ if (msg instanceof Throwable) {
+ throw (Throwable) msg;
+ }
+ Preconditions.checkState(msg instanceof NewReadWriteTransactionReply);
+ final NewReadWriteTransactionReply reply = (NewReadWriteTransactionReply) msg;
+ return new ProxyReadWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ } catch (final Throwable t) {
+ throw new IllegalStateException("Can't create ProxyReadTransaction", t);
+ }
}
@Override
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
sender().tell(t, self());
}
+ } else if (message instanceof NewReadWriteTransactionRequest) {
+ try {
+ final DOMDataReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
+ final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
+ sender().tell(new NewReadWriteTransactionReply(txActor), self());
+ } catch (final Throwable t) {
+ sender().tell(t, self());
+ }
} else if (message instanceof InvokeRpcMessage) { // master
final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+class ReadAdapter {
+
+ private final DOMDataReadTransaction tx;
+
+ public ReadAdapter(final DOMDataReadTransaction tx) {
+ this.tx = tx;
+ }
+
+ public void handle(final Object message, final ActorRef sender, final ActorRef self) throws Throwable {
+ if (message instanceof ReadRequest) {
+
+ final ReadRequest readRequest = (ReadRequest) message;
+ final YangInstanceIdentifier path = readRequest.getPath();
+ final LogicalDatastoreType store = readRequest.getStore();
+ read(path, store, sender, self);
+
+ } else if (message instanceof ExistsRequest) {
+ final ExistsRequest readRequest = (ExistsRequest) message;
+ final YangInstanceIdentifier path = readRequest.getPath();
+ final LogicalDatastoreType store = readRequest.getStore();
+ exists(path, store, sender, self);
+ }
+ }
+
+ private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+ final ActorRef self) {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
+ Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+ @Override
+ public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+ if (!result.isPresent()) {
+ sender.tell(new EmptyReadResponse(), self);
+ return;
+ }
+ sender.tell(new NormalizedNodeMessage(path, result.get()), self);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ sender.tell(throwable, self);
+ }
+ });
+ }
+
+ private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+ final ActorRef self) {
+ final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
+ Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(final Boolean result) {
+ if (result == null) {
+ sender.tell(false, self);
+ } else {
+ sender.tell(result, self);
+ }
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ sender.tell(throwable, self);
+ }
+ });
+ }
+}
package org.opendaylight.netconf.topology.singleton.impl.actors;
-import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage;
/**
* ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
*/
public class ReadTransactionActor extends UntypedActor {
- private final DOMDataReadOnlyTransaction tx;
+ private final ReadAdapter readAdapter;
+
+ private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
+ readAdapter = new ReadAdapter(tx);
+ }
/**
* Creates new actor Props.
return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx));
}
- private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
- this.tx = tx;
- }
-
@Override
public void onReceive(final Object message) throws Throwable {
- if (message instanceof ReadRequest) {
-
- final ReadRequest readRequest = (ReadRequest) message;
- final YangInstanceIdentifier path = readRequest.getPath();
- final LogicalDatastoreType store = readRequest.getStore();
- read(path, store, sender(), self());
-
- } else if (message instanceof ExistsRequest) {
- final ExistsRequest readRequest = (ExistsRequest) message;
- final YangInstanceIdentifier path = readRequest.getPath();
- final LogicalDatastoreType store = readRequest.getStore();
- exists(path, store, sender(), self());
-
+ if (message instanceof ReadActorMessage) {
+ readAdapter.handle(message, sender(), self());
} else {
unhandled(message);
}
}
- private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
- final ActorRef self) {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
- Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-
- @Override
- public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- if (!result.isPresent()) {
- sender.tell(new EmptyReadResponse(), self);
- return;
- }
- sender.tell(new NormalizedNodeMessage(path, result.get()), self);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- sender.tell(throwable, self);
- }
- });
- }
-
- private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
- final ActorRef self) {
- final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
- Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(final Boolean result) {
- if (result == null) {
- sender.tell(false, self);
- } else {
- sender.tell(result, self);
- }
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- sender.tell(throwable, self);
- }
- });
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.Props;
+import akka.actor.ReceiveTimeout;
+import akka.actor.UntypedActor;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadActorMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionActor extends UntypedActor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReadWriteTransactionActor.class);
+
+ private final DOMDataReadWriteTransaction tx;
+ private final long idleTimeout;
+ private final ReadAdapter readAdapter;
+ private final WriteAdapter writeAdapter;
+
+ private ReadWriteTransactionActor(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) {
+ this.tx = tx;
+ this.idleTimeout = idleTimeout.toSeconds();
+ if (this.idleTimeout > 0) {
+ context().setReceiveTimeout(idleTimeout);
+ }
+ readAdapter = new ReadAdapter(tx);
+ writeAdapter = new WriteAdapter(tx);
+ }
+
+ /**
+ * Creates new actor Props.
+ *
+ * @param tx delegate device read write transaction
+ * @param idleTimeout idle time in seconds, after which transaction is closed automatically
+ * @return props
+ */
+ static Props props(final DOMDataReadWriteTransaction tx, final Duration idleTimeout) {
+ return Props.create(ReadWriteTransactionActor.class, () -> new ReadWriteTransactionActor(tx, idleTimeout));
+ }
+
+ @Override
+ public void onReceive(final Object message) throws Throwable {
+ if (message instanceof ReadActorMessage) {
+ readAdapter.handle(message, sender(), self());
+ } else if (message instanceof WriteActorMessage) {
+ writeAdapter.handle(message, sender(), context(), self());
+ } else if (message instanceof ReceiveTimeout) {
+ LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
+ idleTimeout);
+ tx.cancel();
+ context().stop(self());
+ } else {
+ unhandled(message);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+
+class WriteAdapter {
+ private final DOMDataWriteTransaction tx;
+
+ public WriteAdapter(final DOMDataWriteTransaction tx) {
+ this.tx = tx;
+ }
+
+ private void cancel(final ActorContext context, final ActorRef sender, final ActorRef self) {
+ final boolean cancelled = tx.cancel();
+ sender.tell(cancelled, self);
+ context.stop(self);
+ }
+
+ private void submit(final ActorRef requester, final ActorRef self, final ActorContext context) {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ context.stop(self);
+ Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ requester.tell(new SubmitReply(), self);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ requester.tell(throwable, self);
+ }
+ });
+ }
+
+ public void handle(final Object message, final ActorRef sender, final ActorContext context, final ActorRef self) {
+ if (message instanceof MergeRequest) {
+ final MergeRequest mergeRequest = (MergeRequest) message;
+ final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
+ tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
+ } else if (message instanceof PutRequest) {
+ final PutRequest putRequest = (PutRequest) message;
+ final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
+ tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
+ } else if (message instanceof DeleteRequest) {
+ final DeleteRequest deleteRequest = (DeleteRequest) message;
+ tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
+ } else if (message instanceof CancelRequest) {
+ cancel(context, sender, self);
+ } else if (message instanceof SubmitRequest) {
+ submit(sender, self, context);
+ }
+ }
+}
package org.opendaylight.netconf.topology.singleton.impl.actors;
-import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.WriteActorMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
private final DOMDataWriteTransaction tx;
private final long idleTimeout;
+ private final WriteAdapter writeAdapter;
+
+ private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
+ this.tx = tx;
+ this.idleTimeout = idleTimeout.toSeconds();
+ if (this.idleTimeout > 0) {
+ context().setReceiveTimeout(idleTimeout);
+ }
+ writeAdapter = new WriteAdapter(tx);
+ }
/**
* Creates new actor Props.
*
- * @param tx delegate device write transaction
+ * @param tx delegate device write transaction
* @param idleTimeout idle time in seconds, after which transaction is closed automatically
* @return props
*/
return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx, idleTimeout));
}
- private WriteTransactionActor(final DOMDataWriteTransaction tx, final Duration idleTimeout) {
- this.tx = tx;
- this.idleTimeout = idleTimeout.toSeconds();
- if (this.idleTimeout > 0) {
- context().setReceiveTimeout(idleTimeout);
- }
- }
-
@Override
public void onReceive(final Object message) throws Throwable {
- if (message instanceof MergeRequest) {
- final MergeRequest mergeRequest = (MergeRequest) message;
- final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
- tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
- } else if (message instanceof PutRequest) {
- final PutRequest putRequest = (PutRequest) message;
- final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
- tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
- } else if (message instanceof DeleteRequest) {
- final DeleteRequest deleteRequest = (DeleteRequest) message;
- tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
- } else if (message instanceof CancelRequest) {
- cancel();
- } else if (message instanceof SubmitRequest) {
- submit(sender(), self());
+ if (message instanceof WriteActorMessage) {
+ writeAdapter.handle(message, sender(), context(), self());
} else if (message instanceof ReceiveTimeout) {
LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
idleTimeout);
}
}
- private void cancel() {
- final boolean cancelled = tx.cancel();
- sender().tell(cancelled, self());
- context().stop(self());
- }
-
- private void submit(final ActorRef requester, final ActorRef self) {
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
- context().stop(self);
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- requester.tell(new SubmitReply(), self);
- }
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- requester.tell(throwable, self);
- }
- });
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+class ProxyReadAdapter {
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyReadAdapter.class);
+
+ private final ActorRef masterTxActor;
+ private final RemoteDeviceId id;
+ private final ActorSystem actorSystem;
+ private final Timeout askTimeout;
+
+ public ProxyReadAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+ final Timeout askTimeout) {
+ this.masterTxActor = masterTxActor;
+ this.id = id;
+ this.actorSystem = actorSystem;
+ this.askTimeout = askTimeout;
+ }
+
+ public void close() {
+ //noop
+ }
+
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure,
+ final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) { // Error sended by master
+ settableFuture.setException((Throwable) success);
+ return;
+ }
+ if (success instanceof EmptyReadResponse) {
+ settableFuture.set(Optional.absent());
+ return;
+ }
+ if (success instanceof NormalizedNodeMessage) {
+ final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
+ settableFuture.set(Optional.of(data.getNode()));
+ }
+ }
+ }, actorSystem.dispatcher());
+ return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ }
+
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ final Future<Object> existsScalaFuture =
+ Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+ LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
+
+ final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ existsScalaFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) {
+ settableFuture.setException((Throwable) success);
+ return;
+ }
+ settableFuture.set((Boolean) success);
+ }
+ }, actorSystem.dispatcher());
+ return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ }
+
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
/**
* ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
*/
public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class);
-
- private final ActorRef masterTxActor;
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final Timeout askTimeout;
+ private final ProxyReadAdapter delegate;
/**
* @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
*/
public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
final Timeout askTimeout) {
- this.masterTxActor = masterTxActor;
- this.id = id;
- this.actorSystem = actorSystem;
- this.askTimeout = askTimeout;
+ delegate = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
}
@Override
public void close() {
- //noop
+ delegate.close();
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
- LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
- final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
- final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure,
- final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) { // Error sended by master
- settableFuture.setException((Throwable) success);
- return;
- }
- if (success instanceof EmptyReadResponse) {
- settableFuture.set(Optional.absent());
- return;
- }
- if (success instanceof NormalizedNodeMessage) {
- final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
- settableFuture.set(Optional.of(data.getNode()));
- }
- }
- }, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ return delegate.read(store, path);
}
@Override
public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
- final Future<Object> existsScalaFuture =
- Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
-
- LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
- final SettableFuture<Boolean> settableFuture = SettableFuture.create();
- existsScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
- return;
- }
- settableFuture.set((Boolean) success);
- }
- }, actorSystem.dispatcher());
- return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ return delegate.exists(store, path);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
+ */
+public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
+
+ private final ProxyReadAdapter delegateRead;
+ private final ProxyWriteAdapter delegateWrite;
+
+ /**
+ * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor} ref
+ * @param id device id
+ * @param actorSystem system
+ * @param askTimeout
+ */
+ public ProxyReadWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id,
+ final ActorSystem actorSystem, final Timeout askTimeout) {
+ delegateRead = new ProxyReadAdapter(masterTxActor, id, actorSystem, askTimeout);
+ delegateWrite = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
+ }
+
+ @Override
+ public boolean cancel() {
+ return delegateWrite.cancel();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return delegateWrite.commit(getIdentifier());
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return delegateRead.read(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return delegateRead.exists(store, path);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ delegateWrite.delete(store, path);
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return delegateWrite.submit(getIdentifier());
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ delegateWrite.put(store, path, data, getIdentifier());
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
+ delegateWrite.merge(store, path, data, getIdentifier());
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class ProxyWriteAdapter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteAdapter.class);
+
+ private final ActorRef masterTxActor;
+ private final RemoteDeviceId id;
+ private final ActorSystem actorSystem;
+ private final AtomicBoolean opened = new AtomicBoolean(true);
+ private final Timeout askTimeout;
+
+ public ProxyWriteAdapter(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+ final Timeout askTimeout) {
+ this.masterTxActor = masterTxActor;
+ this.id = id;
+ this.actorSystem = actorSystem;
+ this.askTimeout = askTimeout;
+ }
+
+ public boolean cancel() {
+ if (!opened.compareAndSet(true, false)) {
+ return false;
+ }
+ final Future<Object> cancelScalaFuture =
+ Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+ LOG.trace("{}: Cancel {} via NETCONF", id);
+
+ try {
+ // here must be Await because AsyncWriteTransaction do not return future
+ return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(final Object identifier) {
+ if (!opened.compareAndSet(true, false)) {
+ throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
+ }
+ final Future<Object> submitScalaFuture =
+ Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+ LOG.trace("{}: Submit {} via NETCONF", id);
+
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
+ submitScalaFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) {
+ settableFuture.setException((Throwable) success);
+ } else {
+ if (success instanceof SubmitFailedReply) {
+ LOG.error("{}: Transaction was not submitted because already closed.", id);
+ }
+ settableFuture.set(null);
+ }
+ }
+ }, actorSystem.dispatcher());
+
+ return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+ @Nullable
+ @Override
+ public TransactionCommitFailedException apply(@Nullable final Exception input) {
+ final String message = "Submit of transaction " + identifier + " failed";
+ return new TransactionCommitFailedException(message, input);
+ }
+ });
+ }
+
+ public ListenableFuture<RpcResult<TransactionStatus>> commit(final Object identifier) {
+ LOG.trace("{}: Commit", id);
+
+ final CheckedFuture<Void, TransactionCommitFailedException> submit = submit(identifier);
+ return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
+ @Nullable
+ @Override
+ public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
+ return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
+ }
+ });
+ }
+
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+ LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
+ masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+ }
+
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data, final Object identifier) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+ final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
+ LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, path, data);
+ masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+ }
+
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data, final Object identifier) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, identifier);
+ final NormalizedNodeMessage msg = new NormalizedNodeMessage(path, data);
+ LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
+ masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+ }
+
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
/**
* ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
*/
public class ProxyWriteTransaction implements DOMDataWriteTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
-
- private final ActorRef masterTxActor;
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final AtomicBoolean opened = new AtomicBoolean(true);
- private final Timeout askTimeout;
+ private final ProxyWriteAdapter proxyWriteAdapter;
/**
* @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
*/
public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
final Timeout askTimeout) {
- this.masterTxActor = masterTxActor;
- this.id = id;
- this.actorSystem = actorSystem;
- this.askTimeout = askTimeout;
+ proxyWriteAdapter = new ProxyWriteAdapter(masterTxActor, id, actorSystem, askTimeout);
}
@Override
public boolean cancel() {
- if (!opened.compareAndSet(true, false)) {
- return false;
- }
- final Future<Object> cancelScalaFuture =
- Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
-
- LOG.trace("{}: Cancel {} via NETCONF", id);
-
- try {
- // here must be Await because AsyncWriteTransaction do not return future
- return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
- } catch (final Exception e) {
- return false;
- }
+ return proxyWriteAdapter.cancel();
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- if (!opened.compareAndSet(true, false)) {
- throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
- }
- final Future<Object> submitScalaFuture =
- Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
-
- LOG.trace("{}: Submit {} via NETCONF", id);
-
- final SettableFuture<Void> settableFuture = SettableFuture.create();
- submitScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
- settableFuture.setException(exception);
- return;
- }
- if (success instanceof Throwable) {
- settableFuture.setException((Throwable) success);
- } else {
- if (success instanceof SubmitFailedReply) {
- LOG.error("{}: Transaction was not submitted because already closed.", id);
- }
- settableFuture.set(null);
- }
- }
- }, actorSystem.dispatcher());
-
- return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
- @Nullable
- @Override
- public TransactionCommitFailedException apply(@Nullable final Exception input) {
- final String message = "Submit of transaction " + getIdentifier() + " failed";
- return new TransactionCommitFailedException(message, input);
- }
- });
+ return proxyWriteAdapter.submit(getIdentifier());
}
@Override
public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- LOG.trace("{}: Commit", id);
-
- final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
- return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
- @Nullable
- @Override
- public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
- return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
- }
- });
+ return proxyWriteAdapter.commit(getIdentifier());
}
@Override
public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
- masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+ proxyWriteAdapter.delete(store, identifier);
}
@Override
public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
final NormalizedNode<?, ?> data) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
- LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
- masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+ proxyWriteAdapter.put(store, identifier, data, getIdentifier());
}
@Override
public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
final NormalizedNode<?, ?> data) {
- Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
- final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
- LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
- masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+ proxyWriteAdapter.merge(store, identifier, data, getIdentifier());
}
@Override
package org.opendaylight.netconf.topology.singleton.messages.transactions;
-public class CancelRequest implements TransactionRequest {
+public class CancelRequest implements WriteActorMessage {
private static final long serialVersionUID = 1L;
}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class DeleteRequest implements TransactionRequest {
+public class DeleteRequest implements WriteActorMessage {
private static final long serialVersionUID = 1L;
private final LogicalDatastoreType store;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class ExistsRequest implements TransactionRequest {
+public class ExistsRequest implements ReadActorMessage {
private static final long serialVersionUID = 1L;
private final LogicalDatastoreType store;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-public class MergeRequest implements TransactionRequest {
+public class MergeRequest implements WriteActorMessage {
private static final long serialVersionUID = 1L;
private final NormalizedNodeMessage data;
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewReadWriteTransactionReply implements Serializable {
+
+ private final ActorRef txActor;
+
+ public NewReadWriteTransactionReply(final ActorRef txActor) {
+ this.txActor = txActor;
+ }
+
+ public ActorRef getTxActor() {
+ return txActor;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+public class NewReadWriteTransactionRequest implements Serializable {
+ private static final long serialVersionUID = 1L;
+}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-public class PutRequest implements TransactionRequest {
+public class PutRequest implements WriteActorMessage {
private static final long serialVersionUID = 1L;
private final LogicalDatastoreType store;
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public interface ReadActorMessage extends TransactionRequest {
+}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class ReadRequest implements TransactionRequest {
+public class ReadRequest implements ReadActorMessage {
private static final long serialVersionUID = 1L;
private final LogicalDatastoreType store;
package org.opendaylight.netconf.topology.singleton.messages.transactions;
-public class SubmitRequest implements TransactionRequest {
+public class SubmitRequest implements WriteActorMessage {
private static final long serialVersionUID = 1L;
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public interface WriteActorMessage extends TransactionRequest {
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionActorTest {
+
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+ private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+
+ @Mock
+ private DOMDataReadWriteTransaction deviceReadWriteTx;
+ private TestProbe probe;
+ private ActorSystem system;
+ private TestActorRef<WriteTransactionActor> actorRef;
+ private NormalizedNode<?, ?> node;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ probe = TestProbe.apply(system);
+ node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ actorRef = TestActorRef.create(system, ReadWriteTransactionActor.props(deviceReadWriteTx,
+ Duration.apply(2, TimeUnit.SECONDS)), "testA");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final ContainerNode node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).read(STORE, PATH);
+ probe.expectMsgClass(NormalizedNodeMessage.class);
+ }
+
+ @Test
+ public void testReadEmpty() throws Exception {
+ when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).read(STORE, PATH);
+ probe.expectMsgClass(EmptyReadResponse.class);
+ }
+
+ @Test
+ public void testReadFailure() throws Exception {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(deviceReadWriteTx.read(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ReadRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).read(STORE, PATH);
+ probe.expectMsg(cause);
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateCheckedFuture(true));
+ actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).exists(STORE, PATH);
+ probe.expectMsg(true);
+ }
+
+ @Test
+ public void testExistsFailure() throws Exception {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(deviceReadWriteTx.exists(STORE, PATH)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ExistsRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).exists(STORE, PATH);
+ probe.expectMsg(cause);
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+ actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(deviceReadWriteTx).put(STORE, PATH, node);
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+ actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(deviceReadWriteTx).merge(STORE, PATH, node);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+ verify(deviceReadWriteTx).delete(STORE, PATH);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ when(deviceReadWriteTx.cancel()).thenReturn(true);
+ final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
+ final Object result = Await.result(cancelFuture, TIMEOUT.duration());
+ Preconditions.checkState(result instanceof Boolean);
+ verify(deviceReadWriteTx).cancel();
+ Assert.assertTrue((Boolean) result);
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+ final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+ final Object result = Await.result(submitFuture, TIMEOUT.duration());
+ Assert.assertTrue(result instanceof SubmitReply);
+ verify(deviceReadWriteTx).submit();
+ }
+
+ @Test
+ public void testSubmitFail() throws Exception {
+ final RpcError rpcError =
+ RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+ when(deviceReadWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+ final Object result = Await.result(submitFuture, TIMEOUT.duration());
+ Assert.assertEquals(cause, result);
+ verify(deviceReadWriteTx).submit();
+ }
+
+ @Test
+ public void testIdleTimeout() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ probe.watch(actorRef);
+ verify(deviceReadWriteTx, timeout(3000)).cancel();
+ probe.expectTerminated(actorRef, TIMEOUT.duration());
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyReadWriteTransactionTest {
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+ private ActorSystem system;
+ private TestProbe masterActor;
+ private ContainerNode node;
+ private ProxyReadWriteTransaction tx;
+
+ @Before
+ public void setUp() throws Exception {
+ system = ActorSystem.apply();
+ masterActor = new TestProbe(system);
+ final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+ node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ tx = new ProxyReadWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ masterActor.expectMsgClass(CancelRequest.class);
+ masterActor.reply(true);
+ Assert.assertTrue(submit.get());
+ }
+
+ @Test
+ public void testCancelSubmitted() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ masterActor.expectNoMsg();
+ Assert.assertFalse(submit.get());
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ }
+
+ @Test
+ public void testDoubleSubmit() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ try {
+ tx.submit().checkedGet();
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ tx.delete(STORE, PATH);
+ masterActor.expectMsgClass(DeleteRequest.class);
+ }
+
+ @Test
+ public void testDeleteClosed() throws Exception {
+ submit();
+ try {
+ tx.delete(STORE, PATH);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ tx.put(STORE, PATH, node);
+ masterActor.expectMsgClass(PutRequest.class);
+ }
+
+ @Test
+ public void testPutClosed() throws Exception {
+ submit();
+ try {
+ tx.put(STORE, PATH, node);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ tx.merge(STORE, PATH, node);
+ masterActor.expectMsgClass(MergeRequest.class);
+ }
+
+ @Test
+ public void testMergeClosed() throws Exception {
+ submit();
+ try {
+ tx.merge(STORE, PATH, node);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testGetIdentifier() throws Exception {
+ Assert.assertEquals(tx, tx.getIdentifier());
+ }
+
+ private void submit() throws TransactionCommitFailedException {
+ final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submit.checkedGet();
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new NormalizedNodeMessage(PATH, node));
+ final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(node, result.get());
+ }
+
+ @Test
+ public void testReadEmpty() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new EmptyReadResponse());
+ final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+ Assert.assertFalse(result.isPresent());
+ }
+
+ @Test(expected = ReadFailedException.class)
+ public void testReadFail() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new RuntimeException("fail"));
+ read.checkedGet();
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(true);
+ final Boolean result = read.checkedGet();
+ Assert.assertTrue(result);
+ }
+
+ @Test(expected = ReadFailedException.class)
+ public void testExistsFail() throws Exception {
+ final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(new RuntimeException("fail"));
+ read.checkedGet();
+ }
+
+ @Test
+ public void testMasterDownRead() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ //master doesn't reply
+ try {
+ read.checkedGet();
+ Assert.fail("Exception should be thrown");
+ } catch (final ReadFailedException e) {
+ final Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof DocumentedException);
+ final DocumentedException de = (DocumentedException) cause;
+ Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+ Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+ Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class ReadWriteTransactionTest {
+ private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
+ private static final int TIMEOUT_SEC = 5;
+ private static ActorSystem system;
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Mock
+ private DOMDataBroker deviceDataBroker;
+ @Mock
+ private DOMDataReadWriteTransaction readWriteTx;
+ @Mock
+ private DOMRpcService domRpcService;
+ private ActorRef masterRef;
+ private ProxyDOMDataBroker slaveDataBroker;
+ private List<SourceIdentifier> sourceIdentifiers;
+ private NormalizedNode<?, ?> testNode;
+ private YangInstanceIdentifier instanceIdentifier;
+ private LogicalDatastoreType storeType;
+
+ @Before
+ public void setup() throws Exception {
+ initMocks(this);
+
+ system = ActorSystem.create();
+
+ final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
+ new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+ final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+ doReturn(Duration.apply(0, TimeUnit.SECONDS)).when(setup).getIdleTimeout();
+ final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+ DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
+
+ masterRef = TestActorRef.create(system, props, "master_read");
+
+ sourceIdentifiers = Lists.newArrayList();
+
+ doReturn(readWriteTx).when(deviceDataBroker).newReadWriteTransaction();
+ doNothing().when(readWriteTx).put(storeType, instanceIdentifier, testNode);
+ doNothing().when(readWriteTx).merge(storeType, instanceIdentifier, testNode);
+ doNothing().when(readWriteTx).delete(storeType, instanceIdentifier);
+
+ // Create slave data broker for testing proxy
+ slaveDataBroker =
+ new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
+ initializeDataTest();
+ testNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+ .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+ instanceIdentifier = YangInstanceIdentifier.EMPTY;
+ storeType = LogicalDatastoreType.CONFIGURATION;
+ }
+
+ @After
+ public void teardown() {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ system = null;
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ // Test of invoking put on master through slave proxy
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.put(storeType, instanceIdentifier, testNode);
+
+ verify(readWriteTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
+
+ wTx.cancel();
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ // Test of invoking merge on master through slave proxy
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.merge(storeType, instanceIdentifier, testNode);
+
+ verify(readWriteTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
+
+ wTx.cancel();
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+ final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+
+ // Test of invoking delete on master through slave proxy
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.delete(storeType, instanceIdentifier);
+ wTx.cancel();
+
+ verify(readWriteTx, timeout(2000)).delete(storeType, instanceIdentifier);
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+ doReturn(resultSubmit).when(readWriteTx).submit();
+
+ // Without Tx
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
+
+ final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void testSubmitWithOperation() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+ doReturn(resultSubmitTx).when(readWriteTx).submit();
+ // With Tx
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.EMPTY);
+
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
+
+ final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertNull(resultTx);
+ }
+
+ @Test
+ public void testSubmitFail() throws Exception {
+ final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
+ final CheckedFuture<Void, TransactionCommitFailedException> resultThrowable =
+ Futures.immediateFailedCheckedFuture(throwable);
+ doReturn(resultThrowable).when(readWriteTx).submit();
+
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.EMPTY);
+ final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
+ wTx.submit();
+ exception.expect(TransactionCommitFailedException.class);
+ resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ doReturn(true).when(readWriteTx).cancel();
+
+ // Without Tx
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ final Boolean resultFalseNoTx = wTx.cancel();
+ assertEquals(true, resultFalseNoTx);
+ }
+
+ @Test
+ public void testCancelWithOperation() throws Exception {
+ doReturn(true).when(readWriteTx).cancel();
+
+ // With Tx, readWriteTx test
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newReadWriteTransaction();
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.EMPTY);
+
+ final Boolean resultTrue = wTx.cancel();
+ assertEquals(true, resultTrue);
+
+ final Boolean resultFalse = wTx.cancel();
+ assertEquals(false, resultFalse);
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ // Message: NormalizedNodeMessage
+ final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+ .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
+ Futures.immediateCheckedFuture(Optional.of(outputNode));
+ doReturn(resultNormalizedNodeMessage).when(readWriteTx).read(storeType, instanceIdentifier);
+
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
+ slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
+
+ final Optional<NormalizedNode<?, ?>> resultNodeMessage =
+ resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertTrue(resultNodeMessage.isPresent());
+ assertEquals(resultNodeMessage.get(), outputNode);
+ }
+
+ @Test
+ public void testReadEmpty() throws Exception {
+ // Message: EmptyReadResponse
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
+ Futures.immediateCheckedFuture(Optional.absent());
+ doReturn(resultEmpty).when(readWriteTx).read(storeType, instanceIdentifier);
+
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
+ slaveDataBroker.newReadWriteTransaction().read(storeType,
+ instanceIdentifier);
+
+ final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
+ resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(resultEmptyMessage, Optional.absent());
+ }
+
+ @Test
+ public void testReadFail() throws Exception {
+ // Message: Throwable
+ final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
+ Futures.immediateFailedCheckedFuture(readFailedException);
+
+ doReturn(resultThrowable).when(readWriteTx).read(storeType, instanceIdentifier);
+
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
+ slaveDataBroker.newReadWriteTransaction().read(storeType, instanceIdentifier);
+
+ exception.expect(ReadFailedException.class);
+ resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testExist() throws Exception {
+ // Message: True
+ final CheckedFuture<Boolean, ReadFailedException> resultTrue =
+ Futures.immediateCheckedFuture(true);
+ doReturn(resultTrue).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+ final CheckedFuture<Boolean, ReadFailedException> trueResponse =
+ slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
+
+ final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(true, trueMessage);
+ }
+
+ @Test
+ public void testExistsNull() throws Exception {
+ // Message: False, result null
+ final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
+ doReturn(resultNull).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+ final CheckedFuture<Boolean, ReadFailedException> nullResponse =
+ slaveDataBroker.newReadWriteTransaction().exists(storeType,
+ instanceIdentifier);
+
+ final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(false, nullFalseMessage);
+ }
+
+ @Test
+ public void testExistsFalse() throws Exception {
+ // Message: False
+ final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
+ doReturn(resultFalse).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+ final CheckedFuture<Boolean, ReadFailedException> falseResponse =
+ slaveDataBroker.newReadWriteTransaction().exists(storeType,
+ instanceIdentifier);
+
+ final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+ assertEquals(false, falseMessage);
+ }
+
+ @Test
+ public void testExistsFail() throws Exception {
+ // Message: Throwable
+ final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+ final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
+ Futures.immediateFailedCheckedFuture(readFailedException);
+ doReturn(resultThrowable).when(readWriteTx).exists(storeType, instanceIdentifier);
+
+ final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
+ slaveDataBroker.newReadWriteTransaction().exists(storeType, instanceIdentifier);
+
+ exception.expect(ReadFailedException.class);
+ resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+ }
+
+ private void initializeDataTest() throws Exception {
+ final Future<Object> initialDataToActor =
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
+
+ final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
+
+ assertTrue(success instanceof MasterActorDataInitialized);
+ }
+
+}