Bug 4202: submit shard transactions 32/37332/10
authorTomas Cere <tcere@cisco.com>
Fri, 18 Mar 2016 09:38:59 +0000 (10:38 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 29 Apr 2016 09:18:25 +0000 (09:18 +0000)
Change-Id: I27dd78a1c70edf99de646ccd1f3bf69775e829bc
Signed-off-by: Tomas Cere <tcere@cisco.com>
16 files changed:
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java [new file with mode: 0644]
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/util/TestModel.java
dom/mdsal-dom-broker/src/test/resources/odl-datastore-test.yang
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/DOMDataTreeShardWriteTransaction.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardModificationContext.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java [new file with mode: 0644]
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java [new file with mode: 0644]
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InmemoryDOMDataTreeShardWriteTransaction.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java [new file with mode: 0644]
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java [new file with mode: 0644]
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModification.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardDataModificationBuilder.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java [new file with mode: 0644]
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardRootModificationContext.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java [new file with mode: 0644]

diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/test/ShardedDOMDataTreeProducerMultiShardTest.java
new file mode 100644 (file)
index 0000000..bd18ac0
--- /dev/null
@@ -0,0 +1,171 @@
+package org.opendaylight.mdsal.dom.broker.test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
+import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
+import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer;
+import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardWriteTransaction;
+import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard;
+import org.opendaylight.mdsal.dom.store.inmemory.WriteableDOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class ShardedDOMDataTreeProducerMultiShardTest {
+
+    private final SchemaContext schemaContext = TestModel.createTestContext();
+
+    private static final DOMDataTreeIdentifier ROOT_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+            YangInstanceIdentifier.EMPTY);
+    private static final DOMDataTreeIdentifier TEST_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+            TestModel.TEST_PATH);
+
+    private static final DOMDataTreeIdentifier TEST2_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
+            TestModel.TEST2_PATH);
+
+    private static final DOMDataTreeIdentifier INNER_CONTAINER_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.INNER_CONTAINER_PATH);
+    private static final DOMDataTreeIdentifier ANOTHER_SHARD_ID = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, TestModel.ANOTHER_SHARD_PATH);
+
+    private InMemoryDOMDataTreeShard rootShard;
+
+    private InMemoryDOMDataTreeShard anotherInnerShard;
+
+
+    private ShardedDOMDataTree dataTreeService;
+    private ListenerRegistration<InMemoryDOMDataTreeShard> rootShardReg;
+    private ListenerRegistration<InMemoryDOMDataTreeShard> innerShardReg;
+
+    @Before
+    public void setUp() throws Exception {
+
+        rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID);
+        rootShard.onGlobalContextUpdated(schemaContext);
+
+        ShardedDOMDataTree dataTree = new ShardedDOMDataTree();
+        rootShardReg = dataTree.registerDataTreeShard(ROOT_ID, rootShard);
+
+        dataTreeService = dataTree;
+    }
+
+    @Test
+    public void testMultipleShards() throws Exception {
+        //FIXME after listeners are implemented add them here and test those
+
+        final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID);
+        innerShard.onGlobalContextUpdated(schemaContext);
+        innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard);
+
+        final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID));
+        final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction();
+        final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID);
+        cursor.enter(TestModel.TEST_PATH.getLastPathArgument());
+
+        final LeafNode<String> shardedValue1 =
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build();
+        final LeafNode<String> shardedValue2 =
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build();
+
+        final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> containerNodeBuilder = ImmutableContainerNodeBuilder.create();
+        final ContainerNode containerNode =
+                containerNodeBuilder
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER))
+                        .withChild(shardedValue1)
+                        .withChild(shardedValue2)
+                        .build();
+
+        cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode);
+        cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument());
+
+        final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER))
+                .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build())
+                .build();
+
+        cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer);
+        cursor.close();
+        transaction.ready();
+        transaction.submit().get();
+
+        //verify listeners have been notified
+    }
+
+    @Test
+    public void testMockedSubshards() throws Exception {
+        final WriteableDOMDataTreeShard mockedInnerShard = Mockito.mock(WriteableDOMDataTreeShard.class);
+        dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, mockedInnerShard);
+        final DOMDataTreeShardProducer mockedProducer = Mockito.mock(DOMDataTreeShardProducer.class);
+        doReturn(mockedProducer).when(mockedInnerShard).createProducer(any(Collection.class));
+
+        final DOMDataTreeShardProducer producer = rootShard.createProducer(Collections.singletonList(TEST_ID));
+
+        final DOMDataTreeShardWriteTransaction transaction = producer.createTransaction();
+        final DOMDataTreeWriteCursor cursor = transaction.createCursor(ROOT_ID);
+        cursor.enter(TestModel.TEST_PATH.getLastPathArgument());
+
+        final LeafNode<String> shardedValue1 =
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_1)).withValue("sharded value 1").build();
+        final LeafNode<String> shardedValue2 =
+                ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(new NodeIdentifier(TestModel.SHARDED_VALUE_2)).withValue("sharded value 2").build();
+
+        final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> containerNodeBuilder = ImmutableContainerNodeBuilder.create();
+        final ContainerNode containerNode =
+                containerNodeBuilder
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_CONTAINER))
+                        .withChild(shardedValue1)
+                        .withChild(shardedValue2)
+                        .build();
+
+        final DOMDataTreeShardWriteTransaction mockedTx = Mockito.mock(DOMDataTreeShardWriteTransaction.class);
+        doReturn(mockedTx).when(mockedProducer).createTransaction();
+
+        doNothing().when(mockedTx).ready();
+        doReturn(Futures.immediateFuture(true)).when(mockedTx).validate();
+        doReturn(Futures.immediateFuture(null)).when(mockedTx).prepare();
+        doReturn(Futures.immediateFuture(null)).when(mockedTx).commit();
+
+        final DOMDataTreeWriteCursor mockedCursor = Mockito.mock(DOMDataTreeWriteCursor.class);
+        doNothing().when(mockedCursor).write(any(PathArgument.class), any(NormalizedNode.class));
+        doNothing().when(mockedCursor).close();
+        doReturn(mockedCursor).when(mockedTx).createCursor(any(DOMDataTreeIdentifier.class));
+
+        cursor.write(TestModel.INNER_CONTAINER_PATH.getLastPathArgument(), containerNode);
+        cursor.enter(TestModel.INNER_CONTAINER_PATH.getLastPathArgument());
+
+        final ContainerNode lowerShardContainer = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_CONTAINER))
+                .withChild(ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.ANOTHER_SHARD_VALUE)).build())
+                .build();
+
+        cursor.write(TestModel.ANOTHER_SHARD_PATH.getLastPathArgument(), lowerShardContainer);
+        cursor.close();
+        transaction.ready();
+        transaction.submit().get();
+
+        verify(mockedTx).ready();
+        verify(mockedTx).validate();
+        verify(mockedTx).prepare();
+        verify(mockedTx).commit();
+
+    }
+}
index 3761c787057f0e0be0580d7e2238207f81e929ad..182c0fd06c5a28130872a4447b92e16ed8f7ee9a 100644 (file)
@@ -28,11 +28,21 @@ public class TestModel {
     public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
     public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
     public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+    public static final QName INNER_CONTAINER = QName.create(TEST_QNAME, "inner-container");
+    public static final QName ANOTHER_SHARD_CONTAINER = QName.create(TEST_QNAME, "another-shard");
+    public static final QName NEW_SHARD_LIST = QName.create(TEST_QNAME, "new-shard-list");
+    public static final QName SHARDED_VALUE_1 = QName.create(TEST_QNAME, "sharded-value-1");
+    public static final QName SHARDED_VALUE_2 = QName.create(TEST_QNAME, "sharded-value-2");
+    public static final QName ANOTHER_SHARD_VALUE = QName.create(TEST_QNAME, "another-shard-value");
     private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
     public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
     public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
     public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+    public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(OUTER_LIST_PATH).node(INNER_LIST_QNAME).build();
+    public static final YangInstanceIdentifier INNER_CONTAINER_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(INNER_CONTAINER).build();
+    public static final YangInstanceIdentifier ANOTHER_SHARD_PATH = YangInstanceIdentifier.builder(INNER_CONTAINER_PATH).node(ANOTHER_SHARD_CONTAINER).build();
+    public static final YangInstanceIdentifier NEW_SHARD_LIST_PATH = YangInstanceIdentifier.builder(ANOTHER_SHARD_PATH).node(NEW_SHARD_LIST).build();
     public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
     public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
 
index eb405f60b9212388a66de350a3735f769409a3b5..a01b4314c5ac246dfe6b28097e1eddffee12e497 100644 (file)
@@ -39,6 +39,33 @@ module odl-datastore-test {
                 }
             }
         }
