From: Tomas Cere Date: Fri, 18 Mar 2016 09:38:59 +0000 (+0100) Subject: Bug 4202: submit shard transactions X-Git-Tag: release/boron~152 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=d2e33286b44d4103b0aa79c97790f280c5bc899d;p=mdsal.git Bug 4202: submit shard transactions Change-Id: I27dd78a1c70edf99de646ccd1f3bf69775e829bc Signed-off-by: Tomas Cere --- diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java new file mode 100644 index 0000000000..bd18ac0f84 --- /dev/null +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java @@ -0,0 +1,171 @@ +package org.opendaylight.mdsal.dom.broker.test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; + +import com.google.common.util.concurrent.Futures; +import java.util.Collection; +import java.util.Collections; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree; +import org.opendaylight.mdsal.dom.broker.test.util.TestModel; +import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer; +import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardWriteTransaction; +import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard; +import org.opendaylight.mdsal.dom.store.inmemory.WriteableDOMDataTreeShard; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class ShardedDOMDataTreeProducerMultiShardTest { + + private final SchemaContext schemaContext = TestModel.createTestContext(); + + private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, + YangInstanceIdentifier.EMPTY); + private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, + TestModel.TEST_PATH); + + private static final DOMDataTreeIdentifier TEST2_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, + TestModel.TEST2_PATH); + + private static final DOMDataTreeIdentifier INNER_CONTAINER_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.INNER_CONTAINER_PATH); + private static final DOMDataTreeIdentifier ANOTHER_SHARD_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.ANOTHER_SHARD_PATH); + + private InMemoryDOMDataTreeShard rootShard; + + private InMemoryDOMDataTreeShard anotherInnerShard; + + + private ShardedDOMDataTree dataTreeService; + private ListenerRegistration rootShardReg; + private ListenerRegistration innerShardReg; + + @Before + public void setUp() throws Exception { + + rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID); + rootShard.onGlobalContextUpdated(schemaContext); + + ShardedDOMDataTree dataTree = new ShardedDOMDataTree(); + rootShardReg = dataTree.registerDataTreeShard(ROOT_ID, rootShard); + + dataTreeService = dataTree; + } + + @Test + public void testMultipleShards() throws Exception { + //FIXME after listeners are implemented add them here and test those + + final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID); + innerShard.onGlobalContextUpdated(schemaContext); + innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard); + + final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID)); + final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction(); + final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID); + cursor.enter(TestModel.TEST_PATH.getLastPathArgument()); + + final LeafNode shardedValue1 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build(); + final LeafNode shardedValue2 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build(); + + final DataContainerNodeAttrBuilder containerNodeBuilder = ImmutableContainerNodeBuilder.create(); + final ContainerNode containerNode = + containerNodeBuilder + .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER)) + .withChild(shardedValue1) + .withChild(shardedValue2) + .build(); + + cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode); + cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument()); + + final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER)) + .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build()) + .build(); + + cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer); + cursor.close(); + transaction.ready(); + transaction.submit().get(); + + //verify listeners have been notified + } + + @Test + public void testMockedSubshards() throws Exception { + final WriteableDOMDataTreeShard mockedInnerShard = Mockito.mock(WriteableDOMDataTreeShard.class); + dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, mockedInnerShard); + final DOMDataTreeShardProducer mockedProducer = Mockito.mock(DOMDataTreeShardProducer.class); + doReturn(mockedProducer).when(mockedInnerShard).createProducer(any(Collection.class)); + + final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID)); + + final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction(); + final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID); + cursor.enter(TestModel.TEST_PATH.getLastPathArgument()); + + final LeafNode shardedValue1 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build(); + final LeafNode shardedValue2 = + ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build(); + + final DataContainerNodeAttrBuilder containerNodeBuilder = ImmutableContainerNodeBuilder.create(); + final ContainerNode containerNode = + containerNodeBuilder + .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER)) + .withChild(shardedValue1) + .withChild(shardedValue2) + .build(); + + final DOMDataTreeShardWriteTransaction mockedTx = Mockito.mock(DOMDataTreeShardWriteTransaction.class); + doReturn(mockedTx).when(mockedProducer).createTransaction(); + + doNothing().when(mockedTx).ready(); + doReturn(Futures.immediateFuture(true)).when(mockedTx).validate(); + doReturn(Futures.immediateFuture(null)).when(mockedTx).prepare(); + doReturn(Futures.immediateFuture(null)).when(mockedTx).commit(); + + final DOMDataTreeWriteCursor mockedCursor = Mockito.mock(DOMDataTreeWriteCursor.class); + doNothing().when(mockedCursor).write(any(PathArgument.class), any(NormalizedNode.class)); + doNothing().when(mockedCursor).close(); + doReturn(mockedCursor).when(mockedTx).createCursor(any(DOMDataTreeIdentifier.class)); + + cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode); + cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument()); + + final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER)) + .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build()) + .build(); + + cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer); + cursor.close(); + transaction.ready(); + transaction.submit().get(); + + verify(mockedTx).ready(); + verify(mockedTx).validate(); + verify(mockedTx).prepare(); + verify(mockedTx).commit(); + + } +} diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/util/TestModel.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/util/TestModel.java index 3761c78705..182c0fd06c 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/util/TestModel.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/util/TestModel.java @@ -28,11 +28,21 @@ public class TestModel { public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); + public static final QName INNER_CONTAINER = QName.create(TEST_QNAME, "inner-container"); + public static final QName ANOTHER_SHARD_CONTAINER = QName.create(TEST_QNAME, "another-shard"); + public static final QName NEW_SHARD_LIST = QName.create(TEST_QNAME, "new-shard-list"); + public static final QName SHARDED_VALUE_1 = QName.create(TEST_QNAME, "sharded-value-1"); + public static final QName SHARDED_VALUE_2 = QName.create(TEST_QNAME, "sharded-value-2"); + public static final QName ANOTHER_SHARD_VALUE = QName.create(TEST_QNAME, "another-shard-value"); private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME); public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build(); + public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(OUTER_LIST_PATH).node(INNER_LIST_QNAME).build(); + public static final YangInstanceIdentifier INNER_CONTAINER_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(INNER_CONTAINER).build(); + public static final YangInstanceIdentifier ANOTHER_SHARD_PATH = YangInstanceIdentifier.builder(INNER_CONTAINER_PATH).node(ANOTHER_SHARD_CONTAINER).build(); + public static final YangInstanceIdentifier NEW_SHARD_LIST_PATH = YangInstanceIdentifier.builder(ANOTHER_SHARD_PATH).node(NEW_SHARD_LIST).build(); public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); diff --git a/dom/mdsal-dom-broker/src/test/resources/odl-datastore-test.yang b/dom/mdsal-dom-broker/src/test/resources/odl-datastore-test.yang index eb405f60b9..a01b4314c5 100644 --- a/dom/mdsal-dom-broker/src/test/resources/odl-datastore-test.yang +++ b/dom/mdsal-dom-broker/src/test/resources/odl-datastore-test.yang @@ -39,6 +39,33 @@ module odl-datastore-test { } } } + + container inner-container { + + leaf sharded-value-1 { + type string; + } + + leaf sharded-value-2 { + type string; + } + + container another-shard { + list new-shard-list { + key name; + leaf name { + type string; + } + leaf value { + type string; + } + } + + leaf another-shard-value { + type string; + } + } + } } container test2 { diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/DOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/DOMDataTreeShardWriteTransaction.java index 92146bca9f..9689057b6b 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/DOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/DOMDataTreeShardWriteTransaction.java @@ -9,6 +9,7 @@ package org.opendaylight.mdsal.dom.store.inmemory; import com.google.common.annotations.Beta; +import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; @@ -34,4 +35,14 @@ public interface DOMDataTreeShardWriteTransaction { * @throws IllegalStateException if this transaction has an unclosed cursor. */ void ready(); + + ListenableFuture submit(); + + //FIXME: remove these from the public api? + ListenableFuture validate(); + + ListenableFuture prepare(); + + ListenableFuture commit(); + } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardModificationContext.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardModificationContext.java index bb32047f8f..92cc97ac5e 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardModificationContext.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardModificationContext.java @@ -9,6 +9,7 @@ package org.opendaylight.mdsal.dom.store.inmemory; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; @@ -67,4 +68,18 @@ final class ForeignShardModificationContext { DOMDataTreeIdentifier getIdentifier() { return identifier; } + + ListenableFuture validate() { + return tx.validate(); + } + + ListenableFuture prepare() { + return tx.prepare(); + } + + ListenableFuture submit() { + final ListenableFuture commit = tx.commit(); + tx = null; + return commit; + } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java new file mode 100644 index 0000000000..16d4b34e79 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ForeignShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + + private static final Logger LOG = LoggerFactory.getLogger(ForeignShardThreePhaseCommitCohort.class); + + private final DOMDataTreeIdentifier prefix; + private final ForeignShardModificationContext shard; + + public ForeignShardThreePhaseCommitCohort(final DOMDataTreeIdentifier prefix, final ForeignShardModificationContext shard) { + this.prefix = prefix; + this.shard = shard; + } + + @Override + public ListenableFuture canCommit() { + LOG.debug("Validating transaction on foreign shard {}", prefix); + return shard.validate(); + } + + @Override + public ListenableFuture preCommit() { + LOG.debug("Preparing transaction on foreign shard {}", prefix); + return shard.prepare(); + } + + @Override + public ListenableFuture abort() { + // FIXME abort on the shard + return Futures.immediateFuture(null); + } + + @Override + public ListenableFuture commit() { + LOG.debug("Submitting transaction on foreign shard {}", prefix); + return shard.submit(); + } +} \ No newline at end of file diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java index b868d2ee74..5ccc8e4f3b 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java @@ -139,7 +139,11 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche } private void updateProducersAndListeners(final Map reparented) { - // FIXME: implement this + // FIXME: remove reparenting of producers, shards have to be registered from top to bottom + if (reparented.isEmpty()) { + //nothing was reparented no need to update anything + return; + } throw new UnsupportedOperationException(); } @@ -205,8 +209,9 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche ForeignShardModificationContext foreignContext = new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer()); builder.addSubshard(foreignContext); + builder.addSubshard(spec.getPrefix(), foreignContext); } - return new InmemoryDOMDataTreeShardWriteTransaction(builder.build()); + return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree); } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java new file mode 100644 index 0000000000..8c69c59a65 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShardThreePhaseCommitCohort.class); + private static final ListenableFuture SUCCESSFUL_FUTURE = Futures.immediateFuture(null); + private static final ListenableFuture CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE); + + private final DataTree dataTree; + private final DataTreeModification modification; + private DataTreeCandidate candidate; + + InMemoryDOMDataTreeShardThreePhaseCommitCohort(final DataTree dataTree, + final DataTreeModification modification) { + Preconditions.checkNotNull(dataTree); + this.dataTree = dataTree; + this.modification = modification; + } + + @Override + public ListenableFuture canCommit() { + try { + dataTree.validate(modification); + LOG.debug("DataTreeModification {} validated"); + + return CAN_COMMIT_FUTURE; + } catch (DataValidationFailedException e) { + LOG.warn("Data validation failed for {}", modification); + LOG.trace("dataTree : {}", dataTree); + + return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e)); + } catch (Exception e) { + LOG.warn("Unexpected failure in validation phase", e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture preCommit() { + try { + candidate = dataTree.prepare(modification); + return SUCCESSFUL_FUTURE; + } catch (Exception e) { + LOG.warn("Unexpected failure in preparation phase", e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture abort() { + candidate = null; + return SUCCESSFUL_FUTURE; + } + + @Override + public ListenableFuture commit() { + Preconditions.checkState(candidate != null, "Attempted to commit an aborted transaction"); + dataTree.commit(candidate); + return SUCCESSFUL_FUTURE; + } +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java index 0a754a5615..4634d4f704 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java @@ -11,15 +11,30 @@ package org.opendaylight.mdsal.dom.store.inmemory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction { + + private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class); + private enum SimpleCursorOperation { MERGE { @Override @@ -63,11 +78,20 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT } } + private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort; private final ShardDataModification modification; private DOMDataTreeWriteCursor cursor; + private DataTree rootShardDataTree; + private DataTreeModification rootModification = null; + + private ArrayList cohorts = new ArrayList<>(); - InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) { + // FIXME inject into shard? + private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + + InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) { this.modification = Preconditions.checkNotNull(root); + this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree); } private DOMDataTreeWriteCursor getCursor() { @@ -122,10 +146,49 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT @Override public void ready() { - modification.seal(); + LOG.debug("Readying open transaction on shard {}", modification.getPrefix()); + rootModification = modification.seal(); + + cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification)); + for (Entry entry : modification.getChildShards().entrySet()) { + cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); + } + } + + @Override + public ListenableFuture submit() { + LOG.debug("Submitting open transaction on shard {}", modification.getPrefix()); + + Preconditions.checkNotNull(cohorts); + Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction"); + + final ListenableFuture submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts)); + return submit; + } + + @Override + public ListenableFuture validate() { + LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix()); + + final ListenableFuture submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts)); + return submit; + } + + @Override + public ListenableFuture prepare() { + LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix()); + + final ListenableFuture submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts)); + return submit; + } + + @Override + public ListenableFuture commit() { + LOG.debug("Commit open transaction on shard {}", modification.getPrefix()); - return; + final ListenableFuture submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts)); + return submit; } public void followUp() { diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java new file mode 100644 index 0000000000..9c2f5a02f7 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardCanCommitCoordinationTask implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(ShardCanCommitCoordinationTask.class); + + private final DOMDataTreeIdentifier rootShardPrefix; + private final Collection cohorts; + + ShardCanCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, + final Collection cohorts) { + this.rootShardPrefix = rootShardPrefix; + this.cohorts = cohorts; + } + + @Override + public Boolean call() throws TransactionCommitFailedException { + + try { + LOG.debug("Shard {}, canCommit started", rootShardPrefix); + canCommitBlocking(); + + return true; + } catch (TransactionCommitFailedException e) { + LOG.warn("Shard: {} Submit Error during phase CanCommit, starting Abort", rootShardPrefix, e); + //FIXME abort here + throw e; + } + } + + void canCommitBlocking() throws TransactionCommitFailedException { + for (final ListenableFuture canCommit : canCommitAll()) { + try { + final Boolean result = (Boolean)canCommit.get(); + if (result == null || !result) { + throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + } + } catch (InterruptedException | ExecutionException e) { + throw new TransactionCommitFailedException("CanCommit failed", e); + } + } + } + + private ListenableFuture[] canCommitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int i = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.canCommit(); + } + return ops; + } +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java new file mode 100644 index 0000000000..5dc6cad03d --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardCommitCoordinationTask implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTask.class); + + private final DOMDataTreeIdentifier rootShardPrefix; + private final Collection cohorts; + + ShardCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, + final Collection cohorts) { + this.rootShardPrefix = rootShardPrefix; + this.cohorts = cohorts; + } + + @Override + public Void call() throws TransactionCommitFailedException { + + try { + LOG.debug("Shard {}, commit started", rootShardPrefix); + commitBlocking(); + + return null; + } catch (TransactionCommitFailedException e) { + LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e); + //FIXME abort here + throw e; + } + } + + void commitBlocking() throws TransactionCommitFailedException { + for (final ListenableFuture commit : commitAll()) { + try { + commit.get(); + } catch (InterruptedException | ExecutionException e) { + throw new TransactionCommitFailedException("Commit failed", e); + } + } + } + + private ListenableFuture[] commitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int i = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.commit(); + } + return ops; + } +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModification.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModification.java index ac3d358d6a..5840e9b6df 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModification.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModification.java @@ -16,6 +16,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; final class ShardDataModification extends WriteableNodeWithSubshard { @@ -63,11 +64,13 @@ final class ShardDataModification extends WriteableNodeWithSubshard { return childShards; } - public void seal() { - rootContext.ready(); + public DataTreeModification seal() { + final DataTreeModification rootModification = rootContext.ready(); for (ForeignShardModificationContext childShard : childShards.values()) { childShard.ready(); } + + return rootModification; } } \ No newline at end of file diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModificationBuilder.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModificationBuilder.java index 874400b3d9..f6e78ad7f7 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModificationBuilder.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModificationBuilder.java @@ -30,6 +30,10 @@ class ShardDataModificationBuilder extends ModificationContextNodeBuilder current = this; Iterator toBoundary = toRelative(key).getPathArguments().iterator(); diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java new file mode 100644 index 0000000000..0024cb83f1 --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + + +package org.opendaylight.mdsal.dom.store.inmemory; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardPreCommitCoordinationTask implements Callable{ + + private static final Logger LOG = LoggerFactory.getLogger(ShardPreCommitCoordinationTask.class); + + private final DOMDataTreeIdentifier rootShardPrefix; + private final Collection cohorts; + + ShardPreCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, + final Collection cohorts) { + this.rootShardPrefix = rootShardPrefix; + this.cohorts = cohorts; + } + + @Override + public Void call() throws TransactionCommitFailedException { + + try { + LOG.debug("Shard {}, preCommit started", rootShardPrefix); + preCommitBlocking(); + + return null; + } catch (TransactionCommitFailedException e) { + LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e); + //FIXME abort here + throw e; + } + } + + void preCommitBlocking() throws TransactionCommitFailedException { + for (final ListenableFuture preCommit : preCommitAll()) { + try { + preCommit.get(); + } catch (InterruptedException | ExecutionException e) { + throw new TransactionCommitFailedException("PreCommit failed", e); + } + } + } + + private ListenableFuture[] preCommitAll() { + final ListenableFuture[] ops = new ListenableFuture[cohorts.size()]; + int i = 0; + for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops[i++] = cohort.preCommit(); + } + return ops; + } + +} diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardRootModificationContext.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardRootModificationContext.java index 9ad1ac93ef..40ab36caa7 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardRootModificationContext.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardRootModificationContext.java @@ -13,6 +13,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; class ShardRootModificationContext { @@ -50,12 +51,14 @@ class ShardRootModificationContext { return modification != null; } - void ready() { + DataTreeModification ready() { if (cursor != null) { cursor.close(); } if (modification != null) { modification.ready(); } + + return modification; } } diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java new file mode 100644 index 0000000000..b46c9fcf0b --- /dev/null +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.mdsal.dom.store.inmemory; + +import java.util.Collection; +import java.util.concurrent.Callable; +import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ShardSubmitCoordinationTask implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(ShardSubmitCoordinationTask.class); + + private final DOMDataTreeIdentifier rootShardPrefix; + private final ShardCanCommitCoordinationTask canCommitCoordinationTask; + private final ShardPreCommitCoordinationTask preCommitCoordinationTask; + private final ShardCommitCoordinationTask commitCoordinationTask; + + + ShardSubmitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix, + final Collection cohorts) { + this.rootShardPrefix = rootShardPrefix; + + canCommitCoordinationTask = new ShardCanCommitCoordinationTask(rootShardPrefix, cohorts); + preCommitCoordinationTask = new ShardPreCommitCoordinationTask(rootShardPrefix, cohorts); + commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts); + } + + @Override + public Void call() throws TransactionCommitFailedException { + + LOG.debug("Shard {}, CanCommit started", rootShardPrefix); + canCommitCoordinationTask.canCommitBlocking(); + + LOG.debug("Shard {}, PreCommit started", rootShardPrefix); + preCommitCoordinationTask.preCommitBlocking(); + + LOG.debug("Shard {}, commit started", rootShardPrefix); + commitCoordinationTask.commitBlocking(); + + return null; + } +}