* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.openflowplugin.impl.device;
+package org.opendaylight.openflowplugin.common.txchain;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
* method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
* and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
*/
-class TransactionChainManager implements TransactionChainListener, AutoCloseable {
+public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
private final String nodeId;
@GuardedBy("txLock")
- private WriteTransaction writeTx;
+ private ReadWriteTransaction writeTx;
@GuardedBy("txLock")
- private BindingTransactionChain txChainFactory;
+ private BindingTransactionChain transactionChain;
@GuardedBy("txLock")
private boolean submitIsEnabled;
@GuardedBy("txLock")
@GuardedBy("txLock")
private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
- TransactionChainManager(@Nonnull final DataBroker dataBroker,
- @Nonnull final DeviceInfo deviceInfo) {
+ public TransactionChainManager(@Nonnull final DataBroker dataBroker,
+ @Nonnull final String deviceIdentifier) {
this.dataBroker = dataBroker;
- this.nodeId = deviceInfo.toString();
+ this.nodeId = deviceIdentifier;
this.lastSubmittedFuture = Futures.immediateFuture(null);
}
@GuardedBy("txLock")
private void createTxChain() {
- BindingTransactionChain txChainFactoryTemp = txChainFactory;
- txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+ BindingTransactionChain txChainFactoryTemp = transactionChain;
+ transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
}
- boolean initialSubmitWriteTransaction() {
+ public boolean initialSubmitWriteTransaction() {
enableSubmit();
- return submitWriteTransaction();
+ return submitTransaction();
}
/**
* registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
* transactions. Call this method for MASTER role only.
*/
- void activateTransactionManager() {
+ public void activateTransactionManager() {
if (LOG.isDebugEnabled()) {
LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
this.nodeId, submitIsEnabled);
}
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
- Preconditions.checkState(txChainFactory == null,
+ Preconditions.checkState(transactionChain == null,
"TxChainFactory survive last close.");
Preconditions.checkState(writeTx == null,
"We have some unexpected WriteTransaction.");
* Call this method for SLAVE only.
* @return Future
*/
- ListenableFuture<Void> deactivateTransactionManager() {
+ public ListenableFuture<Void> deactivateTransactionManager() {
if (LOG.isDebugEnabled()) {
LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
}
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- removeTxChainFactory();
+ closeTransactionChain();
}
@Override
- public void onFailure(final Throwable throwable) {
- removeTxChainFactory();
+ public void onFailure(@Nonnull final Throwable t) {
+ closeTransactionChain();
}
});
} else {
return future;
}
- private void removeTxChainFactory() {
- Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
- txChainFactory = null;
+ private void closeTransactionChain() {
+ if (writeTx != null) {
+ writeTx.cancel();
+ writeTx = null;
+ }
+ Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
+ transactionChain = null;
}
- boolean submitWriteTransaction() {
+ public boolean submitTransaction() {
synchronized (txLock) {
if (!submitIsEnabled) {
if (LOG.isTraceEnabled()) {
return true;
}
- <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path) {
+ public <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path){
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
}
}
- <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path,
- final T data,
- final boolean createParents) {
+ public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data,
+ final boolean createParents){
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
}
}
+ public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data,
+ final boolean createParents){
+ synchronized (txLock) {
+ ensureTransaction();
+ if (writeTx == null) {
+ LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+ }
+
+ writeTx.merge(store, path, data, createParents);
+ }
+ }
+
+ public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
+ readFromTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path){
+ synchronized (txLock) {
+ ensureTransaction();
+ if (writeTx == null) {
+ LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
+ throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+ }
+
+ return writeTx.read(store, path);
+ }
+ }
+
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
synchronized (txLock) {
- if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
+ if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus &&
+ chain.equals(this.transactionChain)) {
LOG.warn("Transaction chain failed, recreating chain due to ", cause);
+ closeTransactionChain();
createTxChain();
writeTx = null;
}
}
@GuardedBy("txLock")
- private void ensureTransaction() {
+ private void ensureTransaction() {
if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
- && txChainFactory != null) {
- writeTx = txChainFactory.newWriteOnlyTransaction();
+ && transactionChain != null) {
+ writeTx = transactionChain.newReadWriteTransaction();
}
}
- @VisibleForTesting
- void enableSubmit() {
+ private void enableSubmit() {
synchronized (txLock) {
- /* !!!IMPORTANT: never set true without txChainFactory */
- submitIsEnabled = txChainFactory != null;
+ /* !!!IMPORTANT: never set true without transactionChain */
+ submitIsEnabled = transactionChain != null;
}
}
- ListenableFuture<Void> shuttingDown() {
+ public ListenableFuture<Void> shuttingDown() {
if (LOG.isDebugEnabled()) {
LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
}
submitIsEnabled = false;
ListenableFuture<Void> future;
- if (!wasSubmitEnabled || txChainFactory == null) {
+ if (!wasSubmitEnabled || transactionChain == null) {
// stay with actual thread
future = Futures.immediateCheckedFuture(null);
LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
}
synchronized (txLock) {
- removeTxChainFactory();
+ closeTransactionChain();
}
}