import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
}
}
- private final ShardDataModification modification;
- private DOMDataTreeWriteCursor cursor;
- private final DataTree rootShardDataTree;
- private DataTreeModification rootModification = null;
+ private static final AtomicLong COUNTER = new AtomicLong();
private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
- private boolean finished = false;
+ private final InMemoryDOMDataTreeShardProducer producer;
+ private final ShardDataModification modification;
+ private final ListeningExecutorService executor;
+ private final DataTree rootShardDataTree;
+ private final String identifier;
- // FIXME inject into shard?
- private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ private DataTreeModification rootModification = null;
+ private DOMDataTreeWriteCursor cursor;
+ private boolean finished = false;
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+ InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+ final ShardDataModification root,
final DataTree rootShardDataTree,
- final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
+ final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+ final ListeningExecutorService executor) {
+ this.producer = producer;
this.modification = Preconditions.checkNotNull(root);
this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
this.changePublisher = Preconditions.checkNotNull(changePublisher);
+ this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+ LOG.debug("Shard transaction{} created", identifier);
+ this.executor = executor;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
}
private DOMDataTreeWriteCursor getCursor() {
if (cursor != null) {
cursor.close();
}
+ producer.transactionAborted(this);
finished = true;
}
void cursorClosed() {
Preconditions.checkNotNull(cursor);
+ modification.closeCursor();
cursor = null;
}
LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
rootModification = modification.seal();
- cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
- for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+ producer.transactionReady(this, rootModification);
+ cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+ rootShardDataTree, rootModification, changePublisher));
+ for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+ modification.getChildShards().entrySet()) {
cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
}
finished = true;
Preconditions.checkNotNull(cohorts);
Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
- final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+ final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+ modification.getPrefix(), cohorts, this));
return submit;
}
public ListenableFuture<Boolean> validate() {
LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
- final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+ final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+ modification.getPrefix(), cohorts));
return submit;
}
public ListenableFuture<Void> prepare() {
LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
- final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+ final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+ modification.getPrefix(), cohorts));
return submit;
}
public ListenableFuture<Void> commit() {
LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
- final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+ final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+ modification.getPrefix(), cohorts, this));
return submit;
}
- public void followUp() {
+ DataTreeModification getRootModification() {
+ Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+ return rootModification;
+ }
+ void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+ producer.onTransactionCommited(tx);
}
@Override