+
+        container inner-container {
+
+            leaf sharded-value-1 {
+                type string;
+            }
+
+            leaf sharded-value-2 {
+                type string;
+            }
+
+            container another-shard {
+                list new-shard-list {
+                    key name;
+                    leaf name {
+                        type string;
+                    }
+                    leaf value {
+                        type string;
+                    }
+                }
+
+                leaf another-shard-value {
+                    type string;
+                }
+            }
+        }
     }
 
     container test2 {
index 92146bca9f8895a9aba04ba39a905318ea83f316..9689057b6bcbf23aea82cf4eea58be0b03d09b5a 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.mdsal.dom.store.inmemory;
 
 import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
@@ -34,4 +35,14 @@ public interface DOMDataTreeShardWriteTransaction {
      * @throws IllegalStateException if this transaction has an unclosed cursor.
      */
     void ready();
+
+    ListenableFuture<Void> submit();
+
+    //FIXME: remove these from the public api?
+    ListenableFuture<Boolean> validate();
+
+    ListenableFuture<Void> prepare();
+
+    ListenableFuture<Void> commit();
+
 }
index bb32047f8f1a04b04ee10f57a64a0d0f63200f58..92cc97ac5e4a7586dce33aadaa8241ac5f876264 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.mdsal.dom.store.inmemory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
@@ -67,4 +68,18 @@ final class ForeignShardModificationContext {
     DOMDataTreeIdentifier getIdentifier() {
         return identifier;
     }
+
+    ListenableFuture<Boolean> validate() {
+        return tx.validate();
+    }
+
+    ListenableFuture<Void> prepare() {
+        return tx.prepare();
+    }
+
+    ListenableFuture<Void> submit() {
+        final ListenableFuture<Void> commit = tx.commit();
+        tx = null;
+        return commit;
+    }
 }
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ForeignShardThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..16d4b34
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ForeignShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ForeignShardThreePhaseCommitCohort.class);
+
+    private final DOMDataTreeIdentifier prefix;
+    private final ForeignShardModificationContext shard;
+
+    public ForeignShardThreePhaseCommitCohort(final DOMDataTreeIdentifier prefix, final ForeignShardModificationContext shard) {
+        this.prefix = prefix;
+        this.shard = shard;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        LOG.debug("Validating transaction on foreign shard {}", prefix);
+        return shard.validate();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        LOG.debug("Preparing transaction on foreign shard {}", prefix);
+        return shard.prepare();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        // FIXME abort on the shard
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        LOG.debug("Submitting transaction on foreign shard {}", prefix);
+        return shard.submit();
+    }
+}
\ No newline at end of file
index b868d2ee7424319b1a68e35e1673e72d271dadff..5ccc8e4f3b9943d2c3c78716e7584db12ddf95d2 100644 (file)
@@ -139,7 +139,11 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche
     }
 
     private void updateProducersAndListeners(final Map<DOMDataTreeIdentifier, ChildShardContext> reparented) {
-        // FIXME: implement this
+        // FIXME: remove reparenting of producers, shards have to be registered from top to bottom
+        if (reparented.isEmpty()) {
+            //nothing was reparented no need to update anything
+            return;
+        }
         throw new UnsupportedOperationException();
     }
 
