import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
+
private enum SimpleCursorOperation {
MERGE {
@Override
}
}
+ private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort;
private final ShardDataModification modification;
private DOMDataTreeWriteCursor cursor;
+ private DataTree rootShardDataTree;
+ private DataTreeModification rootModification = null;
+
+ private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+ // FIXME inject into shard?
+ private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+ InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) {
this.modification = Preconditions.checkNotNull(root);
+ this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
}
private DOMDataTreeWriteCursor getCursor() {
@Override
public void ready() {
- modification.seal();
+ LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+ rootModification = modification.seal();
+
+ cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification));
+ for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+ cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> submit() {
+ LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
+
+ Preconditions.checkNotNull(cohorts);
+ Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> validate() {
+ LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Void> prepare() {
+ LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
- return;
+ final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
}
public void followUp() {