import akka.actor.ActorSelection;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import java.util.TreeMap;
import java.util.TreeSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
requireNonNull(path, "path should not be null");
LOG.trace("Tx {} read {}", getIdentifier(), path);
- return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
+ return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
}
private FluentFuture<Optional<NormalizedNode<?, ?>>> singleShardRead(
final Collection<FluentFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
for (String shardName : allShardNames) {
- futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY));
+ futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty()));
}
final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> listFuture = Futures.allAsList(futures);
final ListenableFuture<Optional<NormalizedNode<?, ?>>> aggregateFuture;
- aggregateFuture = Futures.transform(listFuture,
- (Function<List<Optional<NormalizedNode<?, ?>>>, Optional<NormalizedNode<?, ?>>>) input -> {
- try {
- return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input,
- txContextFactory.getActorUtils().getSchemaContext(),
- txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
- } catch (DataValidationFailedException e) {
- throw new IllegalArgumentException("Failed to aggregate", e);
- }
- }, MoreExecutors.directExecutor());
+ aggregateFuture = Futures.transform(listFuture, input -> {
+ try {
+ return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input,
+ txContextFactory.getActorUtils().getSchemaContext(),
+ txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType());
+ } catch (DataValidationFailedException e) {
+ throw new IllegalArgumentException("Failed to aggregate", e);
+ }
+ }, MoreExecutors.directExecutor());
return FluentFuture.from(aggregateFuture);
}
@Override
public void delete(final YangInstanceIdentifier path) {
- executeModification(new DeleteModification(path));
+ checkModificationState("delete", path);
+
+ executeModification(new DeleteOperation(path));
}
@Override
public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- executeModification(new MergeModification(path, data));
+ checkModificationState("merge", path);
+
+ executeModification(new MergeOperation(path, data));
}
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- executeModification(new WriteModification(path, data));
- }
-
- private void executeModification(final AbstractModification modification) {
- checkModificationState();
+ checkModificationState("write", path);
- LOG.trace("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(),
- modification.getPath());
+ executeModification(new WriteOperation(path, data));
+ }
- TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeModification(modification, havePermit);
- }
- });
+ private void executeModification(final TransactionModificationOperation operation) {
+ getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
}
- private void checkModificationState() {
+ private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
+ LOG.trace("Tx {} {} {}", getIdentifier(), opName, path);
}
private boolean seal(final TransactionState newState) {
if (state == TransactionState.OPEN) {
state = newState;
return true;
- } else {
- return false;
}
+ return false;
}
@Override
// The remote tx version is obtained the via TransactionContext which may not be available yet so
// we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
// TransactionContext is available.
- Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
-
cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
- txVersionSupplier));
+ () -> wrapper.getTransactionContext().getTransactionVersion()));
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());