import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
}
}
+ private static final AtomicLong COUNTER = new AtomicLong();
+
private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
+ private final InMemoryDOMDataTreeShardProducer producer;
private final ShardDataModification modification;
private final ListeningExecutorService executor;
private final DataTree rootShardDataTree;
+ private final String identifier;
private DataTreeModification rootModification = null;
private DOMDataTreeWriteCursor cursor;
private boolean finished = false;
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+ InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+ final ShardDataModification root,
final DataTree rootShardDataTree,
final InMemoryDOMDataTreeShardChangePublisher changePublisher,
final ListeningExecutorService executor) {
+ this.producer = producer;
this.modification = Preconditions.checkNotNull(root);
this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
this.changePublisher = Preconditions.checkNotNull(changePublisher);
+ this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+ LOG.debug("Shard transaction{} created", identifier);
this.executor = executor;
}
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
private DOMDataTreeWriteCursor getCursor() {
if (cursor == null) {
cursor = new ShardDataModificationCursor(modification, this);
if (cursor != null) {
cursor.close();
}
+ producer.transactionAborted(this);
finished = true;
}
LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
rootModification = modification.seal();
+ producer.transactionReady(this, rootModification);
cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
rootShardDataTree, rootModification, changePublisher));
for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
- modification.getPrefix(), cohorts));
+ modification.getPrefix(), cohorts, this));
return submit;
}
LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
- modification.getPrefix(), cohorts));
+ modification.getPrefix(), cohorts, this));
return submit;
}
+ DataTreeModification getRootModification() {
+ Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+ return rootModification;
+ }
+
+ void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+ producer.onTransactionCommited(tx);
+ }
+
@Override
public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
Preconditions.checkState(!finished, "Transaction is finished/closed already.");