--- /dev/null
+package org.opendaylight.mdsal.dom.broker.test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
+import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.store.inmemory.WriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class ShardedDOMDataTreeProducerMultiShardTest {
+
+ private final SchemaContext schemaContext = TestModel.createTestContext();
+
+ private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ YangInstanceIdentifier.EMPTY);
+ private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ TestModel.TEST_PATH);
+
+ private static final DOMDataTreeIdentifier TEST2_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+ TestModel.TEST2_PATH);
+
+ private static final DOMDataTreeIdentifier INNER_CONTAINER_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.INNER_CONTAINER_PATH);
+ private static final DOMDataTreeIdentifier ANOTHER_SHARD_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.ANOTHER_SHARD_PATH);
+
+ private InMemoryDOMDataTreeShard rootShard;
+
+ private InMemoryDOMDataTreeShard anotherInnerShard;
+
+
+ private ShardedDOMDataTree dataTreeService;
+ private ListenerRegistration<InMemoryDOMDataTreeShard> rootShardReg;
+ private ListenerRegistration<InMemoryDOMDataTreeShard> innerShardReg;
+
+ @Before
+ public void setUp() throws Exception {
+
+ rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID);
+ rootShard.onGlobalContextUpdated(schemaContext);
+
+ ShardedDOMDataTree dataTree = new ShardedDOMDataTree();
+ rootShardReg = dataTree.registerDataTreeShard(ROOT_ID, rootShard);
+
+ dataTreeService = dataTree;
+ }
+
+ @Test
+ public void testMultipleShards() throws Exception {
+ //FIXME after listeners are implemented add them here and test those
+
+ final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID);
+ innerShard.onGlobalContextUpdated(schemaContext);
+ innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard);
+
+ final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID));
+ final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction();
+ final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID);
+ cursor.enter(TestModel.TEST_PATH.getLastPathArgument());
+
+ final LeafNode<String> shardedValue1 =
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build();
+ final LeafNode<String> shardedValue2 =
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build();
+
+ final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> containerNodeBuilder = ImmutableContainerNodeBuilder.create();
+ final ContainerNode containerNode =
+ containerNodeBuilder
+ .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER))
+ .withChild(shardedValue1)
+ .withChild(shardedValue2)
+ .build();
+
+ cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode);
+ cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument());
+
+ final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER))
+ .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build())
+ .build();
+
+ cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer);
+ cursor.close();
+ transaction.ready();
+ transaction.submit().get();
+
+ //verify listeners have been notified
+ }
+
+ @Test
+ public void testMockedSubshards() throws Exception {
+ final WriteableDOMDataTreeShard mockedInnerShard = Mockito.mock(WriteableDOMDataTreeShard.class);
+ dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, mockedInnerShard);
+ final DOMDataTreeShardProducer mockedProducer = Mockito.mock(DOMDataTreeShardProducer.class);
+ doReturn(mockedProducer).when(mockedInnerShard).createProducer(any(Collection.class));
+
+ final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID));
+
+ final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction();
+ final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID);
+ cursor.enter(TestModel.TEST_PATH.getLastPathArgument());
+
+ final LeafNode<String> shardedValue1 =
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build();
+ final LeafNode<String> shardedValue2 =
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build();
+
+ final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> containerNodeBuilder = ImmutableContainerNodeBuilder.create();
+ final ContainerNode containerNode =
+ containerNodeBuilder
+ .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER))
+ .withChild(shardedValue1)
+ .withChild(shardedValue2)
+ .build();
+
+ final DOMDataTreeShardWriteTransaction mockedTx = Mockito.mock(DOMDataTreeShardWriteTransaction.class);
+ doReturn(mockedTx).when(mockedProducer).createTransaction();
+
+ doNothing().when(mockedTx).ready();
+ doReturn(Futures.immediateFuture(true)).when(mockedTx).validate();
+ doReturn(Futures.immediateFuture(null)).when(mockedTx).prepare();
+ doReturn(Futures.immediateFuture(null)).when(mockedTx).commit();
+
+ final DOMDataTreeWriteCursor mockedCursor = Mockito.mock(DOMDataTreeWriteCursor.class);
+ doNothing().when(mockedCursor).write(any(PathArgument.class), any(NormalizedNode.class));
+ doNothing().when(mockedCursor).close();
+ doReturn(mockedCursor).when(mockedTx).createCursor(any(DOMDataTreeIdentifier.class));
+
+ cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode);
+ cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument());
+
+ final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER))
+ .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build())
+ .build();
+
+ cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer);
+ cursor.close();
+ transaction.ready();
+ transaction.submit().get();
+
+ verify(mockedTx).ready();
+ verify(mockedTx).validate();
+ verify(mockedTx).prepare();
+ verify(mockedTx).commit();
+
+ }
+}
public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+ public static final QName INNER_CONTAINER = QName.create(TEST_QNAME, "inner-container");
+ public static final QName ANOTHER_SHARD_CONTAINER = QName.create(TEST_QNAME, "another-shard");
+ public static final QName NEW_SHARD_LIST = QName.create(TEST_QNAME, "new-shard-list");
+ public static final QName SHARDED_VALUE_1 = QName.create(TEST_QNAME, "sharded-value-1");
+ public static final QName SHARDED_VALUE_2 = QName.create(TEST_QNAME, "sharded-value-2");
+ public static final QName ANOTHER_SHARD_VALUE = QName.create(TEST_QNAME, "another-shard-value");
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(OUTER_LIST_PATH).node(INNER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier INNER_CONTAINER_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(INNER_CONTAINER).build();
+ public static final YangInstanceIdentifier ANOTHER_SHARD_PATH = YangInstanceIdentifier.builder(INNER_CONTAINER_PATH).node(ANOTHER_SHARD_CONTAINER).build();
+ public static final YangInstanceIdentifier NEW_SHARD_LIST_PATH = YangInstanceIdentifier.builder(ANOTHER_SHARD_PATH).node(NEW_SHARD_LIST).build();
public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
}
}
}
+
+ container inner-container {
+
+ leaf sharded-value-1 {
+ type string;
+ }
+
+ leaf sharded-value-2 {
+ type string;
+ }
+
+ container another-shard {
+ list new-shard-list {
+ key name;
+ leaf name {
+ type string;
+ }
+ leaf value {
+ type string;
+ }
+ }
+
+ leaf another-shard-value {
+ type string;
+ }
+ }
+ }
}
container test2 {
package org.opendaylight.mdsal.dom.store.inmemory;
import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
* @throws IllegalStateException if this transaction has an unclosed cursor.
*/
void ready();
+
+ ListenableFuture<Void> submit();
+
+ //FIXME: remove these from the public api?
+ ListenableFuture<Boolean> validate();
+
+ ListenableFuture<Void> prepare();
+
+ ListenableFuture<Void> commit();
+
}
package org.opendaylight.mdsal.dom.store.inmemory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
DOMDataTreeIdentifier getIdentifier() {
return identifier;
}
+
+ ListenableFuture<Boolean> validate() {
+ return tx.validate();
+ }
+
+ ListenableFuture<Void> prepare() {
+ return tx.prepare();
+ }
+
+ ListenableFuture<Void> submit() {
+ final ListenableFuture<Void> commit = tx.commit();
+ tx = null;
+ return commit;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ForeignShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ForeignShardThreePhaseCommitCohort.class);
+
+ private final DOMDataTreeIdentifier prefix;
+ private final ForeignShardModificationContext shard;
+
+ public ForeignShardThreePhaseCommitCohort(final DOMDataTreeIdentifier prefix, final ForeignShardModificationContext shard) {
+ this.prefix = prefix;
+ this.shard = shard;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ LOG.debug("Validating transaction on foreign shard {}", prefix);
+ return shard.validate();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ LOG.debug("Preparing transaction on foreign shard {}", prefix);
+ return shard.prepare();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ // FIXME abort on the shard
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ LOG.debug("Submitting transaction on foreign shard {}", prefix);
+ return shard.submit();
+ }
+}
\ No newline at end of file
}
private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
- // FIXME: implement this
+ // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
+ if (reparented.isEmpty()) {
+ //nothing was reparented no need to update anything
+ return;
+ }
throw new UnsupportedOperationException();
}
ForeignShardModificationContext foreignContext =
new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
builder.addSubshard(foreignContext);
+ builder.addSubshard(spec.getPrefix(), foreignContext);
}
- return new InmemoryDOMDataTreeShardWriteTransaction(builder.build());
+ return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShardThreePhaseCommitCohort.class);
+ private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+
+ private final DataTree dataTree;
+ private final DataTreeModification modification;
+ private DataTreeCandidate candidate;
+
+ InMemoryDOMDataTreeShardThreePhaseCommitCohort(final DataTree dataTree,
+ final DataTreeModification modification) {
+ Preconditions.checkNotNull(dataTree);
+ this.dataTree = dataTree;
+ this.modification = modification;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ try {
+ dataTree.validate(modification);
+ LOG.debug("DataTreeModification {} validated");
+
+ return CAN_COMMIT_FUTURE;
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Data validation failed for {}", modification);
+ LOG.trace("dataTree : {}", dataTree);
+
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ try {
+ candidate = dataTree.prepare(modification);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in preparation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ candidate = null;
+ return SUCCESSFUL_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ Preconditions.checkState(candidate != null, "Attempted to commit an aborted transaction");
+ dataTree.commit(candidate);
+ return SUCCESSFUL_FUTURE;
+ }
+}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
+
private enum SimpleCursorOperation {
MERGE {
@Override
}
}
+ private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort;
private final ShardDataModification modification;
private DOMDataTreeWriteCursor cursor;
+ private DataTree rootShardDataTree;
+ private DataTreeModification rootModification = null;
+
+ private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
- InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+ // FIXME inject into shard?
+ private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+ InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) {
this.modification = Preconditions.checkNotNull(root);
+ this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
}
private DOMDataTreeWriteCursor getCursor() {
@Override
public void ready() {
- modification.seal();
+ LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+ rootModification = modification.seal();
+
+ cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification));
+ for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+ cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> submit() {
+ LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
+
+ Preconditions.checkNotNull(cohorts);
+ Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> validate() {
+ LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Void> prepare() {
+ LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+ final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
- return;
+ final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+ return submit;
}
public void followUp() {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardCanCommitCoordinationTask implements Callable<Boolean> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardCanCommitCoordinationTask.class);
+
+ private final DOMDataTreeIdentifier rootShardPrefix;
+ private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ ShardCanCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ this.rootShardPrefix = rootShardPrefix;
+ this.cohorts = cohorts;
+ }
+
+ @Override
+ public Boolean call() throws TransactionCommitFailedException {
+
+ try {
+ LOG.debug("Shard {}, canCommit started", rootShardPrefix);
+ canCommitBlocking();
+
+ return true;
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Shard: {} Submit Error during phase CanCommit, starting Abort", rootShardPrefix, e);
+ //FIXME abort here
+ throw e;
+ }
+ }
+
+ void canCommitBlocking() throws TransactionCommitFailedException {
+ for (final ListenableFuture<?> canCommit : canCommitAll()) {
+ try {
+ final Boolean result = (Boolean)canCommit.get();
+ if (result == null || !result) {
+ throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new TransactionCommitFailedException("CanCommit failed", e);
+ }
+ }
+ }
+
+ private ListenableFuture<?>[] canCommitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+ int i = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.canCommit();
+ }
+ return ops;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardCommitCoordinationTask implements Callable<Void> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTask.class);
+
+ private final DOMDataTreeIdentifier rootShardPrefix;
+ private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ ShardCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ this.rootShardPrefix = rootShardPrefix;
+ this.cohorts = cohorts;
+ }
+
+ @Override
+ public Void call() throws TransactionCommitFailedException {
+
+ try {
+ LOG.debug("Shard {}, commit started", rootShardPrefix);
+ commitBlocking();
+
+ return null;
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e);
+ //FIXME abort here
+ throw e;
+ }
+ }
+
+ void commitBlocking() throws TransactionCommitFailedException {
+ for (final ListenableFuture<?> commit : commitAll()) {
+ try {
+ commit.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new TransactionCommitFailedException("Commit failed", e);
+ }
+ }
+ }
+
+ private ListenableFuture<?>[] commitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+ int i = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.commit();
+ }
+ return ops;
+ }
+}
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
final class ShardDataModification extends WriteableNodeWithSubshard {
return childShards;
}
- public void seal() {
- rootContext.ready();
+ public DataTreeModification seal() {
+ final DataTreeModification rootModification = rootContext.ready();
for (ForeignShardModificationContext childShard : childShards.values()) {
childShard.ready();
}
+
+ return rootModification;
}
}
\ No newline at end of file
putNode(value.getIdentifier().getRootIdentifier(), leafNode);
}
+ public void addSubshard(final DOMDataTreeIdentifier prefix, final ForeignShardModificationContext value) {
+ childShards.put(prefix, value);
+ }
+
private void putNode(final YangInstanceIdentifier key, final WriteableSubshardBoundaryNode subshardNode) {
ModificationContextNodeBuilder<?> current = this;
Iterator<PathArgument> toBoundary = toRelative(key).getPathArguments().iterator();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardPreCommitCoordinationTask implements Callable<Void>{
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardPreCommitCoordinationTask.class);
+
+ private final DOMDataTreeIdentifier rootShardPrefix;
+ private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ ShardPreCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ this.rootShardPrefix = rootShardPrefix;
+ this.cohorts = cohorts;
+ }
+
+ @Override
+ public Void call() throws TransactionCommitFailedException {
+
+ try {
+ LOG.debug("Shard {}, preCommit started", rootShardPrefix);
+ preCommitBlocking();
+
+ return null;
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e);
+ //FIXME abort here
+ throw e;
+ }
+ }
+
+ void preCommitBlocking() throws TransactionCommitFailedException {
+ for (final ListenableFuture<?> preCommit : preCommitAll()) {
+ try {
+ preCommit.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new TransactionCommitFailedException("PreCommit failed", e);
+ }
+ }
+ }
+
+ private ListenableFuture<?>[] preCommitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+ int i = 0;
+ for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.preCommit();
+ }
+ return ops;
+ }
+
+}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
class ShardRootModificationContext {
return modification != null;
}
- void ready() {
+ DataTreeModification ready() {
if (cursor != null) {
cursor.close();
}
if (modification != null) {
modification.ready();
}
+
+ return modification;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardSubmitCoordinationTask implements Callable<Void> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardSubmitCoordinationTask.class);
+
+ private final DOMDataTreeIdentifier rootShardPrefix;
+ private final ShardCanCommitCoordinationTask canCommitCoordinationTask;
+ private final ShardPreCommitCoordinationTask preCommitCoordinationTask;
+ private final ShardCommitCoordinationTask commitCoordinationTask;
+
+
+ ShardSubmitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ this.rootShardPrefix = rootShardPrefix;
+
+ canCommitCoordinationTask = new ShardCanCommitCoordinationTask(rootShardPrefix, cohorts);
+ preCommitCoordinationTask = new ShardPreCommitCoordinationTask(rootShardPrefix, cohorts);
+ commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts);
+ }
+
+ @Override
+ public Void call() throws TransactionCommitFailedException {
+
+ LOG.debug("Shard {}, CanCommit started", rootShardPrefix);
+ canCommitCoordinationTask.canCommitBlocking();
+
+ LOG.debug("Shard {}, PreCommit started", rootShardPrefix);
+ preCommitCoordinationTask.preCommitBlocking();
+
+ LOG.debug("Shard {}, commit started", rootShardPrefix);
+ commitCoordinationTask.commitBlocking();
+
+ return null;
+ }
+}