+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import com.google.common.base.Optional;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import scala.concurrent.Future;
-
-/**
- * Provides API for all operations of read and write transactions
- */
-// TODO we should separate between read tx and write tx
-public interface NetconfDOMTransaction {
-
-
- /**
- * Opens a new transaction. Transactions have to be opened before applying
- * any operations on them. Previous transaction has to be either submitted
- * ({@link #submit()} was invoked) or canceled ({@link #cancel()} was
- * invoked.
- *
- * @throws IllegalStateException
- * if the previous transaction was not SUBMITTED or CANCELLED.
- */
- void openTransaction();
-
- /**
- * Read data from particular data-store
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- * @return result as future
- */
- Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
-
- /**
- * Test existence of node in certain data-store
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- * @return result as future
- */
- Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
-
- /**
- * Put data to particular data-store
- * @param store data-store type
- * @param data data for inserting included in NormalizedNodeMessage object
- */
- void put(LogicalDatastoreType store, NormalizedNodeMessage data);
-
- /**
- * Merge data with existing node in particular data-store
- * @param store data-store type
- * @param data data for merging included in NormalizedNodeMessage object
- */
- void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
- /**
- * Delete node in particular data-store in path
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- */
- void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
- /**
- * Cancel operation
- * @return success or not
- */
- boolean cancel();
-
- /**
- * Commit opened transaction.
- * @return void or raised exception
- */
- Future<Void> submit();
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.api;
-
-import akka.actor.ActorRef;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * Provides API for remote calling operations of transactions. Slave sends message of particular
- * operation to master and master performs it.
- */
-public interface RemoteOperationTxProcessor {
-
- /**
- * Opens a new transaction.
- */
- void doOpenTransaction(ActorRef recipient, ActorRef sender);
-
- /**
- * Delete node in particular data-store in path
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- */
- void doDelete(LogicalDatastoreType store, YangInstanceIdentifier path);
-
- /**
- * Commit opened transaction.
- * @param recipient recipient of submit result
- * @param sender sender of submit result
- */
- void doSubmit(ActorRef recipient, ActorRef sender);
-
- /**
- * Cancel operation
- * @param recipient recipient of cancel result
- * @param sender sender of cancel result
- */
- void doCancel(ActorRef recipient, ActorRef sender);
-
- /**
- * Put data to particular data-store
- * @param store data-store type
- * @param data data for inserting included in NormalizedNodeMessage object
- */
- void doPut(LogicalDatastoreType store, NormalizedNodeMessage data);
-
- /**
- * Merge data with existing node in particular data-store
- * @param store data-store type
- * @param data data for merging included in NormalizedNodeMessage object
- */
- void doMerge(LogicalDatastoreType store, NormalizedNodeMessage data);
-
- /**
- * Read data from particular data-store
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- * @param recipient recipient of read result
- * @param sender sender of read result
- */
- void doRead(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
- /**
- * Test existence of node in certain data-store
- * @param store data-store type
- * @param path unique identifier of a particular node instance in the data tree
- * @param recipient recipient of exists result
- * @param sender sender of exists result
- */
- void doExists(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
-
-}
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
LOG.info("{}: Creating master data broker for device", id);
- final NetconfDOMTransaction masterDOMTransactions =
- new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
- deviceDataBroker =
- new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
- // We need to create NetconfProxyDOMTransaction so accessing mountpoint
+ deviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
+ // We need to create ProxyDOMDataBroker so accessing mountpoint
// on leader node would be same as on follower node
- final NetconfDOMTransaction proxyDOMTransation =
- new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
- final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
+ final ProxyDOMDataBroker proxyDataBroker =
+ new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
salProvider.getMountInstance()
.onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
}
final List<SourceIdentifier> sourceIdentifiers =
remoteSchemaContext.getAllModuleIdentifiers().stream().map(mi ->
RevisionSourceIdentifier.create(mi.getName(),
- (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
+ (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
Optional.of(mi.getQNameModule().getFormattedRevision()))))
.collect(Collectors.toList());
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorSystem;
-import java.util.Collections;
-import java.util.Map;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfReadOnlyTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfWriteOnlyTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class NetconfDOMDataBroker implements DOMDataBroker {
-
- private final RemoteDeviceId id;
- private final NetconfDOMTransaction masterDataBroker;
- private final ActorSystem actorSystem;
-
- public NetconfDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
- final NetconfDOMTransaction masterDataBroker) {
- this.id = id;
- this.masterDataBroker = masterDataBroker;
- this.actorSystem = actorSystem;
- }
-
- @Override
- public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
- return new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker);
- }
-
- @Override
- public DOMDataReadWriteTransaction newReadWriteTransaction() {
- return new ReadWriteTx(new NetconfReadOnlyTransaction(id, actorSystem, masterDataBroker),
- new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker));
- }
-
- @Override
- public DOMDataWriteTransaction newWriteOnlyTransaction() {
- return new NetconfWriteOnlyTransaction(id, actorSystem, masterDataBroker);
- }
-
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
- LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener,
- DataChangeScope triggeringScope) {
- throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
- }
-
- @Override
- public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
- throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
- }
-
- @Nonnull
- @Override
- public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
- return Collections.emptyMap();
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyReadTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.ProxyWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class ProxyDOMDataBroker implements DOMDataBroker {
+
+ private final Timeout askTimeout;
+ private final RemoteDeviceId id;
+ private final ActorRef masterNode;
+ private final ActorSystem actorSystem;
+
+ /**
+ * @param actorSystem system
+ * @param id id
+ * @param masterNode {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor} ref
+ * @param askTimeout ask timeout
+ */
+ public ProxyDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+ final ActorRef masterNode, final Timeout askTimeout) {
+ this.id = id;
+ this.masterNode = masterNode;
+ this.actorSystem = actorSystem;
+ this.askTimeout = askTimeout;
+ }
+
+ @Override
+ public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewReadTransactionRequest(), askTimeout);
+ try {
+ final Object msg = Await.result(txActorFuture, askTimeout.duration());
+ if (msg instanceof Throwable) {
+ throw (Throwable) msg;
+ }
+ Preconditions.checkState(msg instanceof NewReadTransactionReply);
+ final NewReadTransactionReply reply = (NewReadTransactionReply) msg;
+ return new ProxyReadTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ } catch (final Throwable t) {
+ throw new IllegalStateException("Can't create ProxyReadTransaction", t);
+ }
+ }
+
+ @Override
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ return new ReadWriteTx(newReadOnlyTransaction(), newWriteOnlyTransaction());
+ }
+
+ @Override
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ final Future<Object> txActorFuture = Patterns.ask(masterNode, new NewWriteTransactionRequest(), askTimeout);
+ try {
+ final Object msg = Await.result(txActorFuture, askTimeout.duration());
+ if (msg instanceof Throwable) {
+ throw (Throwable) msg;
+ }
+ Preconditions.checkState(msg instanceof NewWriteTransactionReply);
+ final NewWriteTransactionReply reply = (NewWriteTransactionReply) msg;
+ return new ProxyWriteTransaction(reply.getTxActor(), id, actorSystem, askTimeout);
+ } catch (final Throwable t) {
+ throw new IllegalStateException("Can't create ProxyWriteTransaction", t);
+ }
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path, final DOMDataChangeListener listener,
+ final DataChangeScope triggeringScope) {
+ throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+ }
+
+ @Nonnull
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return Collections.emptyMap();
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
-
- private final DOMDataBroker dataBroker;
- private final RemoteDeviceId id;
- private DOMDataWriteTransaction writeTx;
- private DOMDataReadOnlyTransaction readTx;
-
- private ActorRef currentUser = null;
-
- public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
- this.dataBroker = dataBroker;
- this.id = id;
- this.readTx = dataBroker.newReadOnlyTransaction();
- }
-
- @Override
- public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
- if (currentUser != null) {
- LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
- recipient.tell(new Status.Failure(
- new IllegalStateException("Transaction is already opened for another user")), recipient);
- return;
- }
-
- LOG.debug("{}: Opening a new transaction for {}", id, recipient);
- currentUser = recipient;
- recipient.tell(new Status.Success(null), sender);
- }
-
- @Override
- public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- if (writeTx == null) {
- writeTx = dataBroker.newWriteOnlyTransaction();
- }
- writeTx.delete(store, path);
- }
-
- @Override
- public void doSubmit(final ActorRef recipient, final ActorRef sender) {
- currentUser = null;
- if (writeTx != null) {
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- recipient.tell(new SubmitReply(), sender);
- }
-
- @Override
- public void onFailure(@Nonnull Throwable throwable) {
- recipient.tell(throwable, sender);
- }
- });
- } else {
- recipient.tell(new SubmitFailedReply(), sender);
- LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
- }
- }
-
- @Override
- public void doCancel(final ActorRef recipient, final ActorRef sender) {
- currentUser = null;
- boolean cancel = false;
- if (writeTx != null) {
- cancel = writeTx.cancel();
- }
- recipient.tell(cancel, sender);
-
- }
-
- @Override
- public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- if (writeTx == null) {
- writeTx = dataBroker.newWriteOnlyTransaction();
- }
- writeTx.put(store, data.getIdentifier(), data.getNode());
- }
-
- @Override
- public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- if (writeTx == null) {
- writeTx = dataBroker.newWriteOnlyTransaction();
- }
- writeTx.merge(store, data.getIdentifier(), data.getNode());
- }
-
- @Override
- public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
- final ActorRef sender) {
- final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
- readTx.read(store, path);
-
- Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-
- @Override
- public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- if (!result.isPresent()) {
- recipient.tell(new EmptyReadResponse(), sender);
- return;
- }
- recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- recipient.tell(throwable, sender);
- }
- });
- }
-
- @Override
- public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
- final ActorRef sender) {
- final CheckedFuture<Boolean, ReadFailedException> readFuture =
- readTx.exists(store, path);
- Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(final Boolean result) {
- if (result == null) {
- recipient.tell(false, sender);
- } else {
- recipient.tell(result, sender);
- }
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- recipient.tell(throwable, sender);
- }
- });
- }
-
- @Override
- public void close() throws Exception {
- currentUser = null;
- if (readTx != null) {
- readTx.close();
- }
- }
-}
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final ActorRef masterActorRef) {
final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
- final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef, actorResponseWaitTime);
-
- final NetconfDOMDataBroker netconfDeviceDataBroker =
- new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransactions);
+ final ProxyDOMDataBroker netconfDeviceDataBroker =
+ new ProxyDOMDataBroker(actorSystem, id, masterActorRef, actorResponseWaitTime);
salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
deviceRpc, notificationService);
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
-import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
private final SchemaSourceRegistry schemaRegistry;
private final SchemaRepository schemaRepository;
- private RemoteOperationTxProcessor operationsProcessor;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
private final Timeout actorResponseWaitTime;
+ private DOMDataBroker deviceDataBroker;
+ //readTxActor can be shared
+ private ActorRef readTxActor;
public static Props props(final NetconfTopologySetup setup,
final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
public void onReceive(final Object message) throws Exception {
if (message instanceof CreateInitialMasterActorData) { // master
- sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
- operationsProcessor =
- new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
- id);
- this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+ final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
+ sourceIdentifiers = masterActorData.getSourceIndentifiers();
+ this.deviceDataBroker = masterActorData.getDeviceDataBroker();
+ final DOMDataReadOnlyTransaction tx = deviceDataBroker.newReadOnlyTransaction();
+ readTxActor = context().actorOf(ReadTransactionActor.props(tx));
+ this.deviceRpc = masterActorData.getDeviceRpc();
sender().tell(new MasterActorDataInitialized(), self());
id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
sender().tell(new MasterActorDataInitialized(), self());
} else if (message instanceof AskForMasterMountPoint) { // master
- // only master contains reference to operations processor
- if (operationsProcessor != null) {
+ // only master contains reference to deviceDataBroker
+ if (deviceDataBroker != null) {
getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
}
- } else if (message instanceof TransactionRequest) { // master
-
- resolveProxyCalls(message, sender(), getSelf());
-
} else if (message instanceof YangTextSchemaSourceRequest) { // master
final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
- } else if (message instanceof InvokeRpcMessage) {
+ } else if (message instanceof NewReadTransactionRequest) { // master
+
+ sender().tell(new NewReadTransactionReply(readTxActor), self());
+
+ } else if (message instanceof NewWriteTransactionRequest) { // master
+ try {
+ final DOMDataWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
+ final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx));
+ sender().tell(new NewWriteTransactionReply(txActor), self());
+ } catch (final Throwable t) {
+ sender().tell(t, self());
+ }
+
+ } else if (message instanceof InvokeRpcMessage) { // master
final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
}
}
- private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
- if (message instanceof OpenTransaction) {
- operationsProcessor.doOpenTransaction(recipient, futureSender);
- } else if (message instanceof ReadRequest) {
-
- final ReadRequest readRequest = (ReadRequest) message;
- operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
- } else if (message instanceof ExistsRequest) {
-
- final ExistsRequest readRequest = (ExistsRequest) message;
- operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
-
- } else if (message instanceof MergeRequest) {
-
- final MergeRequest mergeRequest = (MergeRequest) message;
- operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
-
- } else if (message instanceof PutRequest) {
-
- final PutRequest putRequest = (PutRequest) message;
- operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
-
- } else if (message instanceof DeleteRequest) {
-
- final DeleteRequest deleteRequest = (DeleteRequest) message;
- operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
-
- } else if (message instanceof CancelRequest) {
-
- operationsProcessor.doCancel(recipient, futureSender);
-
- } else if (message instanceof SubmitRequest) {
-
- operationsProcessor.doSubmit(recipient, futureSender);
- }
- }
private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ReadTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class ReadTransactionActor extends UntypedActor {
+
+ private final DOMDataReadOnlyTransaction tx;
+
+ /**
+ * Creates new actor Props.
+ *
+ * @param tx delegate device read transaction
+ * @return props
+ */
+ static Props props(final DOMDataReadOnlyTransaction tx) {
+ return Props.create(ReadTransactionActor.class, () -> new ReadTransactionActor(tx));
+ }
+
+ private ReadTransactionActor(final DOMDataReadOnlyTransaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public void onReceive(final Object message) throws Throwable {
+ if (message instanceof ReadRequest) {
+
+ final ReadRequest readRequest = (ReadRequest) message;
+ final YangInstanceIdentifier path = readRequest.getPath();
+ final LogicalDatastoreType store = readRequest.getStore();
+ read(path, store, sender(), self());
+
+ } else if (message instanceof ExistsRequest) {
+ final ExistsRequest readRequest = (ExistsRequest) message;
+ final YangInstanceIdentifier path = readRequest.getPath();
+ final LogicalDatastoreType store = readRequest.getStore();
+ exists(path, store, sender(), self());
+
+ } else {
+ unhandled(message);
+ }
+ }
+
+ private void read(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+ final ActorRef self) {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(store, path);
+ Futures.addCallback(read, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+ @Override
+ public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+ if (!result.isPresent()) {
+ sender.tell(new EmptyReadResponse(), self);
+ return;
+ }
+ sender.tell(new NormalizedNodeMessage(path, result.get()), self);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ sender.tell(throwable, self);
+ }
+ });
+ }
+
+ private void exists(final YangInstanceIdentifier path, final LogicalDatastoreType store, final ActorRef sender,
+ final ActorRef self) {
+ final CheckedFuture<Boolean, ReadFailedException> readFuture = tx.exists(store, path);
+ Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(final Boolean result) {
+ if (result == null) {
+ sender.tell(false, self);
+ } else {
+ sender.tell(result, self);
+ }
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ sender.tell(throwable, self);
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+
+/**
+ * WriteTransactionActor is an interface to device's {@link DOMDataReadOnlyTransaction} for cluster nodes.
+ */
+public class WriteTransactionActor extends UntypedActor {
+
+ private final DOMDataWriteTransaction tx;
+
+ /**
+ * Creates new actor Props.
+ *
+ * @param tx delegate device write transaction
+ * @return props
+ */
+ static Props props(final DOMDataWriteTransaction tx) {
+ return Props.create(WriteTransactionActor.class, () -> new WriteTransactionActor(tx));
+ }
+
+ private WriteTransactionActor(final DOMDataWriteTransaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public void onReceive(final Object message) throws Throwable {
+ if (message instanceof MergeRequest) {
+ final MergeRequest mergeRequest = (MergeRequest) message;
+ final NormalizedNodeMessage data = mergeRequest.getNormalizedNodeMessage();
+ tx.merge(mergeRequest.getStore(), data.getIdentifier(), data.getNode());
+ } else if (message instanceof PutRequest) {
+ final PutRequest putRequest = (PutRequest) message;
+ final NormalizedNodeMessage data = putRequest.getNormalizedNodeMessage();
+ tx.put(putRequest.getStore(), data.getIdentifier(), data.getNode());
+ } else if (message instanceof DeleteRequest) {
+ final DeleteRequest deleteRequest = (DeleteRequest) message;
+ tx.delete(deleteRequest.getStore(), deleteRequest.getPath());
+ } else if (message instanceof CancelRequest) {
+ cancel();
+ } else if (message instanceof SubmitRequest) {
+ submit(sender(), self());
+ } else {
+ unhandled(message);
+ }
+ }
+
+ private void cancel() {
+ final boolean cancelled = tx.cancel();
+ sender().tell(cancelled, self());
+ context().stop(self());
+ }
+
+ private void submit(final ActorRef requester, final ActorRef self) {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ context().stop(self);
+ Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ requester.tell(new SubmitReply(), self);
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ requester.tell(throwable, self);
+ }
+ });
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetconfMasterDOMTransaction.class);
-
- private final RemoteDeviceId id;
- private final DOMDataBroker delegateBroker;
-
- private DOMDataReadOnlyTransaction readTx;
- private DOMDataWriteTransaction writeTx;
-
- public NetconfMasterDOMTransaction(final RemoteDeviceId id,
- final SchemaContext schemaContext,
- final DOMRpcService rpc,
- final NetconfSessionPreferences netconfSessionPreferences) {
- this(id, new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences));
- }
-
- public NetconfMasterDOMTransaction(final RemoteDeviceId id, final DOMDataBroker delegateBroker) {
- this.id = id;
- this.delegateBroker = delegateBroker;
-
- // only ever need 1 readTx since it doesnt need to be closed
- readTx = delegateBroker.newReadOnlyTransaction();
- }
-
- @Override
- public void openTransaction() {
- // TODO We don't have to do anything here since
- // NetconfProxyDOMTransactions and RemoteOperationTxProcessor do all
- // the work regarding opening transactions. But maybe we should check
- // for open transaction here instead in RemoteOperationTxProcessor
- }
-
- @Override
- public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
- LOG.trace("{}: Read[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
-
- final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
- Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
- @Override
- public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- if (!result.isPresent()) {
- promise.success(Optional.absent());
- } else {
- promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
- }
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- promise.failure(throwable);
- }
- });
- return promise.future();
- }
-
- @Override
- public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- LOG.trace("{}: Exists[{}] {} via NETCONF: {}", id, readTx.getIdentifier(), store, path);
-
- final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
-
- final DefaultPromise<Boolean> promise = new DefaultPromise<>();
- Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(final Boolean result) {
- promise.success(result);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- promise.failure(throwable);
- }
- });
- return promise.future();
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- if (writeTx == null) {
- writeTx = delegateBroker.newWriteOnlyTransaction();
- }
-
- LOG.trace("{}: Write[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(), store,
- data.getIdentifier(), data.getNode());
-
- writeTx.put(store, data.getIdentifier(), data.getNode());
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- if (writeTx == null) {
- writeTx = delegateBroker.newWriteOnlyTransaction();
- }
-
- LOG.trace("{}: Merge[{}] {} via NETCONF: {} with payload {}", id, writeTx.getIdentifier(),store,
- data.getIdentifier(), data.getNode());
-
- writeTx.merge(store, data.getIdentifier(), data.getNode());
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- if (writeTx == null) {
- writeTx = delegateBroker.newWriteOnlyTransaction();
- }
-
- LOG.trace("{}: Delete[{}} {} via NETCONF: {}", id, writeTx.getIdentifier(), store, path);
-
- writeTx.delete(store, path);
- }
-
- @Override
- public boolean cancel() {
- LOG.trace("{}: Cancel[{}} via NETCONF", id, writeTx.getIdentifier());
- try {
- return writeTx.cancel();
- } finally {
- writeTx = null;
- }
- }
-
- @Override
- public Future<Void> submit() {
- LOG.trace("{}: Submit[{}} via NETCONF", id, writeTx.getIdentifier());
-
- final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
- writeTx = null;
-
- final DefaultPromise<Void> promise = new DefaultPromise<>();
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- promise.success(result);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable throwable) {
- promise.failure(throwable);
- }
- });
- return promise.future();
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-
-public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
-
- private final RemoteDeviceId id;
- private final ActorSystem actorSystem;
- private final ActorRef masterContextRef;
- private final Timeout actorResponseWaitTime;
-
- public NetconfProxyDOMTransaction(final RemoteDeviceId id,
- final ActorSystem actorSystem,
- final ActorRef masterContextRef,
- final Timeout actorResponseWaitTime) {
- this.id = id;
- this.actorSystem = actorSystem;
- this.masterContextRef = masterContextRef;
- this.actorResponseWaitTime = actorResponseWaitTime;
- }
-
- @Override
- public void openTransaction() {
- // TODO we can do some checking for already opened transaction also
- // here in this class. We can track open transaction at least for this
- // node.
- LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
- final Future<Object> openTxFuture =
- Patterns.ask(masterContextRef, new OpenTransaction(), actorResponseWaitTime);
- try {
- // we have to wait here so we can see if tx can be opened
- Await.result(openTxFuture, actorResponseWaitTime.duration());
- LOG.debug("{}: New transaction opened successfully", id);
- } catch (final Exception e) {
- LOG.error("{}: Failed to open new transaction", id, e);
- Throwables.propagate(e);
- }
- }
-
- @Override
- public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
-
- final Future<Object> readScalaFuture =
- Patterns.ask(masterContextRef, new ReadRequest(store, path), actorResponseWaitTime);
-
- LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
- final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
-
- readScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = new DocumentedException(
- id + ":Master is down. Please try again.",
- DocumentedException.ErrorType.TRANSPORT,
- DocumentedException.ErrorTag.RESOURCE_DENIED,
- DocumentedException.ErrorSeverity.ERROR);
- promise.failure(exception);
- return;
- }
- if (success instanceof Throwable) { // Error sended by master
- promise.failure((Throwable) success);
- return;
- }
- if (success instanceof EmptyReadResponse) {
- promise.success(Optional.absent());
- return;
- }
- promise.success(Optional.of((NormalizedNodeMessage) success));
- }
- }, actorSystem.dispatcher());
-
- return promise.future();
- }
-
- @Override
- public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- final Future<Object> existsScalaFuture =
- Patterns.ask(masterContextRef, new ExistsRequest(store, path), actorResponseWaitTime);
-
- LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
- final DefaultPromise<Boolean> promise = new DefaultPromise<>();
- existsScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = new DocumentedException(
- id + ":Master is down. Please try again.",
- DocumentedException.ErrorType.TRANSPORT,
- DocumentedException.ErrorTag.RESOURCE_DENIED,
- DocumentedException.ErrorSeverity.ERROR);
- promise.failure(exception);
- return;
- }
- if (success instanceof Throwable) {
- promise.failure((Throwable) success);
- return;
- }
- promise.success((Boolean) success);
- }
- }, actorSystem.dispatcher());
- return promise.future();
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
- masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
- LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
-
- masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
- masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
- }
-
- @Override
- public boolean cancel() {
- final Future<Object> cancelScalaFuture =
- Patterns.ask(masterContextRef, new CancelRequest(), actorResponseWaitTime);
-
- LOG.trace("{}: Cancel {} via NETCONF", id);
-
- try {
- // here must be Await because AsyncWriteTransaction do not return future
- return (boolean) Await.result(cancelScalaFuture, actorResponseWaitTime.duration());
- } catch (Exception e) {
- return false;
- }
- }
-
- @Override
- public Future<Void> submit() {
- final Future<Object> submitScalaFuture =
- Patterns.ask(masterContextRef, new SubmitRequest(), actorResponseWaitTime);
-
- LOG.trace("{}: Submit {} via NETCONF", id);
-
- final DefaultPromise<Void> promise = new DefaultPromise<>();
-
- submitScalaFuture.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
- if (failure != null) { // ask timeout
- final Exception exception = new DocumentedException(
- id + ":Master is down. Please try again.",
- DocumentedException.ErrorType.TRANSPORT,
- DocumentedException.ErrorTag.RESOURCE_DENIED,
- DocumentedException.ErrorSeverity.ERROR);
- promise.failure(exception);
- return;
- }
- if (success instanceof Throwable) {
- promise.failure((Throwable) success);
- } else {
- if (success instanceof SubmitFailedReply) {
- LOG.error("{}: Transaction was not submitted because already closed.", id);
- }
- promise.success(null);
- }
- }
- }, actorSystem.dispatcher());
-
- return promise.future();
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.config.util.xml.DocumentedException;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfReadOnlyTransaction implements DOMDataReadOnlyTransaction {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetconfReadOnlyTransaction.class);
-
- private final RemoteDeviceId id;
- private final NetconfDOMTransaction delegate;
- private final ActorSystem actorSystem;
-
- public NetconfReadOnlyTransaction(final RemoteDeviceId id,
- final ActorSystem actorSystem,
- final NetconfDOMTransaction delegate) {
- this.id = id;
- this.delegate = delegate;
- this.actorSystem = actorSystem;
- }
-
- @Override
- public void close() {
- //NOOP
- }
-
- @Override
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
-
- LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
-
- final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
- final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture;
- checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
- @Nullable
- @Override
- public ReadFailedException apply(Exception cause) {
- if (cause.getCause() instanceof DocumentedException) {
- final DocumentedException exception = (DocumentedException) cause.getCause();
- if (exception.getErrorSeverity() == DocumentedException.ErrorSeverity.ERROR &&
- exception.getErrorType() == DocumentedException.ErrorType.TRANSPORT &&
- exception.getErrorTag() == DocumentedException.ErrorTag.RESOURCE_DENIED) {
- final RpcError error = RpcResultBuilder.newError(
- RpcError.ErrorType.TRANSPORT,
- exception.getErrorTag().getTagValue(),
- exception.getMessage());
- return new ReadFailedException("Read from transaction failed", error);
- }
- }
- return new ReadFailedException("Read from transaction failed", cause);
- }
- });
- future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
- @Override
- public void onComplete(final Throwable throwable,
- final Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
- if (throwable == null) {
- if (normalizedNodeMessage.isPresent()) {
- settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage,
- NormalizedNode<?, ?>>() {
-
- @Nullable
- @Override
- public NormalizedNode<?, ?> apply(final NormalizedNodeMessage input) {
- return input.getNode();
- }
- }));
- } else {
- settableFuture.set(Optional.absent());
- }
- } else {
- settableFuture.setException(throwable);
- }
- }
- }, actorSystem.dispatcher());
- return checkedFuture;
- }
-
- @Override
- public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
- final YangInstanceIdentifier path) {
-
- LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
-
- final Future<Boolean> existsFuture = delegate.exists(store, path);
- final SettableFuture<Boolean> settableFuture = SettableFuture.create();
- final CheckedFuture<Boolean, ReadFailedException> checkedFuture;
- checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
- @Nullable
- @Override
- public ReadFailedException apply(Exception cause) {
- return new ReadFailedException("Read from transaction failed", cause);
- }
- });
- existsFuture.onComplete(new OnComplete<Boolean>() {
- @Override
- public void onComplete(final Throwable throwable, final Boolean result) throws Throwable {
- if (throwable == null) {
- settableFuture.set(result);
- } else {
- settableFuture.setException(throwable);
- }
- }
- }, actorSystem.dispatcher());
- return checkedFuture;
- }
-
- @Override
- public Object getIdentifier() {
- return this;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.netconf.topology.singleton.impl.tx;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-public class NetconfWriteOnlyTransaction implements DOMDataWriteTransaction {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetconfWriteOnlyTransaction.class);
-
- private final RemoteDeviceId id;
- private final NetconfDOMTransaction delegate;
- private final ActorSystem actorSystem;
-
- public NetconfWriteOnlyTransaction(final RemoteDeviceId id,
- final ActorSystem actorSystem,
- final NetconfDOMTransaction delegate) {
- this.id = id;
- this.delegate = delegate;
- this.actorSystem = actorSystem;
-
- this.delegate.openTransaction();
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
- final NormalizedNode<?,?> data) {
- LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, path, data);
-
- delegate.put(store, new NormalizedNodeMessage(path, data));
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
- final NormalizedNode<?,?> data) {
- LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, path, data);
-
- delegate.merge(store, new NormalizedNodeMessage(path, data));
- }
-
- @Override
- public boolean cancel() {
- LOG.trace("{}: Cancel", id);
-
- return delegate.cancel();
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
- LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
-
- delegate.delete(store, path);
- }
-
- @Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit() {
- LOG.trace("{}: Submit", id);
-
- final Future<Void> submit = delegate.submit();
- final SettableFuture<Void> settFuture = SettableFuture.create();
- final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture;
- checkedFuture = Futures.makeChecked(settFuture, new Function<Exception, TransactionCommitFailedException>() {
- @Nullable
- @Override
- public TransactionCommitFailedException apply(Exception input) {
- return new TransactionCommitFailedException("Transaction commit failed", input);
- }
- });
- submit.onComplete(new OnComplete<Void>() {
- @Override
- public void onComplete(Throwable throwable, Void object) throws Throwable {
- if (throwable == null) {
- settFuture.set(object);
- } else {
- settFuture.setException(throwable);
- }
- }
- }, actorSystem.dispatcher());
- return checkedFuture;
- }
-
- @Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- LOG.trace("{}: Commit", id);
-
- final Future<Void> commit = delegate.submit();
- final SettableFuture<RpcResult<TransactionStatus>> settFuture = SettableFuture.create();
- commit.onComplete(new OnComplete<Void>() {
- @Override
- public void onComplete(final Throwable throwable, final Void result) throws Throwable {
- if (throwable == null) {
- TransactionStatus status = TransactionStatus.SUBMITED;
- RpcResult<TransactionStatus> rpcResult = RpcResultBuilder.success(status).build();
- settFuture.set(rpcResult);
- } else {
- settFuture.setException(throwable);
- }
- }
- }, actorSystem.dispatcher());
- return settFuture;
- }
-
- @Override
- public Object getIdentifier() {
- return this;
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * ProxyReadTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor}.
+ */
+public class ProxyReadTransaction implements DOMDataReadOnlyTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyReadTransaction.class);
+
+ private final ActorRef masterTxActor;
+ private final RemoteDeviceId id;
+ private final ActorSystem actorSystem;
+ private final Timeout askTimeout;
+
+ /**
+ * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadTransactionActor} ref
+ * @param id device id
+ * @param actorSystem system
+ * @param askTimeout
+ */
+ public ProxyReadTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+ final Timeout askTimeout) {
+ this.masterTxActor = masterTxActor;
+ this.id = id;
+ this.actorSystem = actorSystem;
+ this.askTimeout = askTimeout;
+ }
+
+ @Override
+ public void close() {
+ //noop
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
+
+ final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure,
+ final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) { // Error sended by master
+ settableFuture.setException((Throwable) success);
+ return;
+ }
+ if (success instanceof EmptyReadResponse) {
+ settableFuture.set(Optional.absent());
+ return;
+ }
+ if (success instanceof NormalizedNodeMessage) {
+ final NormalizedNodeMessage data = (NormalizedNodeMessage) success;
+ settableFuture.set(Optional.of(data.getNode()));
+ }
+ }
+ }, actorSystem.dispatcher());
+ return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ final Future<Object> existsScalaFuture =
+ Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
+
+ LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
+
+ final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ existsScalaFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) {
+ settableFuture.setException((Throwable) success);
+ return;
+ }
+ settableFuture.set((Boolean) success);
+ }
+ }, actorSystem.dispatcher());
+ return Futures.makeChecked(settableFuture, ReadFailedException.MAPPER);
+ }
+
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+/**
+ * ProxyWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
+ * {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor}.
+ */
+public class ProxyWriteTransaction implements DOMDataWriteTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProxyWriteTransaction.class);
+
+ private final ActorRef masterTxActor;
+ private final RemoteDeviceId id;
+ private final ActorSystem actorSystem;
+ private final AtomicBoolean opened = new AtomicBoolean(true);
+ private final Timeout askTimeout;
+
+ /**
+ * @param masterTxActor {@link org.opendaylight.netconf.topology.singleton.impl.actors.WriteTransactionActor} ref
+ * @param id device id
+ * @param actorSystem system
+ * @param askTimeout
+ */
+ public ProxyWriteTransaction(final ActorRef masterTxActor, final RemoteDeviceId id, final ActorSystem actorSystem,
+ final Timeout askTimeout) {
+ this.masterTxActor = masterTxActor;
+ this.id = id;
+ this.actorSystem = actorSystem;
+ this.askTimeout = askTimeout;
+ }
+
+ @Override
+ public boolean cancel() {
+ if (!opened.compareAndSet(true, false)) {
+ return false;
+ }
+ final Future<Object> cancelScalaFuture =
+ Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
+
+ LOG.trace("{}: Cancel {} via NETCONF", id);
+
+ try {
+ // here must be Await because AsyncWriteTransaction do not return future
+ return (boolean) Await.result(cancelScalaFuture, askTimeout.duration());
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ if (!opened.compareAndSet(true, false)) {
+ throw new IllegalStateException(id + ": Transaction" + getIdentifier() + " is closed");
+ }
+ final Future<Object> submitScalaFuture =
+ Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
+
+ LOG.trace("{}: Submit {} via NETCONF", id);
+
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
+ submitScalaFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure != null) { // ask timeout
+ final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
+ settableFuture.setException(exception);
+ return;
+ }
+ if (success instanceof Throwable) {
+ settableFuture.setException((Throwable) success);
+ } else {
+ if (success instanceof SubmitFailedReply) {
+ LOG.error("{}: Transaction was not submitted because already closed.", id);
+ }
+ settableFuture.set(null);
+ }
+ }
+ }, actorSystem.dispatcher());
+
+ return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+ @Nullable
+ @Override
+ public TransactionCommitFailedException apply(@Nullable final Exception input) {
+ final String message = "Submit of transaction " + getIdentifier() + " failed";
+ return new TransactionCommitFailedException(message, input);
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ LOG.trace("{}: Commit", id);
+
+ final CheckedFuture<Void, TransactionCommitFailedException> submit = submit();
+ return Futures.transform(submit, new Function<Void, RpcResult<TransactionStatus>>() {
+ @Nullable
+ @Override
+ public RpcResult<TransactionStatus> apply(@Nullable final Void input) {
+ return RpcResultBuilder.success(TransactionStatus.SUBMITED).build();
+ }
+ });
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+ LOG.trace("{}: Delete {} via NETCONF: {}", id, store, identifier);
+ masterTxActor.tell(new DeleteRequest(store, identifier), ActorRef.noSender());
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+ final NormalizedNode<?, ?> data) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+ final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+ LOG.trace("{}: Put {} via NETCONF: {} with payload {}", id, store, identifier, data);
+ masterTxActor.tell(new PutRequest(store, msg), ActorRef.noSender());
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier identifier,
+ final NormalizedNode<?, ?> data) {
+ Preconditions.checkState(opened.get(), "%s: Transaction was closed %s", id, getIdentifier());
+ final NormalizedNodeMessage msg = new NormalizedNodeMessage(identifier, data);
+ LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, identifier, data);
+ masterTxActor.tell(new MergeRequest(store, msg), ActorRef.noSender());
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return this;
+ }
+}
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.HashMap;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
import java.util.Map;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
public static InstanceIdentifier<Node> createTopologyNodePath(final String topologyId) {
return createTopologyListPath(topologyId).child(Node.class);
}
+
+ public static DocumentedException createMasterIsDownException(final RemoteDeviceId id) {
+ return new DocumentedException(id + ":Master is down. Please try again.",
+ DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
+ DocumentedException.ErrorSeverity.WARNING);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewReadTransactionReply implements Serializable {
+
+ private final ActorRef txActor;
+
+ public NewReadTransactionReply(final ActorRef txActor) {
+ this.txActor = txActor;
+ }
+
+ public ActorRef getTxActor() {
+ return txActor;
+ }
+}
/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.netconf.topology.singleton.messages.transactions;
-/**
- * A message sent to MountPoint leader to open new transaction
- */
-public class OpenTransaction implements TransactionRequest {
- private static final long serialVersionUID = 1L;
+import java.io.Serializable;
+
+public class NewReadTransactionRequest implements Serializable {
+
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import akka.actor.ActorRef;
+import java.io.Serializable;
+
+public class NewWriteTransactionReply implements Serializable {
+
+ private final ActorRef txActor;
+
+ public NewWriteTransactionReply(final ActorRef txActor) {
+ this.txActor = txActor;
+ }
+
+ public ActorRef getTxActor() {
+ return txActor;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+public class NewWriteTransactionRequest implements Serializable {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ReadTransactionActorTest {
+
+ private static final YangInstanceIdentifier path = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType store = LogicalDatastoreType.CONFIGURATION;
+
+ @Mock
+ private DOMDataReadOnlyTransaction deviceReadTx;
+ private TestProbe probe;
+ private ActorSystem system;
+ private TestActorRef<ReadTransactionActor> actorRef;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ probe = TestProbe.apply(system);
+ actorRef = TestActorRef.create(system, ReadTransactionActor.props(deviceReadTx), "testA");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final ContainerNode node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.of(node)));
+ actorRef.tell(new ReadRequest(store, path), probe.ref());
+ verify(deviceReadTx).read(store, path);
+ probe.expectMsgClass(NormalizedNodeMessage.class);
+ }
+
+ @Test
+ public void testReadEmpty() throws Exception {
+ when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+ actorRef.tell(new ReadRequest(store, path), probe.ref());
+ verify(deviceReadTx).read(store, path);
+ probe.expectMsgClass(EmptyReadResponse.class);
+ }
+
+ @Test
+ public void testReadFailure() throws Exception {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(deviceReadTx.read(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ReadRequest(store, path), probe.ref());
+ verify(deviceReadTx).read(store, path);
+ probe.expectMsg(cause);
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateCheckedFuture(true));
+ actorRef.tell(new ExistsRequest(store, path), probe.ref());
+ verify(deviceReadTx).exists(store, path);
+ probe.expectMsg(true);
+ }
+
+ @Test
+ public void testExistsFailure() throws Exception {
+ final ReadFailedException cause = new ReadFailedException("fail");
+ when(deviceReadTx.exists(store, path)).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ actorRef.tell(new ExistsRequest(store, path), probe.ref());
+ verify(deviceReadTx).exists(store, path);
+ probe.expectMsg(cause);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+public class WriteTransactionActorTest {
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+ private static final Timeout TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
+
+ @Mock
+ private DOMDataWriteTransaction deviceWriteTx;
+ private TestProbe probe;
+ private ActorSystem system;
+ private TestActorRef<WriteTransactionActor> actorRef;
+ private NormalizedNode<?, ?> node;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ probe = TestProbe.apply(system);
+ node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ actorRef = TestActorRef.create(system, WriteTransactionActor.props(deviceWriteTx), "testA");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+ actorRef.tell(new PutRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(deviceWriteTx).put(STORE, PATH, node);
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ final NormalizedNodeMessage normalizedNodeMessage = new NormalizedNodeMessage(PATH, node);
+ actorRef.tell(new MergeRequest(STORE, normalizedNodeMessage), probe.ref());
+ verify(deviceWriteTx).merge(STORE, PATH, node);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ actorRef.tell(new DeleteRequest(STORE, PATH), probe.ref());
+ verify(deviceWriteTx).delete(STORE, PATH);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ when(deviceWriteTx.cancel()).thenReturn(true);
+ final Future<Object> cancelFuture = Patterns.ask(actorRef, new CancelRequest(), TIMEOUT);
+ final Object result = Await.result(cancelFuture, TIMEOUT.duration());
+ Preconditions.checkState(result instanceof Boolean);
+ verify(deviceWriteTx).cancel();
+ Assert.assertTrue((Boolean) result);
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ when(deviceWriteTx.submit()).thenReturn(Futures.immediateCheckedFuture(null));
+ final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+ final Object result = Await.result(submitFuture, TIMEOUT.duration());
+ Assert.assertTrue(result instanceof SubmitReply);
+ verify(deviceWriteTx).submit();
+ }
+
+ @Test
+ public void testSubmitFail() throws Exception {
+ final RpcError rpcError =
+ RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "fail", "fail");
+ final TransactionCommitFailedException cause = new TransactionCommitFailedException("fail", rpcError);
+ when(deviceWriteTx.submit()).thenReturn(Futures.immediateFailedCheckedFuture(cause));
+ final Future<Object> submitFuture = Patterns.ask(actorRef, new SubmitRequest(), TIMEOUT);
+ final Object result = Await.result(submitFuture, TIMEOUT.duration());
+ Assert.assertEquals(cause, result);
+ verify(deviceWriteTx).submit();
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyReadTransactionTest {
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+ private ActorSystem system;
+ private TestProbe masterActor;
+ private ContainerNode node;
+ private ProxyReadTransaction tx;
+
+ @Before
+ public void setUp() throws Exception {
+ system = ActorSystem.apply();
+ masterActor = new TestProbe(system);
+ final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+ node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ tx = new ProxyReadTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new NormalizedNodeMessage(PATH, node));
+ final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(node, result.get());
+ }
+
+ @Test
+ public void testReadEmpty() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new EmptyReadResponse());
+ final Optional<NormalizedNode<?, ?>> result = read.checkedGet();
+ Assert.assertFalse(result.isPresent());
+ }
+
+ @Test(expected = ReadFailedException.class)
+ public void testReadFail() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ masterActor.reply(new RuntimeException("fail"));
+ read.checkedGet();
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(true);
+ final Boolean result = read.checkedGet();
+ Assert.assertTrue(result);
+ }
+
+ @Test(expected = ReadFailedException.class)
+ public void testExistsFail() throws Exception {
+ final CheckedFuture<Boolean, ReadFailedException> read = tx.exists(STORE, PATH);
+ masterActor.expectMsgClass(ExistsRequest.class);
+ masterActor.reply(new RuntimeException("fail"));
+ read.checkedGet();
+ }
+
+ @Test
+ public void testMasterDownRead() throws Exception {
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = tx.read(STORE, PATH);
+ masterActor.expectMsgClass(ReadRequest.class);
+ //master doesn't reply
+ try {
+ read.checkedGet();
+ Assert.fail("Exception should be thrown");
+ } catch (final ReadFailedException e) {
+ final Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof DocumentedException);
+ final DocumentedException de = (DocumentedException) cause;
+ Assert.assertEquals(DocumentedException.ErrorSeverity.WARNING, de.getErrorSeverity());
+ Assert.assertEquals(DocumentedException.ErrorTag.OPERATION_FAILED, de.getErrorTag());
+ Assert.assertEquals(DocumentedException.ErrorType.APPLICATION, de.getErrorType());
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public class ProxyWriteTransactionTest {
+ private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
+ private static final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
+
+ private ActorSystem system;
+ private TestProbe masterActor;
+ private ContainerNode node;
+ private ProxyWriteTransaction tx;
+
+ @Before
+ public void setUp() throws Exception {
+ system = ActorSystem.apply();
+ masterActor = new TestProbe(system);
+ final RemoteDeviceId id = new RemoteDeviceId("dev1", InetSocketAddress.createUnresolved("localhost", 17830));
+ node = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("cont")))
+ .build();
+ tx = new ProxyWriteTransaction(masterActor.ref(), id, system, Timeout.apply(5, TimeUnit.SECONDS));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system, null, true);
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ masterActor.expectMsgClass(CancelRequest.class);
+ masterActor.reply(true);
+ Assert.assertTrue(submit.get());
+ }
+
+ @Test
+ public void testCancelSubmitted() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ final Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(() -> tx.cancel());
+ masterActor.expectNoMsg();
+ Assert.assertFalse(submit.get());
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ }
+
+ @Test
+ public void testDoubleSubmit() throws Exception {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submitFuture.checkedGet();
+ try {
+ tx.submit().checkedGet();
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ final ListenableFuture<RpcResult<TransactionStatus>> submitFuture = tx.commit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ Assert.assertEquals(TransactionStatus.SUBMITED, submitFuture.get().getResult());
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ tx.delete(STORE, PATH);
+ masterActor.expectMsgClass(DeleteRequest.class);
+ }
+
+ @Test
+ public void testDeleteClosed() throws Exception {
+ submit();
+ try {
+ tx.delete(STORE, PATH);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ tx.put(STORE, PATH, node);
+ masterActor.expectMsgClass(PutRequest.class);
+ }
+
+ @Test
+ public void testPutClosed() throws Exception {
+ submit();
+ try {
+ tx.put(STORE, PATH, node);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ tx.merge(STORE, PATH, node);
+ masterActor.expectMsgClass(MergeRequest.class);
+ }
+
+ @Test
+ public void testMergeClosed() throws Exception {
+ submit();
+ try {
+ tx.merge(STORE, PATH, node);
+ Assert.fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ masterActor.expectNoMsg();
+ }
+ }
+
+ @Test
+ public void testGetIdentifier() throws Exception {
+ Assert.assertEquals(tx, tx.getIdentifier());
+ }
+
+ private void submit() throws TransactionCommitFailedException {
+ final CheckedFuture<Void, TransactionCommitFailedException> submit = tx.submit();
+ masterActor.expectMsgClass(SubmitRequest.class);
+ masterActor.reply(new SubmitReply());
+ submit.checkedGet();
+ }
+
+}
\ No newline at end of file
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
public final ExpectedException exception = ExpectedException.none();
private ActorRef masterRef;
- private NetconfDOMDataBroker slaveDataBroker;
- private DOMDataBroker masterDataBroker;
+ private ProxyDOMDataBroker slaveDataBroker;
private List<SourceIdentifier> sourceIdentifiers;
-
+ @Mock
+ private DOMDataBroker deviceDataBroker;
@Mock
private DOMDataReadOnlyTransaction readTx;
-
@Mock
private DOMRpcService domRpcService;
sourceIdentifiers = Lists.newArrayList();
- // Create master data broker
-
- final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
+ //device read tx
readTx = mock(DOMDataReadOnlyTransaction.class);
-
- doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
- final NetconfDOMTransaction masterDOMTransactions =
- new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
- masterDataBroker =
- new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+ doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
// Create slave data broker for testing proxy
-
- final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
- slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+ slaveDataBroker =
+ new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
}
@After
public void teardown() {
- JavaTestKit.shutdownActorSystem(system);
+ JavaTestKit.shutdownActorSystem(system, null, true);
system = null;
}
private void initializeDataTest() throws Exception {
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
domRpcService), TIMEOUT);
final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
-import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
-import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMDataBroker;
import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
public final ExpectedException exception = ExpectedException.none();
private ActorRef masterRef;
- private NetconfDOMDataBroker slaveDataBroker;
- private DOMDataBroker masterDataBroker;
+ private ProxyDOMDataBroker slaveDataBroker;
private List<SourceIdentifier> sourceIdentifiers;
-
+ @Mock
+ private DOMDataBroker deviceDataBroker;
@Mock
private DOMDataWriteTransaction writeTx;
-
@Mock
private DOMRpcService domRpcService;
sourceIdentifiers = Lists.newArrayList();
- // Create master data broker
-
- final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
writeTx = mock(DOMDataWriteTransaction.class);
final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
- doReturn(writeTx).when(delegateDataBroker).newWriteOnlyTransaction();
- doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
-
- final NetconfDOMTransaction masterDOMTransactions =
- new NetconfMasterDOMTransaction(remoteDeviceId, delegateDataBroker);
-
- masterDataBroker =
- new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+ doReturn(writeTx).when(deviceDataBroker).newWriteOnlyTransaction();
+ doReturn(readTx).when(deviceDataBroker).newReadOnlyTransaction();
// Create slave data broker for testing proxy
-
- final NetconfDOMTransaction proxyDOMTransactions =
- new NetconfProxyDOMTransaction(remoteDeviceId, system, masterRef, TIMEOUT);
-
- slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
-
-
+ slaveDataBroker =
+ new ProxyDOMDataBroker(system, remoteDeviceId, masterRef, Timeout.apply(5, TimeUnit.SECONDS));
}
@After
public void teardown() {
- JavaTestKit.shutdownActorSystem(system);
+ JavaTestKit.shutdownActorSystem(system, null, true);
system = null;
}
@Test
- public void testPutMergeDeleteCalls() throws Exception {
-
+ public void testPut() throws Exception {
/* Initialize data on master */
initializeDataTest();
doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
- DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
wTx.put(storeType, instanceIdentifier, testNode);
- verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
+ verify(writeTx, timeout(2000)).put(storeType, instanceIdentifier, testNode);
wTx.cancel();
+
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+
+ /* Initialize data on master */
+
+ initializeDataTest();
+
+ final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+ final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+ final NormalizedNode<?, ?> testNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+ .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
// Test of invoking merge on master through slave proxy
doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
- wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
wTx.merge(storeType, instanceIdentifier, testNode);
- verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
+ verify(writeTx, timeout(2000)).merge(storeType, instanceIdentifier, testNode);
wTx.cancel();
+
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+
+ /* Initialize data on master */
+
+ initializeDataTest();
+
+ final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+ final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
// Test of invoking delete on master through slave proxy
doNothing().when(writeTx).delete(storeType, instanceIdentifier);
- wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
wTx.delete(storeType, instanceIdentifier);
wTx.cancel();
- verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
+ verify(writeTx, timeout(2000)).delete(storeType, instanceIdentifier);
}
// Without Tx
- DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
- final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
doReturn(resultSubmit).when(writeTx).submit();
final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
assertNull(result);
+ }
+ @Test
+ public void testSubmitWithOperation() throws Exception {
+
+ /* Initialize data on master */
+
+ initializeDataTest();
// With Tx
- wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
doNothing().when(writeTx).delete(any(), any());
wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
- final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
doReturn(resultSubmitTx).when(writeTx).submit();
final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
assertNull(resultTx);
+ }
- wTx = slaveDataBroker.newWriteOnlyTransaction();
+ @Test
+ public void testSubmitFail() throws Exception {
+
+ /* Initialize data on master */
+
+ initializeDataTest();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
initializeDataTest();
// Without Tx
-
- DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+ doReturn(true).when(writeTx).cancel();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
final Boolean resultFalseNoTx = wTx.cancel();
- assertEquals(false, resultFalseNoTx);
+ assertEquals(true, resultFalseNoTx);
+ }
+
+ @Test
+ public void testCancelWithOperation() throws Exception {
+
+ /* Initialize data on master */
+
+ initializeDataTest();
// With Tx, readWriteTx test
- wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
doNothing().when(writeTx).delete(any(), any());
wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
final Boolean resultTrue = wTx.cancel();
assertEquals(true, resultTrue);
- doReturn(false).when(writeTx).cancel();
-
final Boolean resultFalse = wTx.cancel();
assertEquals(false, resultFalse);
private void initializeDataTest() throws Exception {
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
domRpcService), TIMEOUT);
final Object success = Await.result(initialDataToActor, TIMEOUT.duration());