Fix initial datastore configuration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 02ccb81c898a2201c0b5042a8f55fe15592bc8ea..9e5985a741738d9010ba5a53567d462c41911f7c 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
@@ -20,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -29,20 +32,24 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -60,6 +67,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
+    private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
 
     private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
@@ -140,31 +148,86 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     public void delete(final YangInstanceIdentifier path) {
         checkModificationState("delete", path);
 
-        executeModification(new DeleteModification(path));
+        if (path.isEmpty()) {
+            deleteAllData();
+        } else {
+            executeModification(new DeleteOperation(path));
+        }
+    }
+
+    private void deleteAllData() {
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            getContextWrapper(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
+        }
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("merge", path);
 
-        executeModification(new MergeModification(path, data));
+        if (path.isEmpty()) {
+            mergeAllData(checkRootData(data));
+        } else {
+            executeModification(new MergeOperation(path, data));
+        }
+    }
+
+    private void mergeAllData(final ContainerNode rootData) {
+        // Populate requests for individual shards that are being touched
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            rootBuilders.computeIfAbsent(shardName,
+                unused -> Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()))
+                .addChild(child);
+        }
+
+        // Now dispatch all merges
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new MergeOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
     }
 
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState("write", path);
 
-        executeModification(new WriteModification(path, data));
+        if (path.isEmpty()) {
+            writeAllData(checkRootData(data));
+        } else {
+            executeModification(new WriteOperation(path, data));
+        }
     }
 
-    private void executeModification(final AbstractModification modification) {
-        final TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
-        contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-            @Override
-            protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
-                transactionContext.executeModification(modification, havePermit);
-            }
-        });
+    private void writeAllData(final ContainerNode rootData) {
+        // Open builders for all shards
+        final Map<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> rootBuilders = new HashMap<>();
+        for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
+            rootBuilders.put(shardName, Builders.containerBuilder().withNodeIdentifier(rootData.getIdentifier()));
+        }
+
+        // Now distribute children as needed
+        for (DataContainerChild<?, ?> child : rootData.getValue()) {
+            final String shardName = shardNameFromRootChild(child);
+            verifyNotNull(rootBuilders.get(shardName), "Failed to find builder for %s", shardName).addChild(child);
+        }
+
+        // Now dispatch all writes
+        for (Entry<String, DataContainerNodeBuilder<NodeIdentifier, ContainerNode>> entry : rootBuilders.entrySet()) {
+            getContextWrapper(entry.getKey()).maybeExecuteTransactionOperation(new WriteOperation(
+                YangInstanceIdentifier.empty(), entry.getValue().build()));
+        }
+    }
+
+    private void executeModification(final TransactionModificationOperation operation) {
+        getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
+    }
+
+    private static ContainerNode checkRootData(final NormalizedNode<?, ?> data) {
+        // Root has to be a container
+        checkArgument(data instanceof ContainerNode, "Invalid root data %s", data);
+        return (ContainerNode) data;
     }
 
     private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
@@ -290,8 +353,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
     }
 
+    private String shardNameFromRootChild(final DataContainerChild<?, ?> child) {
+        return shardNameFromIdentifier(YangInstanceIdentifier.create(child.getIdentifier()));
+    }
+
     private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
-        return txContextFactory.getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
+        return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
     private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
@@ -317,7 +384,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return state != TransactionState.OPEN;
     }
 
-    ActorUtils getActorUtils() {
+    final ActorUtils getActorUtils() {
         return txContextFactory.getActorUtils();
     }
 }