CDS: use internal DataTree instance
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 91e072b076ef7c68116009c94127df859ef7540b..b53d12c0c8cc420129b281b685cbdea03b2a0568 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -64,9 +63,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -74,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * A Shard represents a portion of the logical data tree <br/>
  * <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  * </p>
  */
 public class Shard extends RaftActor {
@@ -85,7 +81,7 @@ public class Shard extends RaftActor {
     static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
-    private final InMemoryDOMDataStore store;
+    private final ShardDataTree store;
 
     /// The name of this shard
     private final String name;
@@ -104,8 +100,6 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    private final DOMTransactionFactory domTransactionFactory;
-
     private final ShardTransactionActorFactory transactionActorFactory;
 
     private final ShardSnapshotCohort snapshotCohort;
@@ -124,25 +118,17 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
-        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
-                datastoreContext.getDataStoreProperties());
-
-        if (schemaContext != null) {
-            store.onGlobalContextUpdated(schemaContext);
-        }
+        store = new ShardDataTree(schemaContext);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
         shardMBean.setShardActor(getSelf());
 
         if (isMetricsCaptureEnabled()) {
             getContext().become(new MeteringBehavior(this));
         }
 
-        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
-        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(store,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -154,7 +140,7 @@ public class Shard extends RaftActor {
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
-        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+        transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
@@ -474,7 +460,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
@@ -515,13 +501,13 @@ public class Shard extends RaftActor {
     }
 
     private void commitWithNewTransaction(final Modification modification) {
-        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
-        modification.apply(tx);
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+        modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (Exception e) {
             shardMBean.incrementFailedTransactionsCount();
             LOG.error("{}: Failed to commit", persistenceId(), e);
         }
@@ -533,7 +519,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     void updateSchemaContext(final SchemaContext schemaContext) {
-        store.onGlobalContextUpdated(schemaContext);
+        store.updateSchemaContext(schemaContext);
     }
 
     private boolean isMetricsCaptureEnabled() {
@@ -622,7 +608,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            domTransactionFactory.closeAllTransactionChains();
+            store.closeAllTransactionChains();
         }
     }
 
@@ -666,7 +652,7 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
-    public InMemoryDOMDataStore getDataStore() {
+    public ShardDataTree getDataStore() {
         return store;
     }