@@ -205,8 +209,9 @@ public class InMemoryDOMDataTreeShard implements WriteableDOMDataTreeShard, Sche
             ForeignShardModificationContext foreignContext =
                     new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
             builder.addSubshard(foreignContext);
+            builder.addSubshard(spec.getPrefix(), foreignContext);
         }
 
-        return new InmemoryDOMDataTreeShardWriteTransaction(builder.build());
+        return new InmemoryDOMDataTreeShardWriteTransaction(builder.build(), dataTree);
     }
 }
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..8c69c59
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMDataTreeShardThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShardThreePhaseCommitCohort.class);
+    private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+
+    private final DataTree dataTree;
+    private final DataTreeModification modification;
+    private DataTreeCandidate candidate;
+
+    InMemoryDOMDataTreeShardThreePhaseCommitCohort(final DataTree dataTree,
+                                                   final DataTreeModification modification) {
+        Preconditions.checkNotNull(dataTree);
+        this.dataTree = dataTree;
+        this.modification = modification;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        try {
+            dataTree.validate(modification);
+            LOG.debug("DataTreeModification {} validated");
+
+            return CAN_COMMIT_FUTURE;
+        } catch (DataValidationFailedException e) {
+            LOG.warn("Data validation failed for {}", modification);
+            LOG.trace("dataTree : {}", dataTree);
+
+            return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in validation phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        try {
+            candidate = dataTree.prepare(modification);
+            return SUCCESSFUL_FUTURE;
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in preparation phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        candidate = null;
+        return SUCCESSFUL_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        Preconditions.checkState(candidate != null, "Attempted to commit an aborted transaction");
+        dataTree.commit(candidate);
+        return SUCCESSFUL_FUTURE;
+    }
+}
index 0a754a5615000d81968caeca851af2978ad6acc4..4634d4f70498fb018bf1bb3db4c764b7b78d3527 100644 (file)
@@ -11,15 +11,30 @@ package org.opendaylight.mdsal.dom.store.inmemory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import org.opendaylight.mdsal.common.api.ReadFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
+
     private enum SimpleCursorOperation {
         MERGE {
             @Override
@@ -63,11 +78,20 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
         }
     }
 
+    private InMemoryDOMDataTreeShardThreePhaseCommitCohort commitCohort;
     private final ShardDataModification modification;
     private DOMDataTreeWriteCursor cursor;
+    private DataTree rootShardDataTree;
+    private DataTreeModification rootModification = null;
+
+    private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
 
-    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root) {
+    // FIXME inject into shard?
+    private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+
+    InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root, final DataTree rootShardDataTree) {
         this.modification = Preconditions.checkNotNull(root);
+        this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
     }
 
     private DOMDataTreeWriteCursor getCursor() {
@@ -122,10 +146,49 @@ class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteT
     @Override
     public void ready() {
 
-        modification.seal();
+        LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
+        rootModification = modification.seal();
+
+        cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification));
+        for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
+            cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> submit() {
+        LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
+
+        Preconditions.checkNotNull(cohorts);
+        Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
+
+        final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
 
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> validate() {
+        LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Void> prepare() {
+        LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
+
+        final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
 
-        return;
+        final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
+        return submit;
     }
 
     public void followUp() {
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCanCommitCoordinationTask.java
new file mode 100644 (file)
index 0000000..9c2f5a0
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardCanCommitCoordinationTask implements Callable<Boolean> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardCanCommitCoordinationTask.class);
+
+    private final DOMDataTreeIdentifier rootShardPrefix;
+    private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+    ShardCanCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+        this.rootShardPrefix = rootShardPrefix;
+        this.cohorts = cohorts;
+    }
+
+    @Override
+    public Boolean call() throws TransactionCommitFailedException {
+
+        try {
+            LOG.debug("Shard {}, canCommit started", rootShardPrefix);
+            canCommitBlocking();
+
+            return true;
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Shard: {} Submit Error during phase CanCommit, starting Abort", rootShardPrefix, e);
+            //FIXME abort here
+            throw e;
+        }
+    }
+
+    void canCommitBlocking() throws TransactionCommitFailedException {
+        for (final ListenableFuture<?> canCommit : canCommitAll()) {
+            try {
+                final Boolean result = (Boolean)canCommit.get();
+                if (result == null || !result) {
+                    throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw new TransactionCommitFailedException("CanCommit failed", e);
+            }
+        }
+    }
+
+    private ListenableFuture<?>[] canCommitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+        int i = 0;
+        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.canCommit();
+        }
+        return ops;
+    }
+}
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardCommitCoordinationTask.java
new file mode 100644 (file)
index 0000000..5dc6cad
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardCommitCoordinationTask implements Callable<Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTask.class);
+
+    private final DOMDataTreeIdentifier rootShardPrefix;
+    private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+    ShardCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+        this.rootShardPrefix = rootShardPrefix;
+        this.cohorts = cohorts;
+    }
+
+    @Override
+    public Void call() throws TransactionCommitFailedException {
+
+        try {
+            LOG.debug("Shard {}, commit started", rootShardPrefix);
+            commitBlocking();
+
+            return null;
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e);
+            //FIXME abort here
+            throw e;
+        }
+    }
+
+    void commitBlocking() throws TransactionCommitFailedException {
+        for (final ListenableFuture<?> commit : commitAll()) {
+            try {
+                commit.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new TransactionCommitFailedException("Commit failed", e);
+            }
+        }
+    }
+
+    private ListenableFuture<?>[] commitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+        int i = 0;
+        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.commit();
+        }
+        return ops;
+    }
+}
index ac3d358d6a681536baeb2adccf2f5e7f6fb1672d..5840e9b6df5dc0ff13d25d6b00cb1705bba9d361 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 final class ShardDataModification extends WriteableNodeWithSubshard {
 
@@ -63,11 +64,13 @@ final class ShardDataModification extends WriteableNodeWithSubshard {
         return childShards;
     }
 
-    public void seal() {
-        rootContext.ready();
+    public DataTreeModification seal() {
+        final DataTreeModification rootModification = rootContext.ready();
         for (ForeignShardModificationContext childShard : childShards.values()) {
             childShard.ready();
         }
+
+        return rootModification;
     }
 
 }
\ No newline at end of file
index 874400b3d917e265b2c7963ecdf343e0915dd4ae..f6e78ad7f7831ba771f0357c10c3f651f51502fc 100644 (file)
@@ -30,6 +30,10 @@ class ShardDataModificationBuilder extends ModificationContextNodeBuilder<ShardD
         putNode(value.getIdentifier().getRootIdentifier(), leafNode);
     }
 
