import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
* transaction factories.
*/
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
- implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
protected static final AtomicLong TX_COUNTER = new AtomicLong();
LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
parent.getIdentifier(), shardName, local);
}
- return createLocalTransactionContext(local, parent);
+
+ try {
+ return createLocalTransactionContext(local, parent);
+ } catch(Exception e) {
+ return new NoOpTransactionContext(e, parent.getIdentifier());
+ }
}
return null;
}
private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextAdapter) {
+ String shardName, TransactionContextWrapper transactionContextWrapper) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
- TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- if(localContext != null) {
- transactionContextAdapter.executePriorTransactionOperations(localContext);
- } else {
- RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
- parent, shardName);
- remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+ try {
+ TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ if(localContext != null) {
+ transactionContextWrapper.executePriorTransactionOperations(localContext);
+ } else {
+ RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
+ parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ }
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
}
}
private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextAdapter) {
+ String shardName, TransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
- transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier(), parent.getLimiter()));
+ try {
+ transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
+ parent.getIdentifier()));
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
+ }
}
- final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+ final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+ final TransactionContextWrapper transactionContextWrapper =
+ new TransactionContextWrapper(parent.getIdentifier(), actorContext);
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
if(findPrimaryFuture.isCompleted()) {
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
if(maybe.isSuccess()) {
- onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
+ onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
} else {
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
+ onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
}
} else {
findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
if (failure == null) {
- onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
+ onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
} else {
- onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
+ onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
}
}
}, actorContext.getClientDispatcher());
}
- return transactionContextAdapter;
+ return transactionContextWrapper;
}
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
- LOG.debug("Shard {} resolved to local data tree", shardName);
- }
- }
+ if(!knownLocal.containsKey(shardName)) {
+ LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
- @Override
- public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
- final F existing = knownLocal.get(shardName);
- if (existing != null) {
- if (primaryShardInfo != null) {
- final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
- if (maybeDataTree.isPresent()) {
- final DataTree newDataTree = maybeDataTree.get();
- final DataTree oldDataTree = dataTreeForFactory(existing);
- if (!oldDataTree.equals(newDataTree)) {
- final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
- knownLocal.replace(shardName, existing, newChain);
- LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
- }
-
- return;
- }
- }
- if (knownLocal.remove(shardName, existing)) {
- LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
- } else {
- LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+ F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+ knownLocal.putIfAbsent(shardName, factory);
}
+ } else if(knownLocal.containsKey(shardName)) {
+ LOG.debug("Shard {} invalidating local data tree", shardName);
+
+ knownLocal.remove(shardName);
}
}
* @param shardName Shard name
* @return Future containing shard information.
*/
- protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
+ protected abstract Future<PrimaryShardInfo> findPrimaryShard(@Nonnull String shardName,
+ @Nonnull TransactionIdentifier txId);
/**
* Create local transaction factory for specified shard, backed by specified shard leader
*/
protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
- /**
- * Extract the backing data tree from a particular factory.
- *
- * @param factory Transaction factory
- * @return Backing data tree
- */
- protected abstract DataTree dataTreeForFactory(F factory);
-
/**
* Callback invoked from child transactions to push any futures, which need to
* be waited for before the next transaction is allocated.
*/
protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
- private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
- return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+ /**
+ * Callback invoked when the internal TransactionContext has been created for a transaction.
+ *
+ * @param transactionId the ID of the transaction.
+ */
+ protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId);
+
+ private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+ final TransactionProxy parent) {
+
+ switch(parent.getType()) {
+ case READ_ONLY:
+ final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ return readOnly;
+ }
+ };
+ case READ_WRITE:
+ final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ return readWrite;
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ return readWrite;
+ }
+ };
+ case WRITE_ONLY:
+ final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ return writeOnly;
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ }
}
}