import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
private enum SimpleCursorOperation {
MERGE {
@Override
void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
final NormalizedNode<?, ?> data) {
int enterCount = 0;
- Iterator<PathArgument> it = path.getPathArguments().iterator();
+ final Iterator<PathArgument> it = path.getPathArguments().iterator();
while (it.hasNext()) {
- PathArgument currentArg = it.next();
+ final PathArgument currentArg = it.next();
if (it.hasNext()) {
// We need to enter one level deeper, we are not at leaf (modified) node
cursor.enter(currentArg);
}
}
+ private static final AtomicLong COUNTER = new AtomicLong();
+
+ private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+ private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
+ private final InMemoryDOMDataTreeShardProducer producer;
private final ShardDataModification modification;
- private DOMDataTreeWriteCursor cursor;
+ private final ListeningExecutorService executor;
+ private final DataTree rootShardDataTree;
+ private final String identifier;
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+ private DataTreeModification rootModification = null;
+ private DOMDataTreeWriteCursor cursor;
+ private boolean finished = false;
+
+ InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+ final ShardDataModification root,
+ final DataTree rootShardDataTree,
+ final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+ final ListeningExecutorService executor) {
+ this.producer = producer;
this.modification = Preconditions.checkNotNull(root);
+ this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
+ this.changePublisher = Preconditions.checkNotNull(changePublisher);
+ this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+ LOG.debug("Shard transaction{} created", identifier);
+ this.executor = executor;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
}
private DOMDataTreeWriteCursor getCursor() {
if (cursor == null) {
- cursor = new ShardDataModificationCursor(modification);
+ cursor = new ShardDataModificationCursor(modification, this);
}
return cursor;
}
void delete(final YangInstanceIdentifier path) {
- YangInstanceIdentifier relativePath = toRelative(path);
+ final YangInstanceIdentifier relativePath = toRelative(path);
Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
"Deletion of shard root is not allowed");
SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
}
private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
- Optional<YangInstanceIdentifier> relative =
+ final Optional<YangInstanceIdentifier> relative =
path.relativeTo(modification.getPrefix().getRootIdentifier());
Preconditions.checkArgument(relative.isPresent());
return relative.get();
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
- // FIXME: Implement this
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
}
public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
- // TODO Auto-generated method stub
- return null;
+ throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public void close() {
+ Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
+ modification.closeTransactions();
+ if (cursor != null) {
+ cursor.close();
+ }
+ producer.transactionAborted(this);
+ finished = true;
+ }
- public Object getIdentifier() {
- // TODO Auto-generated method stub
- return null;
+ void cursorClosed() {
+ Preconditions.checkNotNull(cursor);
+ modification.closeCursor();
+ cursor = null;
}
- public void close() {
- // TODO Auto-generated method stub
+ public boolean isFinished() {
+ return finished;
}
@Override
public void ready() {
+ Preconditions.checkState(!finished, "Attempting to ready an already finished transaction.");
+ Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
+ Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
+
+ LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+ rootModification = modification.seal();
+
+ producer.transactionReady(this, rootModification);
+ cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+ rootShardDataTree, rootModification, changePublisher));
+ for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+ modification.getChildShards().entrySet()) {
+ cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+ }
+ finished = true;
+ }
- modification.seal();
+ @Override
+ public ListenableFuture<Void> submit() {
+ LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
+
+ Preconditions.checkNotNull(cohorts);
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
+ modification.getPrefix(), cohorts, this));
+
+ return submit;
+ }
+ @Override
+ public ListenableFuture<Boolean> validate() {
+ LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
- return;
+ final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
+ modification.getPrefix(), cohorts));
+ return submit;
}
- public void followUp() {
+ @Override
+ public ListenableFuture<Void> prepare() {
+ LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
+ modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
+ modification.getPrefix(), cohorts, this));
+ return submit;
+ }
+
+ DataTreeModification getRootModification() {
+ Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
+ return rootModification;
+ }
+ void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+ producer.onTransactionCommited(tx);
}
@Override
public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
- DOMDataTreeWriteCursor ret = getCursor();
- YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+ Preconditions.checkState(!finished, "Transaction is finished/closed already.");
+ Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
+ final DOMDataTreeWriteCursor ret = getCursor();
+ final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
ret.enter(relativePath.getPathArguments());
return ret;
}