+    public void addSubshard(final DOMDataTreeIdentifier prefix, final ForeignShardModificationContext value) {
+        childShards.put(prefix, value);
+    }
+
     private void putNode(final YangInstanceIdentifier key, final WriteableSubshardBoundaryNode subshardNode) {
         ModificationContextNodeBuilder<?> current = this;
         Iterator<PathArgument> toBoundary = toRelative(key).getPathArguments().iterator();
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardPreCommitCoordinationTask.java
new file mode 100644 (file)
index 0000000..0024cb8
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardPreCommitCoordinationTask implements Callable<Void>{
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardPreCommitCoordinationTask.class);
+
+    private final DOMDataTreeIdentifier rootShardPrefix;
+    private final Collection<DOMStoreThreePhaseCommitCohort> cohorts;
+
+    ShardPreCommitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+        this.rootShardPrefix = rootShardPrefix;
+        this.cohorts = cohorts;
+    }
+
+    @Override
+    public Void call() throws TransactionCommitFailedException {
+
+        try {
+            LOG.debug("Shard {}, preCommit started", rootShardPrefix);
+            preCommitBlocking();
+
+            return null;
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Shard: {} Submit Error during phase {}, starting Abort", rootShardPrefix, e);
+            //FIXME abort here
+            throw e;
+        }
+    }
+
+    void preCommitBlocking() throws TransactionCommitFailedException {
+        for (final ListenableFuture<?> preCommit : preCommitAll()) {
+            try {
+                preCommit.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new TransactionCommitFailedException("PreCommit failed", e);
+            }
+        }
+    }
+
+    private ListenableFuture<?>[] preCommitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohorts.size()];
+        int i = 0;
+        for (final DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.preCommit();
+        }
+        return ops;
+    }
+
+}
index 9ad1ac93ef2ca18dd34f637dc86e9208519f1621..40ab36caa7046ee1bec3e60b9389689d9c6bec52 100644 (file)
@@ -13,6 +13,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 
 class ShardRootModificationContext {
@@ -50,12 +51,14 @@ class ShardRootModificationContext {
         return modification != null;
     }
 
-    void ready() {
+    DataTreeModification ready() {
         if (cursor != null) {
             cursor.close();
         }
         if (modification != null) {
             modification.ready();
         }
+
+        return modification;
     }
 }
diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/ShardSubmitCoordinationTask.java
new file mode 100644 (file)
index 0000000..b46c9fc
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.mdsal.dom.store.inmemory;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ShardSubmitCoordinationTask implements Callable<Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ShardSubmitCoordinationTask.class);
+
+    private final DOMDataTreeIdentifier rootShardPrefix;
+    private final ShardCanCommitCoordinationTask canCommitCoordinationTask;
+    private final ShardPreCommitCoordinationTask preCommitCoordinationTask;
+    private final ShardCommitCoordinationTask commitCoordinationTask;
+
+
+    ShardSubmitCoordinationTask(final DOMDataTreeIdentifier rootShardPrefix,
+                                       final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+        this.rootShardPrefix = rootShardPrefix;
+
+        canCommitCoordinationTask = new ShardCanCommitCoordinationTask(rootShardPrefix, cohorts);
+        preCommitCoordinationTask = new ShardPreCommitCoordinationTask(rootShardPrefix, cohorts);
+        commitCoordinationTask = new ShardCommitCoordinationTask(rootShardPrefix, cohorts);
+    }
+
+    @Override
+    public Void call() throws TransactionCommitFailedException {
+
+        LOG.debug("Shard {}, CanCommit started", rootShardPrefix);
+        canCommitCoordinationTask.canCommitBlocking();
+
+        LOG.debug("Shard {}, PreCommit started", rootShardPrefix);
+        preCommitCoordinationTask.preCommitBlocking();
+
+        LOG.debug("Shard {}, commit started", rootShardPrefix);
+        commitCoordinationTask.commitBlocking();
+
+        return null;
+    }
+}