/**
* Provides API for all operations of read and write transactions
*/
+// TODO we should separate between read tx and write tx
public interface NetconfDOMTransaction {
+
+ /**
+ * Opens a new transaction. Transactions have to be opened before applying
+ * any operations on them. Previous transaction has to be either submitted
+ * ({@link #submit()} was invoked) or canceled ({@link #cancel()} was
+ * invoked.
+ *
+ * @throws IllegalStateException
+ * if the previous transaction was not SUBMITTED or CANCELLED.
+ */
+ void openTransaction();
+
/**
* Read data from particular data-store
* @param store data-store type
*/
public interface RemoteOperationTxProcessor {
+ /**
+ * Opens a new transaction.
+ */
+ void doOpenTransaction(ActorRef recipient, ActorRef sender);
+
/**
* Delete node in particular data-store in path
* @param store data-store type
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
deviceDataBroker =
new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
+ // We need to create NetconfProxyDOMTransaction so accessing mountpoint
+ // on leader node would be same as on follower node
+ final NetconfDOMTransaction proxyDOMTransation =
+ new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef);
+ final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
salProvider.getMountInstance()
- .onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+ .onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
}
private Future<Object> sendInitialDataToActor() {
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorRef;
+import akka.actor.Status;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
private DOMDataWriteTransaction writeTx;
private DOMDataReadOnlyTransaction readTx;
+ private ActorRef currentUser = null;
+
public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
this.dataBroker = dataBroker;
this.id = id;
this.readTx = dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
+ if (currentUser != null) {
+ LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
+ recipient.tell(new Status.Failure(
+ new IllegalStateException("Transaction is already opened for another user")), recipient);
+ return;
+ }
+
+ LOG.debug("{}: Opening a new transaction for {}", id, recipient);
+ currentUser = recipient;
+ recipient.tell(new Status.Success(null), sender);
+ }
+
@Override
public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
if (writeTx == null) {
@Override
public void doSubmit(final ActorRef recipient, final ActorRef sender) {
+ currentUser = null;
if (writeTx != null) {
CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@Override
public void doCancel(final ActorRef recipient, final ActorRef sender) {
+ currentUser = null;
boolean cancel = false;
if (writeTx != null) {
cancel = writeTx.cancel();
}
recipient.tell(cancel, sender);
+
}
@Override
@Override
public void close() throws Exception {
+ currentUser = null;
if (readTx != null) {
readTx.close();
}
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
}
private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
- if (message instanceof ReadRequest) {
+ if (message instanceof OpenTransaction) {
+ operationsProcessor.doOpenTransaction(recipient, futureSender);
+ } else if (message instanceof ReadRequest) {
final ReadRequest readRequest = (ReadRequest) message;
operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
package org.opendaylight.netconf.topology.singleton.impl.tx;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
readTx = delegateBroker.newReadOnlyTransaction();
}
+ @Override
+ public void openTransaction() {
+ // TODO We don't have to do anything here since
+ // NetconfProxyDOMTransactions and RemoteOperationTxProcessor do all
+ // the work regarding opening transactions. But maybe we should check
+ // for open transaction here instead in RemoteOperationTxProcessor
+ }
+
@Override
public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
import org.opendaylight.controller.config.util.xml.DocumentedException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
this.masterContextRef = masterContextRef;
}
+ @Override
+ public void openTransaction() {
+ // TODO we can do some checking for already opened transaction also
+ // here in this class. We can track open transaction at least for this
+ // node.
+ LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
+ final Future<Object> openTxFuture =
+ Patterns.ask(masterContextRef, new OpenTransaction(), NetconfTopologyUtils.TIMEOUT);
+ try {
+ // we have to wait here so we can see if tx can be opened
+ Await.result(openTxFuture, NetconfTopologyUtils.TIMEOUT.duration());
+ LOG.debug("{}: New transaction opened successfully", id);
+ } catch (final Exception e) {
+ LOG.error("{}: Failed to open new transaction", id, e);
+ Throwables.propagate(e);
+ }
+ }
+
@Override
public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
final YangInstanceIdentifier path) {
this.id = id;
this.delegate = delegate;
this.actorSystem = actorSystem;
+
+ this.delegate.openTransaction();
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+/**
+ * A message sent to MountPoint leader to open new transaction
+ */
+public class OpenTransaction implements TransactionRequest {
+ private static final long serialVersionUID = 1L;
+}
// Test of invoking put on master through slave proxy
doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
- slaveDataBroker.newWriteOnlyTransaction().put(storeType, instanceIdentifier, testNode);
+
+ DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+ wTx.put(storeType, instanceIdentifier, testNode);
verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
+ wTx.cancel();
// Test of invoking merge on master through slave proxy
doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
- slaveDataBroker.newWriteOnlyTransaction().merge(storeType, instanceIdentifier, testNode);
+ wTx = slaveDataBroker.newWriteOnlyTransaction();
+ wTx.merge(storeType, instanceIdentifier, testNode);
verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
+ wTx.cancel();
// Test of invoking delete on master through slave proxy
doNothing().when(writeTx).delete(storeType, instanceIdentifier);
- slaveDataBroker.newWriteOnlyTransaction().delete(storeType, instanceIdentifier);
+ wTx = slaveDataBroker.newWriteOnlyTransaction();
+ wTx.delete(storeType, instanceIdentifier);
+ wTx.cancel();
verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
// Without Tx
+ DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
doReturn(resultSubmit).when(writeTx).submit();
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse =
- slaveDataBroker.newWriteOnlyTransaction().submit();
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
- final Object result= resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+ final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
assertNull(result);
// With Tx
-
+ wTx = slaveDataBroker.newWriteOnlyTransaction();
doNothing().when(writeTx).delete(any(), any());
- slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
doReturn(resultSubmitTx).when(writeTx).submit();
- final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse =
- slaveDataBroker.newWriteOnlyTransaction().submit();
+ final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
assertNull(resultTx);
- slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+ wTx = slaveDataBroker.newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
doReturn(resultThrowable).when(writeTx).submit();
final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
- slaveDataBroker.newWriteOnlyTransaction().submit();
+ wTx.submit();
exception.expect(TransactionCommitFailedException.class);
resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
// Without Tx
- final Boolean resultFalseNoTx = slaveDataBroker.newWriteOnlyTransaction().cancel();
+ DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+ final Boolean resultFalseNoTx = wTx.cancel();
assertEquals(false, resultFalseNoTx);
// With Tx, readWriteTx test
+ wTx = slaveDataBroker.newWriteOnlyTransaction();
doNothing().when(writeTx).delete(any(), any());
- slaveDataBroker.newReadWriteTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+ wTx.delete(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.EMPTY);
doReturn(true).when(writeTx).cancel();
-
- final Boolean resultTrue = slaveDataBroker.newWriteOnlyTransaction().cancel();
+ final Boolean resultTrue = wTx.cancel();
assertEquals(true, resultTrue);
doReturn(false).when(writeTx).cancel();
- final Boolean resultFalse = slaveDataBroker.newWriteOnlyTransaction().cancel();
+ final Boolean resultFalse = wTx.cancel();
assertEquals(false, resultFalse);
}