From 56c1339ee7dbd85bc567fc44f21ecfd322c9e803 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 16 Apr 2015 16:30:47 +0200 Subject: [PATCH] CDS: use internal DataTree instance Instead of using a captive InMemoryDataStore instance, which is geared toward interaction with a DOMDataBroker, use the baseline DataTree instance and utility classes for implementing the data store behavior. Since most of CDS is geared for interactions with the usual DOMStore interfaces, this patch provides a set of bare-bone workalike classes, but taking advantage of the safeties provided by the AKKA actor system. Immediate benefit should be a speed up in backend processing, as we eliminate intermediate Future instances when accessing and manipulating data. We also gain explicit control over DataTree lifecycle, which enables us switching to DataTreeCandidate as the replication and persistence unit, as opposed to individual modifications. Change-Id: I031d0ed4d82dcc923fd377c13fefba2819923f9b Signed-off-by: Robert Varga --- .../AbstractShardDataTreeTransaction.java | 59 +++++++ .../datastore/DOMTransactionFactory.java | 96 ---------- .../datastore/DataChangeListenerSupport.java | 30 +++- .../DataTreeChangeListenerSupport.java | 16 +- .../DelayedDataTreeListenerRegistration.java | 11 +- .../cluster/datastore/DelegateFactory.java | 7 +- .../datastore/LeaderLocalDelegateFactory.java | 3 +- .../ReadOnlyShardDataTreeTransaction.java | 21 +++ .../ReadWriteShardDataTreeTransaction.java | 34 ++++ .../controller/cluster/datastore/Shard.java | 38 ++-- .../datastore/ShardCommitCoordinator.java | 18 +- .../cluster/datastore/ShardDataTree.java | 166 ++++++++++++++++++ .../ShardDataTreeChangePublisher.java | 44 +++++ .../datastore/ShardDataTreeCohort.java | 78 ++++++++ .../ShardDataTreeNotificationManager.java | 38 ++++ .../ShardDataTreeTransactionChain.java | 134 ++++++++++++++ .../ShardDataTreeTransactionParent.java | 15 ++ .../datastore/ShardReadTransaction.java | 31 +--- .../datastore/ShardReadWriteTransaction.java | 15 +- .../datastore/ShardRecoveryCoordinator.java | 41 +++-- .../datastore/ShardSnapshotCohort.java | 19 +- .../cluster/datastore/ShardTransaction.java | 97 +++++----- .../datastore/ShardTransactionChain.java | 56 +++--- .../datastore/ShardTransactionFactory.java | 25 ++- .../datastore/ShardWriteTransaction.java | 77 +++++--- .../modification/DeleteModification.java | 6 + .../modification/MergeModification.java | 6 + .../datastore/modification/Modification.java | 8 + .../MutableCompositeModification.java | 8 + .../modification/WriteModification.java | 6 + .../cluster/datastore/AbstractShardTest.java | 57 +++--- .../cluster/datastore/ShardTest.java | 23 +-- .../ShardTransactionFailureTest.java | 39 ++-- .../datastore/ShardTransactionTest.java | 126 ++++++------- .../datastore/compat/PreLithiumShardTest.java | 18 +- 35 files changed, 1027 insertions(+), 439 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java new file mode 100644 index 0000000000..dd8ec0b12a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; + +/** + * Abstract base for transactions running on SharrdDataTree. + * + * @param Backing transaction type. + */ +@NotThreadSafe +abstract class AbstractShardDataTreeTransaction { + private final T snapshot; + private final String id; + private boolean closed; + + protected AbstractShardDataTreeTransaction(final String id, final T snapshot) { + this.snapshot = Preconditions.checkNotNull(snapshot); + this.id = Preconditions.checkNotNull(id); + } + + final T getSnapshot() { + return snapshot; + } + + final boolean isClosed() { + return closed; + } + + /** + * Close this transaction and mark it as closed, allowing idempotent invocations. + * + * @return True if the transaction got closed by this method invocation. + */ + protected final boolean close() { + if (closed) { + return false; + } + + closed = true; + return true; + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot).toString(); + } + + abstract void abort(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java deleted file mode 100644 index f2436201d8..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import java.util.HashMap; -import java.util.Map; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; -import org.slf4j.Logger; - -/** - * A factory for creating DOM transactions, either normal or chained. - * - * @author Thomas Pantelis - */ -public class DOMTransactionFactory { - - private final Map transactionChains = new HashMap<>(); - private final InMemoryDOMDataStore store; - private final ShardStats shardMBean; - private final Logger log; - private final String name; - - public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) { - this.store = store; - this.shardMBean = shardMBean; - this.log = log; - this.name = name; - } - - @SuppressWarnings("unchecked") - public T newTransaction(TransactionProxy.TransactionType type, - String transactionID, String transactionChainID) { - - DOMStoreTransactionFactory factory = store; - - if(!transactionChainID.isEmpty()) { - factory = transactionChains.get(transactionChainID); - if(factory == null) { - if(log.isDebugEnabled()) { - log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID, - transactionChainID); - } - - DOMStoreTransactionChain transactionChain = store.createTransactionChain(); - transactionChains.put(transactionChainID, transactionChain); - factory = transactionChain; - } - } else { - log.debug("{}: Creating transaction with ID {}", name, transactionID); - } - - T transaction = null; - switch(type) { - case READ_ONLY: - transaction = (T) factory.newReadOnlyTransaction(); - shardMBean.incrementReadOnlyTransactionCount(); - break; - case READ_WRITE: - transaction = (T) factory.newReadWriteTransaction(); - shardMBean.incrementReadWriteTransactionCount(); - break; - case WRITE_ONLY: - transaction = (T) factory.newWriteOnlyTransaction(); - shardMBean.incrementWriteOnlyTransactionCount(); - break; - } - - return transaction; - } - - public void closeTransactionChain(String transactionChainID) { - DOMStoreTransactionChain chain = - transactionChains.remove(transactionChainID); - - if(chain != null) { - chain.close(); - } - } - - public void closeAllTransactionChains() { - for(Map.Entry entry : transactionChains.entrySet()){ - entry.getValue().close(); - } - - transactionChains.clear(); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index 939ddf8fad..e6f63d7154 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -7,21 +7,24 @@ */ package org.opendaylight.controller.cluster.datastore; -import java.util.ArrayList; -import java.util.List; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>>> { +final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>>, DOMImmutableDataChangeEvent> { private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class); private final List delayedListenerRegistrations = new ArrayList<>(); private final List dataChangeListeners = new ArrayList<>(); @@ -39,7 +42,12 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>>, DOMImmutableDataChangeEvent> res = + createDelegate(reg.getRegisterChangeListener()); + reg.setDelegate(res.getKey()); + if (res.getValue() != null) { + reg.getInstance().onDataChanged(res.getValue()); + } } } @@ -52,16 +60,21 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>> registration; + final AsyncDataChangeEvent> event; if (isLeader) { - registration = createDelegate(message); + final Entry>>, DOMImmutableDataChangeEvent> res = + createDelegate(message); + registration = res.getKey(); + event = res.getValue(); } else { LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message); delayedListenerRegistrations.add(delayedReg); registration = delayedReg; + event = null; } ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration)); @@ -70,10 +83,13 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>> createDelegate( + Entry>>, DOMImmutableDataChangeEvent> createDelegate( final RegisterChangeListener message) { ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index 3987c9af35..db5eeb83e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -11,15 +11,18 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory> { +final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory, DataTreeCandidate> { private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class); private final ArrayList delayedRegistrations = new ArrayList<>(); private final Collection actors = new ArrayList<>(); @@ -49,6 +52,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory registration; + final DataTreeCandidate event; if (!isLeader) { LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); @@ -56,8 +60,11 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory, DataTreeCandidate> res = createDelegate(registerTreeChangeListener); + registration = res.getKey(); + event = res.getValue(); } ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration)); @@ -66,10 +73,13 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory createDelegate(final RegisterDataTreeChangeListener message) { + Entry, DataTreeCandidate> createDelegate(final RegisterDataTreeChangeListener message) { ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); // Notify the listener if notifications should be enabled or not diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java index b3ae8a3ca2..e8cd31097b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java @@ -8,10 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.Map.Entry; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Intermediate proxy registration returned to the user when we cannot @@ -28,9 +31,13 @@ final class DelayedDataTreeListenerRegistration implements ListenerRegistration< this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener); } - synchronized void createDelegate(final DelegateFactory> factory) { + synchronized void createDelegate(final DelegateFactory, DataTreeCandidate> factory) { if (!closed) { - this.delegate = factory.createDelegate(registerTreeChangeListener); + final Entry, DataTreeCandidate> res = factory.createDelegate(registerTreeChangeListener); + this.delegate = res.getKey(); + if (res.getValue() != null) { + delegate.getInstance().onDataTreeChanged(Collections.singletonList(res.getValue())); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java index e6702d90f1..3b9b7adc6b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java @@ -7,12 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore; +import java.util.Map.Entry; + /** * Base class for factories instantiating delegates. * * delegate type * message type + * initial state type */ -abstract class DelegateFactory { - abstract D createDelegate(M message); +abstract class DelegateFactory { + abstract Entry createDelegate(M message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java index d33cebbebc..3f927736b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -19,8 +19,9 @@ import com.google.common.base.Preconditions; * * delegate type * message type + * initial state type */ -abstract class LeaderLocalDelegateFactory extends DelegateFactory { +abstract class LeaderLocalDelegateFactory extends DelegateFactory { private final Shard shard; protected LeaderLocalDelegateFactory(final Shard shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java new file mode 100644 index 0000000000..59265682ad --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; + +final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction { + ReadOnlyShardDataTreeTransaction(final String id, final DataTreeSnapshot snapshot) { + super(id, snapshot); + } + + @Override + void abort() { + close(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java new file mode 100644 index 0000000000..0f3ab61041 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; + +final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { + private final ShardDataTreeTransactionParent parent; + + protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) { + super(id, modification); + this.parent = Preconditions.checkNotNull(parent); + } + + @Override + void abort() { + Preconditions.checkState(close(), "Transaction is already closed"); + + parent.abortTransaction(this); + } + + DOMStoreThreePhaseCommitCohort ready() { + Preconditions.checkState(close(), "Transaction is already closed"); + + return parent.finishTransaction(this); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 91e072b076..b53d12c0c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.common.actor.CommonConfig; @@ -64,9 +63,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -74,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration; /** * A Shard represents a portion of the logical data tree
*

- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it + * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it *

*/ public class Shard extends RaftActor { @@ -85,7 +81,7 @@ public class Shard extends RaftActor { static final String DEFAULT_NAME = "default"; // The state of this Shard - private final InMemoryDOMDataStore store; + private final ShardDataTree store; /// The name of this shard private final String name; @@ -104,8 +100,6 @@ public class Shard extends RaftActor { private final MessageTracker appendEntriesReplyTracker; - private final DOMTransactionFactory domTransactionFactory; - private final ShardTransactionActorFactory transactionActorFactory; private final ShardSnapshotCohort snapshotCohort; @@ -124,25 +118,17 @@ public class Shard extends RaftActor { LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); - store = InMemoryDOMDataStoreFactory.create(name.toString(), null, - datastoreContext.getDataStoreProperties()); - - if (schemaContext != null) { - store.onGlobalContextUpdated(schemaContext); - } + store = new ShardDataTree(schemaContext); shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); - shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); shardMBean.setShardActor(getSelf()); if (isMetricsCaptureEnabled()) { getContext().become(new MeteringBehavior(this)); } - domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); - - commitCoordinator = new ShardCommitCoordinator(domTransactionFactory, + commitCoordinator = new ShardCommitCoordinator(store, TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); @@ -154,7 +140,7 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); - transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext, + transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext, new Dispatchers(context().system().dispatchers()).getDispatcherPath( Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); @@ -474,7 +460,7 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); + store.closeTransactionChain(closeTransactionChain.getTransactionChainId()); } private ActorRef createTypedTransactionActor(int transactionType, @@ -515,13 +501,13 @@ public class Shard extends RaftActor { } private void commitWithNewTransaction(final Modification modification) { - DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); - modification.apply(tx); + ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null); + modification.apply(tx.getSnapshot()); try { snapshotCohort.syncCommitTransaction(tx); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { shardMBean.incrementFailedTransactionsCount(); LOG.error("{}: Failed to commit", persistenceId(), e); } @@ -533,7 +519,7 @@ public class Shard extends RaftActor { @VisibleForTesting void updateSchemaContext(final SchemaContext schemaContext) { - store.onGlobalContextUpdated(schemaContext); + store.updateSchemaContext(schemaContext); } private boolean isMetricsCaptureEnabled() { @@ -622,7 +608,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - domTransactionFactory.closeAllTransactionChains(); + store.closeAllTransactionChains(); } } @@ -666,7 +652,7 @@ public class Shard extends RaftActor { } @VisibleForTesting - public InMemoryDOMDataStore getDataStore() { + public ShardDataTree getDataStore() { return store; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 4ff9b5fd43..0eb48fd180 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; /** @@ -50,7 +49,7 @@ public class ShardCommitCoordinator { private CohortEntry currentCohortEntry; - private final DOMTransactionFactory transactionFactory; + private final ShardDataTree dataTree; private final Queue queuedCohortEntries; @@ -75,13 +74,13 @@ public class ShardCommitCoordinator { private ReadyTransactionReply readyTransactionReply; - public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + public ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; - this.transactionFactory = transactionFactory; + this.dataTree = Preconditions.checkNotNull(dataTree); cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). removalListener(cacheRemovalListener).build(); @@ -162,8 +161,7 @@ public class ShardCommitCoordinator { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), - transactionFactory.newTransaction( - TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID())); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -417,15 +415,15 @@ public class ShardCommitCoordinator { private final String transactionID; private DOMStoreThreePhaseCommitCohort cohort; private final MutableCompositeModification compositeModification; - private final DOMStoreWriteTransaction transaction; + private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; private long lastAccessTime; private boolean doImmediateCommit; - CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.compositeModification = new MutableCompositeModification(); - this.transaction = transaction; + this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } @@ -460,7 +458,7 @@ public class ShardCommitCoordinator { void applyModifications(Iterable modifications) { for(Modification modification: modifications) { compositeModification.addModification(modification); - modification.apply(transaction); + modification.apply(transaction.getSnapshot()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java new file mode 100644 index 0000000000..373bf499e0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import java.util.AbstractMap.SimpleEntry; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Internal shard state, similar to a DOMStore, but optimized for use in the actor system, + * e.g. it does not expose public interfaces and assumes it is only ever called from a + * single thread. + * + * This class is not part of the API contract and is subject to change at any time. + */ +@NotThreadSafe +@VisibleForTesting +public final class ShardDataTree extends ShardDataTreeTransactionParent { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); + private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager(); + private final Map transactionChains = new HashMap<>(); + private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher(); + private final ListenerTree listenerTree = ListenerTree.create(); + private final TipProducingDataTree dataTree; + + ShardDataTree(final SchemaContext schemaContext) { + dataTree = InMemoryDataTreeFactory.getInstance().create(); + if (schemaContext != null) { + dataTree.setSchemaContext(schemaContext); + } + } + + TipProducingDataTree getDataTree() { + return dataTree; + } + + void updateSchemaContext(final SchemaContext schemaContext) { + dataTree.setSchemaContext(schemaContext); + } + + private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) { + ShardDataTreeTransactionChain chain = transactionChains.get(chainId); + if (chain == null) { + chain = new ShardDataTreeTransactionChain(chainId, this); + transactionChains.put(chainId, chain); + } + + return chain; + } + + ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) { + if (Strings.isNullOrEmpty(chainId)) { + return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot()); + } + + return ensureTransactionChain(chainId).newReadOnlyTransaction(txId); + } + + ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) { + if (Strings.isNullOrEmpty(chainId)) { + return new ReadWriteShardDataTreeTransaction(this, txId, dataTree.takeSnapshot().newModification()); + } + + return ensureTransactionChain(chainId).newReadWriteTransaction(txId); + } + + void notifyListeners(final DataTreeCandidateTip candidate) { + LOG.debug("Notifying listeners on candidate {}", candidate); + + // DataTreeChanges first, as they are more light-weight + treeChangePublisher.publishChanges(candidate); + + // DataChanges second, as they are heavier + ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER); + } + + void closeAllTransactionChains() { + for (ShardDataTreeTransactionChain chain : transactionChains.values()) { + chain.close(); + } + + transactionChains.clear(); + } + + void closeTransactionChain(final String transactionChainId) { + final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId); + if (chain != null) { + chain.close(); + } else { + LOG.warn("Closing non-existent transaction chain {}", transactionChainId); + } + } + + Entry>>, DOMImmutableDataChangeEvent> registerChangeListener( + final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, final DataChangeScope scope) { + final ListenerRegistration>> reg = + listenerTree.registerDataChangeListener(path, listener, scope); + + final Optional> currentState = dataTree.takeSnapshot().readNode(path); + final DOMImmutableDataChangeEvent event; + if (currentState.isPresent()) { + final NormalizedNode data = currentState.get(); + event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build(); + } else { + event = null; + } + + return new SimpleEntry<>(reg, event); + } + + Entry, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path, + final DOMDataTreeChangeListener listener) { + final ListenerRegistration reg = treeChangePublisher.registerTreeChangeListener(path, listener); + + final Optional> currentState = dataTree.takeSnapshot().readNode(path); + final DataTreeCandidate event; + if (currentState.isPresent()) { + event = DataTreeCandidates.fromNormalizedNode(path, currentState.get()); + } else { + event = null; + } + return new SimpleEntry<>(reg, event); + } + + @Override + void abortTransaction(final AbstractShardDataTreeTransaction transaction) { + // Intentional no-op + } + + @Override + DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + final DataTreeModification snapshot = transaction.getSnapshot(); + snapshot.ready(); + return new ShardDataTreeCohort(this, snapshot); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java new file mode 100644 index 0000000000..5e24dcd182 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.Collection; +import java.util.Collections; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@NotThreadSafe +final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class); + + void publishChanges(final DataTreeCandidate candidate) { + processCandidateTree(candidate); + } + + @Override + protected void notifyListeners(final Collection> registrations, + final YangInstanceIdentifier path, final DataTreeCandidateNode node) { + final Collection changes = Collections.singleton(new DefaultDataTreeCandidate(path, node)); + + for (AbstractDOMDataTreeChangeListenerRegistration reg : registrations) { + reg.getInstance().onDataTreeChanged(changes); + } + } + + @Override + protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { + LOG.debug("Registration {} removed", registration); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java new file mode 100644 index 0000000000..11b3ca8ed7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class); + private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); + private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; + private final ShardDataTree dataTree; + private DataTreeCandidateTip candidate; + + ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.transaction = Preconditions.checkNotNull(transaction); + } + + @Override + public ListenableFuture canCommit() { + try { + dataTree.getDataTree().validate(transaction); + LOG.debug("Transaction {} validated", transaction); + return TRUE_FUTURE; + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture preCommit() { + try { + candidate = dataTree.getDataTree().prepare(transaction); + /* + * FIXME: this is the place where we should be interacting with persistence, specifically by invoking + * persist on the candidate (which gives us a Future). + */ + LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); + return VOID_FUTURE; + } catch (Exception e) { + LOG.debug("Transaction {} failed to prepare", transaction, e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture abort() { + // No-op, really + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + try { + dataTree.getDataTree().commit(candidate); + } catch (Exception e) { + LOG.error("Transaction {} failed to commit", transaction, e); + return Futures.immediateFailedFuture(e); + } + + LOG.debug("Transaction {} committed, proceeding to notify", transaction); + dataTree.notifyListeners(candidate); + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java new file mode 100644 index 0000000000..8a54fc6231 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.util.concurrent.NotificationManager; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ShardDataTreeNotificationManager implements NotificationManager, DOMImmutableDataChangeEvent> { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class); + + @Override + public void submitNotification(final DataChangeListenerRegistration listener, final DOMImmutableDataChangeEvent notification) { + LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification); + + listener.getInstance().onDataChanged(notification); + } + + @Override + public void submitNotifications(final DataChangeListenerRegistration listener, final Iterable notifications) { + final AsyncDataChangeListener> instance = listener.getInstance(); + LOG.debug("Notifying listener {} about {}", instance, notifications); + + for (DOMImmutableDataChangeEvent n : notifications) { + instance.onDataChanged(n); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java new file mode 100644 index 0000000000..780d940128 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A transaction chain attached to a Shard. + */ +@NotThreadSafe +final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent { + private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class); + private final ShardDataTree dataTree; + private final String chainId; + + private ReadWriteShardDataTreeTransaction previousTx; + private ReadWriteShardDataTreeTransaction openTransaction; + private boolean closed; + + ShardDataTreeTransactionChain(final String chainId, final ShardDataTree dataTree) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.chainId = Preconditions.checkNotNull(chainId); + } + + private DataTreeSnapshot getSnapshot() { + Preconditions.checkState(!closed, "TransactionChain %s has been closed", this); + Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction); + + if (previousTx == null) { + return dataTree.getDataTree().takeSnapshot(); + } else { + return previousTx.getSnapshot(); + } + } + + ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId) { + final DataTreeSnapshot snapshot = getSnapshot(); + LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot); + + return new ReadOnlyShardDataTreeTransaction(txId, snapshot); + } + + ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId) { + final DataTreeSnapshot snapshot = getSnapshot(); + LOG.debug("Allocated read-write transaction {} snapshot {}", txId, snapshot); + + openTransaction = new ReadWriteShardDataTreeTransaction(this, txId, snapshot.newModification()); + return openTransaction; + } + + void close() { + closed = true; + } + + @Override + protected void abortTransaction(final AbstractShardDataTreeTransaction transaction) { + if (transaction instanceof ReadWriteShardDataTreeTransaction) { + Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction); + LOG.debug("Aborted transaction {}", transaction); + openTransaction = null; + } + } + + @Override + protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); + + // dataTree is finalizing ready the transaction, we just record it for the next + // transaction in chain + final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction); + openTransaction = null; + previousTx = transaction; + LOG.debug("Committing transaction {}", transaction); + + return new CommitCohort(transaction, delegate); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("id", chainId).toString(); + } + + private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort { + private final ReadWriteShardDataTreeTransaction transaction; + private final DOMStoreThreePhaseCommitCohort delegate; + + CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) { + this.transaction = Preconditions.checkNotNull(transaction); + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + protected DOMStoreThreePhaseCommitCohort delegate() { + return delegate; + } + + @Override + public ListenableFuture commit() { + final ListenableFuture ret = super.commit(); + + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(Void result) { + if (transaction.equals(previousTx)) { + previousTx = null; + } + LOG.debug("Committed transaction {}", transaction); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Transaction {} commit failed, cannot recover", transaction, t); + } + }); + + return ret; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java new file mode 100644 index 0000000000..6cc1408eae --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +abstract class ShardDataTreeTransactionParent { + abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); + abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 41ca486eb6..f2c77e32d8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -13,17 +13,12 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import com.google.common.base.Optional; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -34,9 +29,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class ShardReadTransaction extends ShardTransaction { private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - private final DOMStoreReadTransaction transaction; + private final AbstractShardDataTreeTransaction transaction; - public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, + public ShardReadTransaction(AbstractShardDataTreeTransaction transaction, ActorRef shardActor, ShardStats shardStats, String transactionID, short clientTxVersion) { super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; @@ -70,28 +65,16 @@ public class ShardReadTransaction extends ShardTransaction { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - final ListenableFuture>> future = transaction.read(DATASTORE_ROOT); + final Optional> result = transaction.getSnapshot().readNode(DATASTORE_ROOT); - Futures.addCallback(future, new FutureCallback>>() { - @Override - public void onSuccess(Optional> result) { - byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get()); - sender.tell(new CaptureSnapshotReply(serialized), self); + byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get()); + sender.tell(new CaptureSnapshotReply(serialized), self); - self.tell(PoisonPill.getInstance(), self); - } - - @Override - public void onFailure(Throwable t) { - sender.tell(new akka.actor.Status.Failure(t), self); - - self.tell(PoisonPill.getInstance(), self); - } - }); + self.tell(PoisonPill.getInstance(), self); } @Override - protected DOMStoreTransaction getDOMStoreTransaction() { + protected AbstractShardDataTreeTransaction getDOMStoreTransaction() { return transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index 2042e95577..c90b2ae028 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -14,35 +14,30 @@ import akka.actor.ActorRef; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; /** * @author: syedbahm * Date: 8/6/14 */ public class ShardReadWriteTransaction extends ShardWriteTransaction { - private final DOMStoreReadWriteTransaction transaction; - - public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, + public ShardReadWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor, ShardStats shardStats, String transactionID, short clientTxVersion) { super(transaction, shardActor, shardStats, transactionID, clientTxVersion); - this.transaction = transaction; } @Override public void handleReceive(Object message) throws Exception { if (message instanceof ReadData) { - readData(transaction, (ReadData) message, !SERIALIZED_REPLY); + readData((ReadData) message, !SERIALIZED_REPLY); } else if (message instanceof DataExists) { - dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY); + dataExists((DataExists) message, !SERIALIZED_REPLY); } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { - readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY); + readData(ReadData.fromSerializable(message), SERIALIZED_REPLY); } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) { - dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY); - + dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY); } else { super.handleReceive(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 01a124b697..f9d3050015 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -17,11 +17,12 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; /** @@ -31,16 +32,16 @@ import org.slf4j.Logger; * committed to the data store in the order the corresponding snapshot or log batch are received * to preserve data store integrity. * - * @author Thomas Panetelis + * @author Thomas Pantelis */ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { - - private final InMemoryDOMDataStore store; + private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build(); + private final ShardDataTree store; private List currentLogRecoveryBatch; private final String shardName; private final Logger log; - ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) { + ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) { this.store = store; this.shardName = shardName; this.log = log; @@ -73,8 +74,8 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { } - private void commitTransaction(DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) { + DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); try { commitCohort.preCommit().get(); commitCohort.commit().get(); @@ -90,10 +91,11 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public void applyCurrentLogRecoveryBatch() { log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); - DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); - for(ModificationPayload payload: currentLogRecoveryBatch) { + ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null); + DataTreeModification snapshot = writeTx.getSnapshot(); + for (ModificationPayload payload : currentLogRecoveryBatch) { try { - MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx); + MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot); } catch (Exception e) { log.error("{}: Error extracting ModificationPayload", shardName, e); } @@ -111,14 +113,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override public void applyRecoverySnapshot(final byte[] snapshotBytes) { - log.debug("{}: Applyng recovered sbapshot", shardName); + log.debug("{}: Applying recovered snapshot", shardName); - DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); + // Intentionally bypass normal transaction to side-step persistence/replication + final DataTree tree = store.getDataTree(); + DataTreeModification writeTx = tree.takeSnapshot().newModification(); NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - writeTx.write(YangInstanceIdentifier.builder().build(), node); - - commitTransaction(writeTx); + writeTx.write(ROOT, node); + writeTx.ready(); + try { + tree.validate(writeTx); + tree.commit(tree.prepare(writeTx)); + } catch (DataValidationFailedException e) { + log.error("{}: Failed to validate recovery snapshot", shardName, e); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index c59085d61c..35d8e922f2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -7,15 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Preconditions; import akka.actor.ActorRef; import java.util.concurrent.ExecutionException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -31,14 +30,14 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { private int createSnapshotTransactionCounter; private final ShardTransactionActorFactory transactionActorFactory; - private final InMemoryDOMDataStore store; + private final ShardDataTree store; private final Logger log; private final String logId; - ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store, + ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store, Logger log, String logId) { this.transactionActorFactory = transactionActorFactory; - this.store = store; + this.store = Preconditions.checkNotNull(store); this.log = log; this.logId = logId; } @@ -67,15 +66,15 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { log.info("{}: Applying snapshot", logId); try { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null); NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); // delete everything first - transaction.delete(DATASTORE_ROOT); + transaction.getSnapshot().delete(DATASTORE_ROOT); // Add everything from the remote node back - transaction.write(DATASTORE_ROOT, node); + transaction.getSnapshot().write(DATASTORE_ROOT, node); syncCommitTransaction(transaction); } catch (InterruptedException | ExecutionException e) { log.error("{}: An exception occurred when applying snapshot", logId, e); @@ -85,9 +84,9 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { } - void syncCommitTransaction(final DOMStoreWriteTransaction transaction) + void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); commitCohort.preCommit().get(); commitCohort.commit().get(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 81125a7152..600ec39397 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -14,8 +14,9 @@ import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.japi.Creator; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -25,10 +26,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -66,13 +63,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering this.clientTxVersion = clientTxVersion; } - public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, + public static Props props(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { - return Props.create(new ShardTransactionCreator(transaction, shardActor, + return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats, transactionID, txnClientVersion)); } - protected abstract DOMStoreTransaction getDOMStoreTransaction(); + protected abstract AbstractShardDataTreeTransaction getDOMStoreTransaction(); protected ActorRef getShardActor() { return shardActor; @@ -105,7 +102,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering } private void closeTransaction(boolean sendReply) { - getDOMStoreTransaction().close(); + getDOMStoreTransaction().abort(); if(sendReply && returnCloseTransactionReply()) { getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf()); @@ -114,71 +111,83 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering getSelf().tell(PoisonPill.getInstance(), getSelf()); } - protected void readData(DOMStoreReadTransaction transaction, ReadData message, - final boolean returnSerialized) { - - final YangInstanceIdentifier path = message.getPath(); - try { - final CheckedFuture>, ReadFailedException> future = transaction.read(path); - Optional> optional = future.checkedGet(); - ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion); + private boolean checkClosed(AbstractShardDataTreeTransaction transaction) { + final boolean ret = transaction.isClosed(); + if (ret) { + shardStats.incrementFailedReadTransactionsCount(); + getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf()); + } + return ret; + } - sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self()); + protected void readData(AbstractShardDataTreeTransaction transaction, ReadData message, + final boolean returnSerialized) { - } catch (Exception e) { - LOG.debug(String.format("Unexpected error reading path %s", path), e); - shardStats.incrementFailedReadTransactionsCount(); - sender().tell(new akka.actor.Status.Failure(e), self()); + if (checkClosed(transaction)) { + return; } + + final YangInstanceIdentifier path = message.getPath(); + Optional> optional = transaction.getSnapshot().readNode(path); + ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion); + sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self()); } - protected void dataExists(DOMStoreReadTransaction transaction, DataExists message, + protected void dataExists(AbstractShardDataTreeTransaction transaction, DataExists message, final boolean returnSerialized) { - final YangInstanceIdentifier path = message.getPath(); - try { - boolean exists = transaction.exists(path).checkedGet(); - DataExistsReply dataExistsReply = DataExistsReply.create(exists); - getSender().tell(returnSerialized ? dataExistsReply.toSerializable() : - dataExistsReply, getSelf()); - } catch (ReadFailedException e) { - getSender().tell(new akka.actor.Status.Failure(e),getSelf()); + if (checkClosed(transaction)) { + return; } + + final YangInstanceIdentifier path = message.getPath(); + boolean exists = transaction.getSnapshot().readNode(path).isPresent(); + DataExistsReply dataExistsReply = DataExistsReply.create(exists); + getSender().tell(returnSerialized ? dataExistsReply.toSerializable() : + dataExistsReply, getSelf()); } private static class ShardTransactionCreator implements Creator { private static final long serialVersionUID = 1L; - final DOMStoreTransaction transaction; + final AbstractShardDataTreeTransaction transaction; final ActorRef shardActor; final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; final short txnClientVersion; + final TransactionType type; - ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, + ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { - this.transaction = transaction; + this.transaction = Preconditions.checkNotNull(transaction); this.shardActor = shardActor; this.shardStats = shardStats; this.datastoreContext = datastoreContext; this.transactionID = transactionID; this.txnClientVersion = txnClientVersion; + this.type = type; } @Override public ShardTransaction create() throws Exception { - ShardTransaction tx; - if(transaction instanceof DOMStoreReadWriteTransaction) { - tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, shardStats, transactionID, txnClientVersion); - } else if(transaction instanceof DOMStoreReadTransaction) { - tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - shardStats, transactionID, txnClientVersion); - } else { - tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, shardStats, transactionID, txnClientVersion); + final ShardTransaction tx; + switch (type) { + case READ_ONLY: + tx = new ShardReadTransaction(transaction, shardActor, + shardStats, transactionID, txnClientVersion); + break; + case READ_WRITE: + tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, + shardActor, shardStats, transactionID, txnClientVersion); + break; + case WRITE_ONLY: + tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, + shardActor, shardStats, transactionID, txnClientVersion); + break; + default: + throw new IllegalArgumentException("Unhandled transaction type " + type); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index a4c97e8ab9..7a163088d4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -8,16 +8,17 @@ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Preconditions; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -25,13 +26,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; */ public class ShardTransactionChain extends AbstractUntypedActor { - private final DOMStoreTransactionChain chain; + private final ShardDataTreeTransactionChain chain; private final DatastoreContext datastoreContext; private final ShardStats shardStats; - public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext, ShardStats shardStats) { - this.chain = chain; + this.chain = Preconditions.checkNotNull(chain); this.datastoreContext = datastoreContext; this.shardStats = shardStats; } @@ -55,29 +56,25 @@ public class ShardTransactionChain extends AbstractUntypedActor { private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) { String transactionName = "shard-" + createTransaction.getTransactionId(); - if(createTransaction.getTransactionType() == - TransactionProxy.TransactionType.READ_ONLY.ordinal()) { - return getContext().actorOf( - ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), - datastoreContext, shardStats, createTransaction.getTransactionId(), - createTransaction.getVersion()), transactionName); - } else if (createTransaction.getTransactionType() == - TransactionProxy.TransactionType.READ_WRITE.ordinal()) { - return getContext().actorOf( - ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), - datastoreContext, shardStats, createTransaction.getTransactionId(), - createTransaction.getVersion()), transactionName); - } else if (createTransaction.getTransactionType() == - TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { - return getContext().actorOf( - ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), - datastoreContext, shardStats, createTransaction.getTransactionId(), - createTransaction.getVersion()), transactionName); - } else { - throw new IllegalArgumentException ( - "CreateTransaction message has unidentified transaction type=" + - createTransaction.getTransactionType()); + + final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType()); + final AbstractShardDataTreeTransaction transaction; + switch (type) { + case READ_ONLY: + transaction = chain.newReadOnlyTransaction(transactionName); + break; + case READ_WRITE: + case WRITE_ONLY: + transaction = chain.newReadWriteTransaction(transactionName); + break; + default: + throw new IllegalArgumentException("Unhandled transaction type " + type); } + + return getContext().actorOf( + ShardTransaction.props(type, transaction, getShardActor(), + datastoreContext, shardStats, createTransaction.getTransactionId(), + createTransaction.getVersion()), transactionName); } private void createTransaction(CreateTransaction createTransaction) { @@ -87,7 +84,7 @@ public class ShardTransactionChain extends AbstractUntypedActor { createTransaction.getTransactionId()).toSerializable(), getSelf()); } - public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext, + public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext, DatastoreContext datastoreContext, ShardStats shardStats) { return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats)); } @@ -95,12 +92,11 @@ public class ShardTransactionChain extends AbstractUntypedActor { private static class ShardTransactionChainCreator implements Creator { private static final long serialVersionUID = 1L; - final DOMStoreTransactionChain chain; + final ShardDataTreeTransactionChain chain; final DatastoreContext datastoreContext; final ShardStats shardStats; - - ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext, ShardStats shardStats) { this.chain = chain; this.datastoreContext = datastoreContext; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java index 9637646fc5..3a92062e7f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java @@ -7,11 +7,11 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Preconditions; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; /** * A factory for creating ShardTransaction actors. @@ -20,16 +20,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; */ class ShardTransactionActorFactory { - private final DOMTransactionFactory domTransactionFactory; + private final ShardDataTree dataTree; private final DatastoreContext datastoreContext; private final String txnDispatcherPath; private final ShardStats shardMBean; private final UntypedActorContext actorContext; private final ActorRef shardActor; - ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext, + ShardTransactionActorFactory(ShardDataTree dataTree, DatastoreContext datastoreContext, String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) { - this.domTransactionFactory = domTransactionFactory; + this.dataTree = Preconditions.checkNotNull(dataTree); this.datastoreContext = datastoreContext; this.txnDispatcherPath = txnDispatcherPath; this.shardMBean = shardMBean; @@ -39,11 +39,20 @@ class ShardTransactionActorFactory { ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID, String transactionChainID, short clientVersion) { + final AbstractShardDataTreeTransaction transaction; + switch (type) { + case READ_ONLY: + transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID); + break; + case READ_WRITE: + case WRITE_ONLY: + transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID); + break; + default: + throw new IllegalArgumentException("Unsupported transaction type " + type); + } - DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(), - transactionChainID); - - return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean, + return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean, transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath), transactionID.toString()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 424ab2052c..69a696f294 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -15,11 +15,13 @@ import akka.actor.PoisonPill; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; @@ -30,8 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; /** * @author: syedbahm @@ -42,16 +42,16 @@ public class ShardWriteTransaction extends ShardTransaction { private final MutableCompositeModification compositeModification = new MutableCompositeModification(); private int totalBatchedModificationsReceived; private Exception lastBatchedModificationsException; - private final DOMStoreWriteTransaction transaction; + private final ReadWriteShardDataTreeTransaction transaction; - public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, + public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor, ShardStats shardStats, String transactionID, short clientTxVersion) { super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } @Override - protected DOMStoreTransaction getDOMStoreTransaction() { + protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() { return transaction; } @@ -61,17 +61,17 @@ public class ShardWriteTransaction extends ShardTransaction { if (message instanceof BatchedModifications) { batchedModifications((BatchedModifications)message); } else if (message instanceof ReadyTransaction) { - readyTransaction(transaction, !SERIALIZED_REPLY, false); + readyTransaction(!SERIALIZED_REPLY, false); } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { - readyTransaction(transaction, SERIALIZED_REPLY, false); + readyTransaction(SERIALIZED_REPLY, false); } else if(WriteData.isSerializedType(message)) { - writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY); + writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY); } else if(MergeData.isSerializedType(message)) { - mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY); + mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY); } else if(DeleteData.isSerializedType(message)) { - deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY); + deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY); } else if (message instanceof GetCompositedModification) { // This is here for testing only @@ -82,10 +82,17 @@ public class ShardWriteTransaction extends ShardTransaction { } private void batchedModifications(BatchedModifications batched) { + if (checkClosed()) { + if (batched.isReady()) { + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + return; + } + try { for(Modification modification: batched.getModifications()) { compositeModification.addModification(modification); - modification.apply(transaction); + modification.apply(transaction.getSnapshot()); } totalBatchedModificationsReceived++; @@ -100,7 +107,7 @@ public class ShardWriteTransaction extends ShardTransaction { totalBatchedModificationsReceived, batched.getTotalMessagesSent())); } - readyTransaction(transaction, false, batched.isDoCommitOnReady()); + readyTransaction(false, batched.isDoCommitOnReady()); } else { getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); } @@ -114,14 +121,33 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void writeData(DOMStoreWriteTransaction transaction, WriteData message, - boolean returnSerialized) { + protected final void dataExists(DataExists message, final boolean returnSerialized) { + super.dataExists(transaction, message, returnSerialized); + } + + protected final void readData(ReadData message, final boolean returnSerialized) { + super.readData(transaction, message, returnSerialized); + } + + private boolean checkClosed() { + if (transaction.isClosed()) { + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf()); + return true; + } else { + return false; + } + } + + private void writeData(WriteData message, boolean returnSerialized) { LOG.debug("writeData at path : {}", message.getPath()); + if (checkClosed()) { + return; + } compositeModification.addModification( new WriteModification(message.getPath(), message.getData())); try { - transaction.write(message.getPath(), message.getData()); + transaction.getSnapshot().write(message.getPath(), message.getData()); WriteDataReply writeDataReply = WriteDataReply.INSTANCE; getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) : writeDataReply, getSelf()); @@ -130,15 +156,17 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, - boolean returnSerialized) { + private void mergeData(MergeData message, boolean returnSerialized) { LOG.debug("mergeData at path : {}", message.getPath()); + if (checkClosed()) { + return; + } compositeModification.addModification( new MergeModification(message.getPath(), message.getData())); try { - transaction.merge(message.getPath(), message.getData()); + transaction.getSnapshot().merge(message.getPath(), message.getData()); MergeDataReply mergeDataReply = MergeDataReply.INSTANCE; getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) : mergeDataReply, getSelf()); @@ -147,23 +175,24 @@ public class ShardWriteTransaction extends ShardTransaction { } } - private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, - boolean returnSerialized) { + private void deleteData(DeleteData message, boolean returnSerialized) { LOG.debug("deleteData at path : {}", message.getPath()); + if (checkClosed()) { + return; + } compositeModification.addModification(new DeleteModification(message.getPath())); try { - transaction.delete(message.getPath()); + transaction.getSnapshot().delete(message.getPath()); DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE; getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) : deleteDataReply, getSelf()); - }catch(Exception e){ + } catch(Exception e) { getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } } - private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized, - boolean doImmediateCommit) { + private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) { String transactionID = getTransactionID(); LOG.debug("readyTransaction : {}", transactionID); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java index 3a63f5b173..2c55357161 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** * DeleteModification store all the parameters required to delete a path from the data tree @@ -41,6 +42,11 @@ public class DeleteModification extends AbstractModification { transaction.delete(getPath()); } + @Override + public void apply(DataTreeModification transaction) { + transaction.delete(getPath()); + } + @Override public byte getType() { return DELETE; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java index 7ba74f4e7f..cc7956ebbc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** * MergeModification stores all the parameters required to merge data into the specified path @@ -41,6 +42,11 @@ public class MergeModification extends WriteModification { transaction.merge(getPath(), getData()); } + @Override + public void apply(final DataTreeModification transaction) { + transaction.merge(getPath(), getData()); + } + @Override public byte getType() { return MERGE; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java index 2dfcdf0287..6fc8183bd8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification; import java.io.Externalizable; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** * Represents a modification to the data store. @@ -39,6 +40,13 @@ public interface Modification extends Externalizable { */ void apply(DOMStoreWriteTransaction transaction); + /** + * Apply the modification to the specified transaction + * + * @param transaction + */ + void apply(DataTreeModification transaction); + byte getType(); @Deprecated diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java index b597742319..b594578eb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** * MutableCompositeModification is just a mutable version of a @@ -45,6 +46,13 @@ public class MutableCompositeModification implements CompositeModification { } } + @Override + public void apply(DataTreeModification transaction) { + for (Modification modification : modifications) { + modification.apply(transaction); + } + } + @Override public byte getType() { return COMPOSITE; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java index 2fdca5f379..f7f9a71735 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java @@ -21,6 +21,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; /** * WriteModification stores all the parameters required to write data to the specified path @@ -48,6 +49,11 @@ public class WriteModification extends AbstractModification { transaction.write(getPath(), data); } + @Override + public void apply(final DataTreeModification transaction) { + transaction.write(getPath(), data); + } + public NormalizedNode getData() { return data; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index f6a103ffb8..03f2bb7ad0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -46,12 +46,16 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -183,19 +187,19 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, - final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, + final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification) { return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); } protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, - final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode data, + final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification, final Function> preCommit) { - DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction(); - tx.write(path, data); - DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit); + ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); + tx.getSnapshot().write(path, data); + DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); modification.addModification(new WriteModification(path, data)); @@ -249,38 +253,43 @@ public abstract class AbstractShardTest extends AbstractActorTest{ public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { - return readStore(shard.underlyingActor().getDataStore(), id); + return readStore(shard.underlyingActor().getDataStore().getDataTree(), id); } - public static NormalizedNode readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id) - throws ExecutionException, InterruptedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - - CheckedFuture>, ReadFailedException> future = - transaction.read(id); + public static NormalizedNode readStore(final DataTree store, final YangInstanceIdentifier id) { + DataTreeSnapshot transaction = store.takeSnapshot(); - Optional> optional = future.get(); + Optional> optional = transaction.readNode(id); NormalizedNode node = optional.isPresent()? optional.get() : null; - transaction.close(); - return node; } public static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, - final NormalizedNode node) throws ExecutionException, InterruptedException { + final NormalizedNode node) throws InterruptedException, ExecutionException { writeToStore(shard.underlyingActor().getDataStore(), id, node); } - public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id, - final NormalizedNode node) throws ExecutionException, InterruptedException { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id, + final NormalizedNode node) throws InterruptedException, ExecutionException { + ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); - transaction.write(id, node); + transaction.getSnapshot().write(id, node); + DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + cohort.canCommit().get(); + cohort.preCommit().get(); + cohort.commit(); + } - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); + public static void writeToStore(final DataTree store, final YangInstanceIdentifier id, + final NormalizedNode node) throws DataValidationFailedException { + DataTreeModification transaction = store.takeSnapshot().newModification(); + + transaction.write(id, node); + transaction.ready(); + store.validate(transaction); + final DataTreeCandidate candidate = store.prepare(transaction); + store.commit(candidate); } @SuppressWarnings("serial") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 72f672794a..22ce50b90d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -89,7 +89,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -100,7 +99,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -338,8 +339,8 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -382,8 +383,8 @@ public class ShardTest extends AbstractShardTest { // Set up the InMemorySnapshotStore. - InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); - testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -440,7 +441,7 @@ public class ShardTest extends AbstractShardTest { // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); @@ -861,7 +862,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); @@ -901,7 +902,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); // Setup a simulated transactions with a mock cohort. @@ -1400,7 +1401,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; Function> preCommit = @@ -1464,7 +1465,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, @@ -1541,7 +1542,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 21fb55fcf1..e45389f5f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -14,17 +14,15 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.pattern.AskTimeoutException; import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -40,11 +38,13 @@ import scala.concurrent.duration.Duration; * @author Basheeruddin Ahmed */ public class ShardTransactionFailureTest extends AbstractActorTest { - private static final InMemoryDOMDataStore store = - new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - private static final SchemaContext testSchemaContext = - TestModel.createTestContext(); + TestModel.createTestContext(); + private static final TransactionType RO = TransactionType.READ_ONLY; + private static final TransactionType RW = TransactionType.READ_WRITE; + private static final TransactionType WO = TransactionType.WRITE_ONLY; + + private static final ShardDataTree store = new ShardDataTree(testSchemaContext); private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.builder().memberName("member-1") @@ -54,11 +54,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore"); - @BeforeClass - public static void staticSetup() { - store.onGlobalContextUpdated(testSchemaContext); - } - private ActorRef createShard(){ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); @@ -69,7 +64,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -86,7 +81,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { akka.pattern.Patterns.ask(subject, readData, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().close(); + subject.underlyingActor().getDOMStoreTransaction().abort(); future = akka.pattern.Patterns.ask(subject, readData, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); @@ -98,7 +93,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -116,7 +111,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { akka.pattern.Patterns.ask(subject, readData, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().close(); + subject.underlyingActor().getDOMStoreTransaction().abort(); future = akka.pattern.Patterns.ask(subject, readData, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); @@ -127,7 +122,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -145,7 +140,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { akka.pattern.Patterns.ask(subject, dataExists, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); - subject.underlyingActor().getDOMStoreTransaction().close(); + subject.underlyingActor().getDOMStoreTransaction().abort(); future = akka.pattern.Patterns.ask(subject, dataExists, 3000); Await.result(future, Duration.create(3, TimeUnit.SECONDS)); @@ -156,7 +151,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, + final Props props = ShardTransaction.props(WO, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -188,7 +183,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -225,7 +220,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef @@ -257,7 +252,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, + final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index b22001a4da..23984ad973 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -11,14 +11,13 @@ import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; @@ -50,13 +49,11 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -64,6 +61,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardTransactionTest extends AbstractActorTest { private static final SchemaContext testSchemaContext = TestModel.createTestContext(); + private static final TransactionType RO = TransactionType.READ_ONLY; + private static final TransactionType RW = TransactionType.READ_WRITE; + private static final TransactionType WO = TransactionType.WRITE_ONLY; private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.builder().memberName("member-1") @@ -73,46 +73,50 @@ public class ShardTransactionTest extends AbstractActorTest { private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore"); - private final InMemoryDOMDataStore store = - new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + private final ShardDataTree store = new ShardDataTree(testSchemaContext); - @Before - public void setup() { - store.onGlobalContextUpdated(testSchemaContext); - } + private int txCounter = 0; - private ActorRef createShard(){ + private ActorRef createShard() { return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } - private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { - return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION); + private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, String name) { + return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION); } - private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) { - return newTransactionActor(transaction, null, name, version); + private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, String name, short version) { + return newTransactionActor(type, transaction, null, name, version); } - private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) { - return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION); + private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shard, String name) { + return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION); } - private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name, + private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shard, String name, short version) { - Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(), + Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(), datastoreContext, shardStats, "txn", version); return getSystem().actorOf(props, name); } + private ReadOnlyShardDataTreeTransaction readOnlyTransaction() { + return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null); + } + + private ReadWriteShardDataTreeTransaction readWriteTransaction() { + return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null); + } + @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO")); + testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO")); - testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW")); + testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW")); } private void testOnReceiveReadData(final ActorRef transaction) { @@ -140,10 +144,10 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef shard = createShard(); testOnReceiveReadDataWhenDataNotFound(newTransactionActor( - store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO")); + RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO")); testOnReceiveReadDataWhenDataNotFound(newTransactionActor( - store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW")); + RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW")); } private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) { @@ -167,7 +171,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadDataHeliumR1() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION); transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), @@ -185,10 +189,10 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard, + testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard, "testDataExistsPositiveRO")); - testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard, + testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard, "testDataExistsPositiveRW")); } @@ -215,10 +219,10 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); - testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard, + testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard, "testDataExistsNegativeRO")); - testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard, + testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard, "testDataExistsNegativeRW")); } @@ -253,9 +257,9 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveWriteData() throws Exception { + public void testOnReceiveWriteData() { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, @@ -276,9 +280,9 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveHeliumR1WriteData() throws Exception { + public void testOnReceiveHeliumR1WriteData() { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION); Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, @@ -296,9 +300,9 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveMergeData() throws Exception { + public void testOnReceiveMergeData() { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, @@ -321,7 +325,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveHeliumR1MergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION); Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH, @@ -341,7 +345,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION). @@ -362,8 +366,10 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveBatchedModifications() throws Exception { new JavaTestKit(getSystem()) {{ - DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); - final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications"); + ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class); + DataTreeModification mockModification = Mockito.mock(DataTreeModification.class); + ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification); + final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications"); YangInstanceIdentifier writePath = TestModel.TEST_PATH; NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( @@ -405,10 +411,10 @@ public class ShardTransactionTest extends AbstractActorTest { DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2); assertEquals("getPath", deletePath, delete.getPath()); - InOrder inOrder = Mockito.inOrder(mockWriteTx); - inOrder.verify(mockWriteTx).write(writePath, writeData); - inOrder.verify(mockWriteTx).merge(mergePath, mergeData); - inOrder.verify(mockWriteTx).delete(deletePath); + InOrder inOrder = Mockito.inOrder(mockModification); + inOrder.verify(mockModification).write(writePath, writeData); + inOrder.verify(mockModification).merge(mergePath, mergeData); + inOrder.verify(mockModification).delete(deletePath); }}; } @@ -416,7 +422,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit"); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -448,7 +454,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveBatchedModificationsReadyWithImmediateCommit"); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -475,8 +481,10 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveBatchedModificationsFailure() throws Throwable { new JavaTestKit(getSystem()) {{ - DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class); - final ActorRef transaction = newTransactionActor(mockWriteTx, + ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class); + DataTreeModification mockModification = Mockito.mock(DataTreeModification.class); + ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification); + final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModificationsFailure"); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -485,7 +493,7 @@ public class ShardTransactionTest extends AbstractActorTest { YangInstanceIdentifier path = TestModel.TEST_PATH; ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doThrow(new TestException()).when(mockWriteTx).write(path, node); + doThrow(new TestException()).when(mockModification).write(path, node); BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); batched.addModification(new WriteModification(path, node)); @@ -511,7 +519,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount"); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -535,7 +543,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceivePreLithiumReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -549,7 +557,7 @@ public class ShardTransactionTest extends AbstractActorTest { // test new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION); JavaTestKit watcher = new JavaTestKit(getSystem()); @@ -565,13 +573,13 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveCreateSnapshot() throws Exception { new JavaTestKit(getSystem()) {{ - ShardTest.writeToStore(store, TestModel.TEST_PATH, + ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - NormalizedNode expectedRoot = ShardTest.readStore(store, + NormalizedNode expectedRoot = ShardTest.readStore(store.getDataTree(), YangInstanceIdentifier.builder().build()); - final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(), "testOnReceiveCreateSnapshot"); watch(transaction); @@ -594,7 +602,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testReadWriteTxOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testReadWriteTxOnReceiveCloseTransaction"); watch(transaction); @@ -609,7 +617,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(), "testWriteTxOnReceiveCloseTransaction"); watch(transaction); @@ -624,7 +632,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(), "testReadOnlyTxOnReceiveCloseTransaction"); watch(transaction); @@ -638,7 +646,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Test(expected=UnknownMessageException.class) public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); - final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, + final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard, datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); @@ -653,7 +661,7 @@ public class ShardTransactionTest extends AbstractActorTest { 500, TimeUnit.MILLISECONDS).build(); new JavaTestKit(getSystem()) {{ - final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), + final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(), "testShardTransactionInactivity"); watch(transaction); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index caabb32d71..96cd3e45eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -20,7 +20,6 @@ import akka.pattern.Patterns; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; -import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -32,6 +31,7 @@ import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -56,8 +56,6 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Compos import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -65,7 +63,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -110,8 +110,8 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -154,10 +154,8 @@ public class PreLithiumShardTest extends AbstractShardTest { @Test public void testHelium2VersionRecovery() throws Exception { - // Set up the InMemorySnapshotStore. - - InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); - testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -217,7 +215,7 @@ public class PreLithiumShardTest extends AbstractShardTest { // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); -- 2.36.6