import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
- private final LocalHistoryIdentifier historyId;
- private final ActorUtils actorUtils;
+ private final @NonNull LocalHistoryIdentifier historyId;
+ private final @NonNull ActorUtils actorUtils;
// Used via TX_COUNTER_UPDATER
@SuppressWarnings("unused")
return null;
}
- private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
+ final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+ LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
+ parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
+
+ updateShardInfo(shardName, primaryShardInfo);
+
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ try {
+ if (localContext != null) {
+ LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
+ parent.getIdentifier());
+ return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
+ localContext);
+ }
+
+ LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
+ parent.getIdentifier());
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ return transactionContextWrapper;
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
+ }
+ }
+
+ private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
try {
- TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
if (localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
- RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
- parent, shardName);
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
remote.setPrimaryShard(primaryShardInfo);
}
} finally {
}
}
- private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
try {
- transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier()));
+ transactionContextWrapper.executePriorTransactionOperations(
+ new NoOpTransactionContext(failure, parent.getIdentifier()));
} finally {
onTransactionContextCreated(parent.getIdentifier());
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
final String shardName) {
- final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName);
-
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
+ final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
+ parent.getIdentifier(), actorUtils, shardName);
+ final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
if (findPrimaryFuture.isCompleted()) {
- Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+ final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
if (maybe.isSuccess()) {
- onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
+ return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
}
+
+ onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
} else {
- findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- if (failure == null) {
- onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
- }
+ findPrimaryFuture.onComplete(result -> {
+ if (result.isSuccess()) {
+ onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
+ } else {
+ onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
}
+ return null;
}, actorUtils.getClientDispatcher());
}
-
- return transactionContextWrapper;
+ return contextWrapper;
}
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
return readOnly;
}
};
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
return readWrite;
}
};
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
@Override
- protected DOMStoreWriteTransaction getWriteDelegate() {
+ DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
}
@Override
- protected DOMStoreReadTransaction getReadDelegate() {
+ DOMStoreReadTransaction getReadDelegate() {
throw new UnsupportedOperationException();
}
};