package org.opendaylight.mdsal.dom.store.inmemory;
-import com.google.common.base.Optional;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
private enum SimpleCursorOperation {
MERGE {
@Override
- void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
- final NormalizedNode<?, ?> data) {
- cursor.merge(child, data);
+ void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
+ final NormalizedNode<?, ?> data) {
+ cur.merge(child, data);
}
},
DELETE {
@Override
- void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
- final NormalizedNode<?, ?> data) {
- cursor.delete(child);
+ void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
+ final NormalizedNode<?, ?> data) {
+ cur.delete(child);
}
},
WRITE {
@Override
- void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
- final NormalizedNode<?, ?> data) {
- cursor.write(child, data);
+ void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
+ final NormalizedNode<?, ?> data) {
+ cur.write(child, data);
}
};
- abstract void applyOnLeaf(DOMDataTreeWriteCursor cursor, PathArgument child, NormalizedNode<?, ?> data);
+ abstract void applyOnLeaf(DOMDataTreeWriteCursor cur, PathArgument child, NormalizedNode<?, ?> data);
- void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> data) {
+ void apply(final DOMDataTreeWriteCursor cur, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
int enterCount = 0;
- Iterator<PathArgument> it = path.getPathArguments().iterator();
- while (it.hasNext()) {
- PathArgument currentArg = it.next();
- if (it.hasNext()) {
+ final Iterator<PathArgument> it = path.getPathArguments().iterator();
+ if (it.hasNext()) {
+ while (true) {
+ final PathArgument currentArg = it.next();
+ if (!it.hasNext()) {
+ applyOnLeaf(cur, currentArg, data);
+ break;
+ }
+
// We need to enter one level deeper, we are not at leaf (modified) node
- cursor.enter(currentArg);
+ cur.enter(currentArg);
enterCount++;
- } else {
- applyOnLeaf(cursor, currentArg, data);
}
}
- cursor.exit(enterCount);
+
+ cur.exit(enterCount);
}
}
+ private static final AtomicLong COUNTER = new AtomicLong();
+
+ private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
+ private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
+ private final InMemoryDOMDataTreeShardProducer producer;
private final ShardDataModification modification;
- private DOMDataTreeWriteCursor cursor;
- private DataTree rootShardDataTree;
- private DataTreeModification rootModification = null;
+ private final ListeningExecutorService executor;
+ private final DataTree rootShardDataTree;
+ private final String identifier;
- private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
- private InMemoryDOMDataTreeShardChangePublisher changePublisher;
+ private DataTreeModification rootModification = null;
+ private DOMDataTreeWriteCursor cursor;
private boolean finished = false;
- // FIXME inject into shard?
- private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
+ InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
+ final ShardDataModification root,
final DataTree rootShardDataTree,
- final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
- this.modification = Preconditions.checkNotNull(root);
- this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
- this.changePublisher = Preconditions.checkNotNull(changePublisher);
+ final InMemoryDOMDataTreeShardChangePublisher changePublisher,
+ final ListeningExecutorService executor) {
+ this.producer = producer;
+ this.modification = requireNonNull(root);
+ this.rootShardDataTree = requireNonNull(rootShardDataTree);
+ this.changePublisher = requireNonNull(changePublisher);
+ this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
+ LOG.debug("Shard transaction{} created", identifier);
+ this.executor = executor;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
}
private DOMDataTreeWriteCursor getCursor() {
if (cursor == null) {
- cursor = new ShardDataModificationCursor(modification, this);
+ cursor = new InMemoryShardDataModificationCursor(modification, this);
}
return cursor;
}
void delete(final YangInstanceIdentifier path) {
- YangInstanceIdentifier relativePath = toRelative(path);
+ final YangInstanceIdentifier relativePath = toRelative(path);
Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
"Deletion of shard root is not allowed");
- SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
+ SimpleCursorOperation.DELETE.apply(getCursor(), relativePath, null);
}
void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
}
void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- SimpleCursorOperation.DELETE.apply(getCursor(), toRelative(path), data);
+ SimpleCursorOperation.WRITE.apply(getCursor(), toRelative(path), data);
}
private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
- Optional<YangInstanceIdentifier> relative =
+ final Optional<YangInstanceIdentifier> relative =
path.relativeTo(modification.getPrefix().getRootIdentifier());
Preconditions.checkArgument(relative.isPresent());
return relative.get();
}
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
- // FIXME: Implement this
- return null;
- }
-
- public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
- // TODO Auto-generated method stub
- return null;
- }
-
-
- public Object getIdentifier() {
- // TODO Auto-generated method stub
- return null;
- }
-
@Override
public void close() {
Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
modification.closeTransactions();
- cursor.close();
+ if (cursor != null) {
+ cursor.close();
+ }
+ producer.transactionAborted(this);
finished = true;
}
void cursorClosed() {
- Preconditions.checkNotNull(cursor);
+ requireNonNull(cursor);
+ modification.closeCursor();
cursor = null;
}
public void ready() {
Preconditions.checkState(!finished, "Attempting to ready an already finished transaction.");
Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
- Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
+ requireNonNull(modification, "Attempting to ready an empty transaction.");
LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
rootModification = modification.seal();
- cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
- for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+ producer.transactionReady(this, rootModification);
+ cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
+ rootShardDataTree, rootModification, changePublisher));
+ for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
+ modification.getChildShards().entrySet()) {
cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
}
finished = true;
public ListenableFuture<Void> submit() {
LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
- Preconditions.checkNotNull(cohorts);
+ requireNonNull(cohorts);
Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
- final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
-
- return submit;
+ return executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts, this));
}
@Override
public ListenableFuture<Boolean> validate() {
LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
-
- final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
- return submit;
+ return executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
}
@Override
public ListenableFuture<Void> prepare() {
LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
-
- final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
- return submit;
+ return executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
}
@Override
public ListenableFuture<Void> commit() {
LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
-
- final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
- return submit;
+ return executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts, this));
}
- public void followUp() {
+ DataTreeModification getRootModification() {
+ requireNonNull(rootModification, "Transaction wasn't sealed yet");
+ return rootModification;
+ }
+ void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
+ producer.onTransactionCommited(tx);
}
@Override
public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
Preconditions.checkState(!finished, "Transaction is finished/closed already.");
Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
- DOMDataTreeWriteCursor ret = getCursor();
- YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+ final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
+ final DOMDataTreeWriteCursor ret = getCursor();
ret.enter(relativePath.getPathArguments());
return ret;
}