--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Abstract base for transactions running on SharrdDataTree.
+ *
+ * @param <T> Backing transaction type.
+ */
+@NotThreadSafe
+abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
+ private final T snapshot;
+ private final String id;
+ private boolean closed;
+
+ protected AbstractShardDataTreeTransaction(final String id, final T snapshot) {
+ this.snapshot = Preconditions.checkNotNull(snapshot);
+ this.id = Preconditions.checkNotNull(id);
+ }
+
+ final T getSnapshot() {
+ return snapshot;
+ }
+
+ final boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Close this transaction and mark it as closed, allowing idempotent invocations.
+ *
+ * @return True if the transaction got closed by this method invocation.
+ */
+ protected final boolean close() {
+ if (closed) {
+ return false;
+ }
+
+ closed = true;
+ return true;
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot).toString();
+ }
+
+ abstract void abort();
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
-import org.slf4j.Logger;
-
-/**
- * A factory for creating DOM transactions, either normal or chained.
- *
- * @author Thomas Pantelis
- */
-public class DOMTransactionFactory {
-
- private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
- private final InMemoryDOMDataStore store;
- private final ShardStats shardMBean;
- private final Logger log;
- private final String name;
-
- public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
- this.store = store;
- this.shardMBean = shardMBean;
- this.log = log;
- this.name = name;
- }
-
- @SuppressWarnings("unchecked")
- public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
- String transactionID, String transactionChainID) {
-
- DOMStoreTransactionFactory factory = store;
-
- if(!transactionChainID.isEmpty()) {
- factory = transactionChains.get(transactionChainID);
- if(factory == null) {
- if(log.isDebugEnabled()) {
- log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
- transactionChainID);
- }
-
- DOMStoreTransactionChain transactionChain = store.createTransactionChain();
- transactionChains.put(transactionChainID, transactionChain);
- factory = transactionChain;
- }
- } else {
- log.debug("{}: Creating transaction with ID {}", name, transactionID);
- }
-
- T transaction = null;
- switch(type) {
- case READ_ONLY:
- transaction = (T) factory.newReadOnlyTransaction();
- shardMBean.incrementReadOnlyTransactionCount();
- break;
- case READ_WRITE:
- transaction = (T) factory.newReadWriteTransaction();
- shardMBean.incrementReadWriteTransactionCount();
- break;
- case WRITE_ONLY:
- transaction = (T) factory.newWriteOnlyTransaction();
- shardMBean.incrementWriteOnlyTransactionCount();
- break;
- }
-
- return transaction;
- }
-
- public void closeTransactionChain(String transactionChainID) {
- DOMStoreTransactionChain chain =
- transactionChains.remove(transactionChainID);
-
- if(chain != null) {
- chain.close();
- }
- }
-
- public void closeAllTransactionChains() {
- for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- entry.getValue().close();
- }
-
- transactionChains.clear();
- }
-}
*/
package org.opendaylight.controller.cluster.datastore;
-import java.util.ArrayList;
-import java.util.List;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
if (isLeader) {
for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
if(!reg.isClosed()) {
- reg.setDelegate(createDelegate(reg.getRegisterChangeListener()));
+ final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+ createDelegate(reg.getRegisterChangeListener());
+ reg.setDelegate(res.getKey());
+ if (res.getValue() != null) {
+ reg.getInstance().onDataChanged(res.getValue());
+ }
}
}
LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
+ final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
if (isLeader) {
- registration = createDelegate(message);
+ final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+ createDelegate(message);
+ registration = res.getKey();
+ event = res.getValue();
} else {
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
delayedListenerRegistrations.add(delayedReg);
registration = delayedReg;
+ event = null;
}
ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
persistenceId(), listenerRegistration.path());
tellSender(new RegisterChangeListenerReply(listenerRegistration));
+ if (event != null) {
+ registration.getInstance().onDataChanged(event);
+ }
}
@Override
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> createDelegate(
+ Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
final RegisterChangeListener message) {
ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
import akka.actor.ActorSelection;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> {
private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
private final Collection<ActorSelection> actors = new ArrayList<>();
LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
final ListenerRegistration<DOMDataTreeChangeListener> registration;
+ final DataTreeCandidate event;
if (!isLeader) {
LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
delayedRegistrations.add(delayedReg);
registration = delayedReg;
+ event = null;
} else {
- registration = createDelegate(registerTreeChangeListener);
+ final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = createDelegate(registerTreeChangeListener);
+ registration = res.getKey();
+ event = res.getValue();
}
ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
persistenceId(), listenerRegistration.path());
tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
+ if (event != null) {
+ registration.getInstance().onDataTreeChanged(Collections.singletonList(event));
+ }
}
@Override
- ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+ Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> createDelegate(final RegisterDataTreeChangeListener message) {
ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
// Notify the listener if notifications should be enabled or not
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map.Entry;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Intermediate proxy registration returned to the user when we cannot
this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
}
- synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+ synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> factory) {
if (!closed) {
- this.delegate = factory.createDelegate(registerTreeChangeListener);
+ final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = factory.createDelegate(registerTreeChangeListener);
+ this.delegate = res.getKey();
+ if (res.getValue() != null) {
+ delegate.getInstance().onDataTreeChanged(Collections.singletonList(res.getValue()));
+ }
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import java.util.Map.Entry;
+
/**
* Base class for factories instantiating delegates.
*
* <D> delegate type
* <M> message type
+ * <I> initial state type
*/
-abstract class DelegateFactory<M, D> {
- abstract D createDelegate(M message);
+abstract class DelegateFactory<M, D, I> {
+ abstract Entry<D, I> createDelegate(M message);
}
*
* <D> delegate type
* <M> message type
+ * <I> initial state type
*/
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
private final Shard shard;
protected LeaderLocalDelegateFactory(final Shard shard) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
+ ReadOnlyShardDataTreeTransaction(final String id, final DataTreeSnapshot snapshot) {
+ super(id, snapshot);
+ }
+
+ @Override
+ void abort() {
+ close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
+ private final ShardDataTreeTransactionParent parent;
+
+ protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) {
+ super(id, modification);
+ this.parent = Preconditions.checkNotNull(parent);
+ }
+
+ @Override
+ void abort() {
+ Preconditions.checkState(close(), "Transaction is already closed");
+
+ parent.abortTransaction(this);
+ }
+
+ DOMStoreThreePhaseCommitCohort ready() {
+ Preconditions.checkState(close(), "Transaction is already closed");
+
+ return parent.finishTransaction(this);
+ }
+}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
* A Shard represents a portion of the logical data tree <br/>
* <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
* </p>
*/
public class Shard extends RaftActor {
static final String DEFAULT_NAME = "default";
// The state of this Shard
- private final InMemoryDOMDataStore store;
+ private final ShardDataTree store;
/// The name of this shard
private final String name;
private final MessageTracker appendEntriesReplyTracker;
- private final DOMTransactionFactory domTransactionFactory;
-
private final ShardTransactionActorFactory transactionActorFactory;
private final ShardSnapshotCohort snapshotCohort;
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
- datastoreContext.getDataStoreProperties());
-
- if (schemaContext != null) {
- store.onGlobalContextUpdated(schemaContext);
- }
+ store = new ShardDataTree(schemaContext);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
- shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
- commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(store,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
- transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
}
private void commitWithNewTransaction(final Modification modification) {
- DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
- modification.apply(tx);
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("{}: Failed to commit", persistenceId(), e);
}
@VisibleForTesting
void updateSchemaContext(final SchemaContext schemaContext) {
- store.onGlobalContextUpdated(schemaContext);
+ store.updateSchemaContext(schemaContext);
}
private boolean isMetricsCaptureEnabled() {
persistenceId(), getId());
}
- domTransactionFactory.closeAllTransactionChains();
+ store.closeAllTransactionChains();
}
}
}
@VisibleForTesting
- public InMemoryDOMDataStore getDataStore() {
+ public ShardDataTree getDataStore() {
return store;
}
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
/**
private CohortEntry currentCohortEntry;
- private final DOMTransactionFactory transactionFactory;
+ private final ShardDataTree dataTree;
private final Queue<CohortEntry> queuedCohortEntries;
private ReadyTransactionReply readyTransactionReply;
- public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+ public ShardCommitCoordinator(ShardDataTree dataTree,
long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
- this.transactionFactory = transactionFactory;
+ this.dataTree = Preconditions.checkNotNull(dataTree);
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
removalListener(cacheRemovalListener).build();
CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
if(cohortEntry == null) {
cohortEntry = new CohortEntry(batched.getTransactionID(),
- transactionFactory.<DOMStoreWriteTransaction>newTransaction(
- TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+ dataTree.newReadWriteTransaction(batched.getTransactionID(),
batched.getTransactionChainID()));
cohortCache.put(batched.getTransactionID(), cohortEntry);
}
private final String transactionID;
private DOMStoreThreePhaseCommitCohort cohort;
private final MutableCompositeModification compositeModification;
- private final DOMStoreWriteTransaction transaction;
+ private final ReadWriteShardDataTreeTransaction transaction;
private ActorRef replySender;
private Shard shard;
private long lastAccessTime;
private boolean doImmediateCommit;
- CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+ CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.compositeModification = new MutableCompositeModification();
- this.transaction = transaction;
+ this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
}
void applyModifications(Iterable<Modification> modifications) {
for(Modification modification: modifications) {
compositeModification.addModification(modification);
- modification.apply(transaction);
+ modification.apply(transaction.getSnapshot());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
+ * e.g. it does not expose public interfaces and assumes it is only ever called from a
+ * single thread.
+ *
+ * This class is not part of the API contract and is subject to change at any time.
+ */
+@NotThreadSafe
+@VisibleForTesting
+public final class ShardDataTree extends ShardDataTreeTransactionParent {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
+ private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
+ private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+ private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
+ private final ListenerTree listenerTree = ListenerTree.create();
+ private final TipProducingDataTree dataTree;
+
+ ShardDataTree(final SchemaContext schemaContext) {
+ dataTree = InMemoryDataTreeFactory.getInstance().create();
+ if (schemaContext != null) {
+ dataTree.setSchemaContext(schemaContext);
+ }
+ }
+
+ TipProducingDataTree getDataTree() {
+ return dataTree;
+ }
+
+ void updateSchemaContext(final SchemaContext schemaContext) {
+ dataTree.setSchemaContext(schemaContext);
+ }
+
+ private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) {
+ ShardDataTreeTransactionChain chain = transactionChains.get(chainId);
+ if (chain == null) {
+ chain = new ShardDataTreeTransactionChain(chainId, this);
+ transactionChains.put(chainId, chain);
+ }
+
+ return chain;
+ }
+
+ ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) {
+ if (Strings.isNullOrEmpty(chainId)) {
+ return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+ }
+
+ return ensureTransactionChain(chainId).newReadOnlyTransaction(txId);
+ }
+
+ ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) {
+ if (Strings.isNullOrEmpty(chainId)) {
+ return new ReadWriteShardDataTreeTransaction(this, txId, dataTree.takeSnapshot().newModification());
+ }
+
+ return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
+ }
+
+ void notifyListeners(final DataTreeCandidateTip candidate) {
+ LOG.debug("Notifying listeners on candidate {}", candidate);
+
+ // DataTreeChanges first, as they are more light-weight
+ treeChangePublisher.publishChanges(candidate);
+
+ // DataChanges second, as they are heavier
+ ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+ }
+
+ void closeAllTransactionChains() {
+ for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
+ chain.close();
+ }
+
+ transactionChains.clear();
+ }
+
+ void closeTransactionChain(final String transactionChainId) {
+ final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
+ if (chain != null) {
+ chain.close();
+ } else {
+ LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+ }
+ }
+
+ Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> registerChangeListener(
+ final YangInstanceIdentifier path,
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final DataChangeScope scope) {
+ final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ listenerTree.registerDataChangeListener(path, listener, scope);
+
+ final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+ final DOMImmutableDataChangeEvent event;
+ if (currentState.isPresent()) {
+ final NormalizedNode<?, ?> data = currentState.get();
+ event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build();
+ } else {
+ event = null;
+ }
+
+ return new SimpleEntry<>(reg, event);
+ }
+
+ Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path,
+ final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(path, listener);
+
+ final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+ final DataTreeCandidate event;
+ if (currentState.isPresent()) {
+ event = DataTreeCandidates.fromNormalizedNode(path, currentState.get());
+ } else {
+ event = null;
+ }
+ return new SimpleEntry<>(reg, event);
+ }
+
+ @Override
+ void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+ // Intentional no-op
+ }
+
+ @Override
+ DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ final DataTreeModification snapshot = transaction.getSnapshot();
+ snapshot.ready();
+ return new ShardDataTreeCohort(this, snapshot);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@NotThreadSafe
+final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class);
+
+ void publishChanges(final DataTreeCandidate candidate) {
+ processCandidateTree(candidate);
+ }
+
+ @Override
+ protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
+ final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+ final Collection<DataTreeCandidate> changes = Collections.<DataTreeCandidate>singleton(new DefaultDataTreeCandidate(path, node));
+
+ for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
+ reg.getInstance().onDataTreeChanged(changes);
+ }
+ }
+
+ @Override
+ protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+ LOG.debug("Registration {} removed", registration);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class);
+ private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+ private final DataTreeModification transaction;
+ private final ShardDataTree dataTree;
+ private DataTreeCandidateTip candidate;
+
+ ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ try {
+ dataTree.getDataTree().validate(transaction);
+ LOG.debug("Transaction {} validated", transaction);
+ return TRUE_FUTURE;
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ try {
+ candidate = dataTree.getDataTree().prepare(transaction);
+ /*
+ * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
+ * persist on the candidate (which gives us a Future).
+ */
+ LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
+ return VOID_FUTURE;
+ } catch (Exception e) {
+ LOG.debug("Transaction {} failed to prepare", transaction, e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ // No-op, really
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ try {
+ dataTree.getDataTree().commit(candidate);
+ } catch (Exception e) {
+ LOG.error("Transaction {} failed to commit", transaction, e);
+ return Futures.immediateFailedFuture(e);
+ }
+
+ LOG.debug("Transaction {} committed, proceeding to notify", transaction);
+ dataTree.notifyListeners(candidate);
+ return VOID_FUTURE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ShardDataTreeNotificationManager implements NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class);
+
+ @Override
+ public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+ LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+
+ listener.getInstance().onDataChanged(notification);
+ }
+
+ @Override
+ public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
+ LOG.debug("Notifying listener {} about {}", instance, notifications);
+
+ for (DOMImmutableDataChangeEvent n : notifications) {
+ instance.onDataChanged(n);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transaction chain attached to a Shard.
+ */
+@NotThreadSafe
+final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
+ private final ShardDataTree dataTree;
+ private final String chainId;
+
+ private ReadWriteShardDataTreeTransaction previousTx;
+ private ReadWriteShardDataTreeTransaction openTransaction;
+ private boolean closed;
+
+ ShardDataTreeTransactionChain(final String chainId, final ShardDataTree dataTree) {
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.chainId = Preconditions.checkNotNull(chainId);
+ }
+
+ private DataTreeSnapshot getSnapshot() {
+ Preconditions.checkState(!closed, "TransactionChain %s has been closed", this);
+ Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction);
+
+ if (previousTx == null) {
+ return dataTree.getDataTree().takeSnapshot();
+ } else {
+ return previousTx.getSnapshot();
+ }
+ }
+
+ ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId) {
+ final DataTreeSnapshot snapshot = getSnapshot();
+ LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
+
+ return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+ }
+
+ ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId) {
+ final DataTreeSnapshot snapshot = getSnapshot();
+ LOG.debug("Allocated read-write transaction {} snapshot {}", txId, snapshot);
+
+ openTransaction = new ReadWriteShardDataTreeTransaction(this, txId, snapshot.newModification());
+ return openTransaction;
+ }
+
+ void close() {
+ closed = true;
+ }
+
+ @Override
+ protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+ if (transaction instanceof ReadWriteShardDataTreeTransaction) {
+ Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction);
+ LOG.debug("Aborted transaction {}", transaction);
+ openTransaction = null;
+ }
+ }
+
+ @Override
+ protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
+
+ // dataTree is finalizing ready the transaction, we just record it for the next
+ // transaction in chain
+ final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction);
+ openTransaction = null;
+ previousTx = transaction;
+ LOG.debug("Committing transaction {}", transaction);
+
+ return new CommitCohort(transaction, delegate);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("id", chainId).toString();
+ }
+
+ private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort {
+ private final ReadWriteShardDataTreeTransaction transaction;
+ private final DOMStoreThreePhaseCommitCohort delegate;
+
+ CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ this.delegate = Preconditions.checkNotNull(delegate);
+ }
+
+ @Override
+ protected DOMStoreThreePhaseCommitCohort delegate() {
+ return delegate;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ final ListenableFuture<Void> ret = super.commit();
+
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ if (transaction.equals(previousTx)) {
+ previousTx = null;
+ }
+ LOG.debug("Committed transaction {}", transaction);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+ }
+ });
+
+ return ret;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+abstract class ShardDataTreeTransactionParent {
+ abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+ abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ShardReadTransaction extends ShardTransaction {
private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
- private final DOMStoreReadTransaction transaction;
+ private final AbstractShardDataTreeTransaction<?> transaction;
- public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+ public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
ShardStats shardStats, String transactionID, short clientTxVersion) {
super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
final ActorRef sender = getSender();
final ActorRef self = getSelf();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(DATASTORE_ROOT);
+ final Optional<NormalizedNode<?, ?>> result = transaction.getSnapshot().readNode(DATASTORE_ROOT);
- Futures.addCallback(future, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
- @Override
- public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
- byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
- sender.tell(new CaptureSnapshotReply(serialized), self);
+ byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
+ sender.tell(new CaptureSnapshotReply(serialized), self);
- self.tell(PoisonPill.getInstance(), self);
- }
-
- @Override
- public void onFailure(Throwable t) {
- sender.tell(new akka.actor.Status.Failure(t), self);
-
- self.tell(PoisonPill.getInstance(), self);
- }
- });
+ self.tell(PoisonPill.getInstance(), self);
}
@Override
- protected DOMStoreTransaction getDOMStoreTransaction() {
+ protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
return transaction;
}
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
/**
* @author: syedbahm
* Date: 8/6/14
*/
public class ShardReadWriteTransaction extends ShardWriteTransaction {
- private final DOMStoreReadWriteTransaction transaction;
-
- public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+ public ShardReadWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
ShardStats shardStats, String transactionID, short clientTxVersion) {
super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
- this.transaction = transaction;
}
@Override
public void handleReceive(Object message) throws Exception {
if (message instanceof ReadData) {
- readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
+ readData((ReadData) message, !SERIALIZED_REPLY);
} else if (message instanceof DataExists) {
- dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
+ dataExists((DataExists) message, !SERIALIZED_REPLY);
} else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
+ readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
- dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
-
+ dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
} else {
super.handleReceive(message);
}
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.slf4j.Logger;
/**
* committed to the data store in the order the corresponding snapshot or log batch are received
* to preserve data store integrity.
*
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-
- private final InMemoryDOMDataStore store;
+ private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+ private final ShardDataTree store;
private List<ModificationPayload> currentLogRecoveryBatch;
private final String shardName;
private final Logger log;
- ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+ ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
this.store = store;
this.shardName = shardName;
this.log = log;
}
- private void commitTransaction(DOMStoreWriteTransaction transaction) {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
+ DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
try {
commitCohort.preCommit().get();
commitCohort.commit().get();
public void applyCurrentLogRecoveryBatch() {
log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
- for(ModificationPayload payload: currentLogRecoveryBatch) {
+ ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
+ DataTreeModification snapshot = writeTx.getSnapshot();
+ for (ModificationPayload payload : currentLogRecoveryBatch) {
try {
- MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+ MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
} catch (Exception e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
*/
@Override
public void applyRecoverySnapshot(final byte[] snapshotBytes) {
- log.debug("{}: Applyng recovered sbapshot", shardName);
+ log.debug("{}: Applying recovered snapshot", shardName);
- DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+ // Intentionally bypass normal transaction to side-step persistence/replication
+ final DataTree tree = store.getDataTree();
+ DataTreeModification writeTx = tree.takeSnapshot().newModification();
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
- writeTx.write(YangInstanceIdentifier.builder().build(), node);
-
- commitTransaction(writeTx);
+ writeTx.write(ROOT, node);
+ writeTx.ready();
+ try {
+ tree.validate(writeTx);
+ tree.commit(tree.prepare(writeTx));
+ } catch (DataValidationFailedException e) {
+ log.error("{}: Failed to validate recovery snapshot", shardName, e);
+ }
}
}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import akka.actor.ActorRef;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
private int createSnapshotTransactionCounter;
private final ShardTransactionActorFactory transactionActorFactory;
- private final InMemoryDOMDataStore store;
+ private final ShardDataTree store;
private final Logger log;
private final String logId;
- ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+ ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
Logger log, String logId) {
this.transactionActorFactory = transactionActorFactory;
- this.store = store;
+ this.store = Preconditions.checkNotNull(store);
this.log = log;
this.logId = logId;
}
log.info("{}: Applying snapshot", logId);
try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
// delete everything first
- transaction.delete(DATASTORE_ROOT);
+ transaction.getSnapshot().delete(DATASTORE_ROOT);
// Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
+ transaction.getSnapshot().write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
log.error("{}: An exception occurred when applying snapshot", logId, e);
}
- void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
commitCohort.preCommit().get();
commitCohort.commit().get();
}
import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
this.clientTxVersion = clientTxVersion;
}
- public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+ public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
- return Props.create(new ShardTransactionCreator(transaction, shardActor,
+ return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
datastoreContext, shardStats, transactionID, txnClientVersion));
}
- protected abstract DOMStoreTransaction getDOMStoreTransaction();
+ protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
protected ActorRef getShardActor() {
return shardActor;
}
private void closeTransaction(boolean sendReply) {
- getDOMStoreTransaction().close();
+ getDOMStoreTransaction().abort();
if(sendReply && returnCloseTransactionReply()) {
getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- protected void readData(DOMStoreReadTransaction transaction, ReadData message,
- final boolean returnSerialized) {
-
- final YangInstanceIdentifier path = message.getPath();
- try {
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
- Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+ private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+ final boolean ret = transaction.isClosed();
+ if (ret) {
+ shardStats.incrementFailedReadTransactionsCount();
+ getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
+ }
+ return ret;
+ }
- sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+ protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
+ final boolean returnSerialized) {
- } catch (Exception e) {
- LOG.debug(String.format("Unexpected error reading path %s", path), e);
- shardStats.incrementFailedReadTransactionsCount();
- sender().tell(new akka.actor.Status.Failure(e), self());
+ if (checkClosed(transaction)) {
+ return;
}
+
+ final YangInstanceIdentifier path = message.getPath();
+ Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+ sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
}
- protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
+ protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
final boolean returnSerialized) {
- final YangInstanceIdentifier path = message.getPath();
- try {
- boolean exists = transaction.exists(path).checkedGet();
- DataExistsReply dataExistsReply = DataExistsReply.create(exists);
- getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
- dataExistsReply, getSelf());
- } catch (ReadFailedException e) {
- getSender().tell(new akka.actor.Status.Failure(e),getSelf());
+ if (checkClosed(transaction)) {
+ return;
}
+
+ final YangInstanceIdentifier path = message.getPath();
+ boolean exists = transaction.getSnapshot().readNode(path).isPresent();
+ DataExistsReply dataExistsReply = DataExistsReply.create(exists);
+ getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
+ dataExistsReply, getSelf());
}
private static class ShardTransactionCreator implements Creator<ShardTransaction> {
private static final long serialVersionUID = 1L;
- final DOMStoreTransaction transaction;
+ final AbstractShardDataTreeTransaction<?> transaction;
final ActorRef shardActor;
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
final short txnClientVersion;
+ final TransactionType type;
- ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+ ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
- this.transaction = transaction;
+ this.transaction = Preconditions.checkNotNull(transaction);
this.shardActor = shardActor;
this.shardStats = shardStats;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
this.txnClientVersion = txnClientVersion;
+ this.type = type;
}
@Override
public ShardTransaction create() throws Exception {
- ShardTransaction tx;
- if(transaction instanceof DOMStoreReadWriteTransaction) {
- tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, shardStats, transactionID, txnClientVersion);
- } else if(transaction instanceof DOMStoreReadTransaction) {
- tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- shardStats, transactionID, txnClientVersion);
- } else {
- tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, shardStats, transactionID, txnClientVersion);
+ final ShardTransaction tx;
+ switch (type) {
+ case READ_ONLY:
+ tx = new ShardReadTransaction(transaction, shardActor,
+ shardStats, transactionID, txnClientVersion);
+ break;
+ case READ_WRITE:
+ tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+ shardActor, shardStats, transactionID, txnClientVersion);
+ break;
+ case WRITE_ONLY:
+ tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+ shardActor, shardStats, transactionID, txnClientVersion);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled transaction type " + type);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
*/
public class ShardTransactionChain extends AbstractUntypedActor {
- private final DOMStoreTransactionChain chain;
+ private final ShardDataTreeTransactionChain chain;
private final DatastoreContext datastoreContext;
private final ShardStats shardStats;
- public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
ShardStats shardStats) {
- this.chain = chain;
+ this.chain = Preconditions.checkNotNull(chain);
this.datastoreContext = datastoreContext;
this.shardStats = shardStats;
}
private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
String transactionName = "shard-" + createTransaction.getTransactionId();
- if(createTransaction.getTransactionType() ==
- TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
- return getContext().actorOf(
- ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
- datastoreContext, shardStats, createTransaction.getTransactionId(),
- createTransaction.getVersion()), transactionName);
- } else if (createTransaction.getTransactionType() ==
- TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
- return getContext().actorOf(
- ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
- datastoreContext, shardStats, createTransaction.getTransactionId(),
- createTransaction.getVersion()), transactionName);
- } else if (createTransaction.getTransactionType() ==
- TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
- return getContext().actorOf(
- ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
- datastoreContext, shardStats, createTransaction.getTransactionId(),
- createTransaction.getVersion()), transactionName);
- } else {
- throw new IllegalArgumentException (
- "CreateTransaction message has unidentified transaction type=" +
- createTransaction.getTransactionType());
+
+ final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType());
+ final AbstractShardDataTreeTransaction<?> transaction;
+ switch (type) {
+ case READ_ONLY:
+ transaction = chain.newReadOnlyTransaction(transactionName);
+ break;
+ case READ_WRITE:
+ case WRITE_ONLY:
+ transaction = chain.newReadWriteTransaction(transactionName);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled transaction type " + type);
}
+
+ return getContext().actorOf(
+ ShardTransaction.props(type, transaction, getShardActor(),
+ datastoreContext, shardStats, createTransaction.getTransactionId(),
+ createTransaction.getVersion()), transactionName);
}
private void createTransaction(CreateTransaction createTransaction) {
createTransaction.getTransactionId()).toSerializable(), getSelf());
}
- public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+ public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext,
DatastoreContext datastoreContext, ShardStats shardStats) {
return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
}
private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
private static final long serialVersionUID = 1L;
- final DOMStoreTransactionChain chain;
+ final ShardDataTreeTransactionChain chain;
final DatastoreContext datastoreContext;
final ShardStats shardStats;
-
- ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+ ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
ShardStats shardStats) {
this.chain = chain;
this.datastoreContext = datastoreContext;
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
/**
* A factory for creating ShardTransaction actors.
*/
class ShardTransactionActorFactory {
- private final DOMTransactionFactory domTransactionFactory;
+ private final ShardDataTree dataTree;
private final DatastoreContext datastoreContext;
private final String txnDispatcherPath;
private final ShardStats shardMBean;
private final UntypedActorContext actorContext;
private final ActorRef shardActor;
- ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+ ShardTransactionActorFactory(ShardDataTree dataTree, DatastoreContext datastoreContext,
String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
- this.domTransactionFactory = domTransactionFactory;
+ this.dataTree = Preconditions.checkNotNull(dataTree);
this.datastoreContext = datastoreContext;
this.txnDispatcherPath = txnDispatcherPath;
this.shardMBean = shardMBean;
ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
String transactionChainID, short clientVersion) {
+ final AbstractShardDataTreeTransaction<?> transaction;
+ switch (type) {
+ case READ_ONLY:
+ transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID);
+ break;
+ case READ_WRITE:
+ case WRITE_ONLY:
+ transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported transaction type " + type);
+ }
- DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
- transactionChainID);
-
- return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+ return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean,
transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
transactionID.toString());
}
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
/**
* @author: syedbahm
private final MutableCompositeModification compositeModification = new MutableCompositeModification();
private int totalBatchedModificationsReceived;
private Exception lastBatchedModificationsException;
- private final DOMStoreWriteTransaction transaction;
+ private final ReadWriteShardDataTreeTransaction transaction;
- public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+ public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
ShardStats shardStats, String transactionID, short clientTxVersion) {
super(shardActor, shardStats, transactionID, clientTxVersion);
this.transaction = transaction;
}
@Override
- protected DOMStoreTransaction getDOMStoreTransaction() {
+ protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
return transaction;
}
if (message instanceof BatchedModifications) {
batchedModifications((BatchedModifications)message);
} else if (message instanceof ReadyTransaction) {
- readyTransaction(transaction, !SERIALIZED_REPLY, false);
+ readyTransaction(!SERIALIZED_REPLY, false);
} else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
- readyTransaction(transaction, SERIALIZED_REPLY, false);
+ readyTransaction(SERIALIZED_REPLY, false);
} else if(WriteData.isSerializedType(message)) {
- writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
+ writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
} else if(MergeData.isSerializedType(message)) {
- mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
+ mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
} else if(DeleteData.isSerializedType(message)) {
- deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
+ deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
} else if (message instanceof GetCompositedModification) {
// This is here for testing only
}
private void batchedModifications(BatchedModifications batched) {
+ if (checkClosed()) {
+ if (batched.isReady()) {
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ return;
+ }
+
try {
for(Modification modification: batched.getModifications()) {
compositeModification.addModification(modification);
- modification.apply(transaction);
+ modification.apply(transaction.getSnapshot());
}
totalBatchedModificationsReceived++;
totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
}
- readyTransaction(transaction, false, batched.isDoCommitOnReady());
+ readyTransaction(false, batched.isDoCommitOnReady());
} else {
getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
}
}
}
- private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
- boolean returnSerialized) {
+ protected final void dataExists(DataExists message, final boolean returnSerialized) {
+ super.dataExists(transaction, message, returnSerialized);
+ }
+
+ protected final void readData(ReadData message, final boolean returnSerialized) {
+ super.readData(transaction, message, returnSerialized);
+ }
+
+ private boolean checkClosed() {
+ if (transaction.isClosed()) {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void writeData(WriteData message, boolean returnSerialized) {
LOG.debug("writeData at path : {}", message.getPath());
+ if (checkClosed()) {
+ return;
+ }
compositeModification.addModification(
new WriteModification(message.getPath(), message.getData()));
try {
- transaction.write(message.getPath(), message.getData());
+ transaction.getSnapshot().write(message.getPath(), message.getData());
WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
writeDataReply, getSelf());
}
}
- private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
- boolean returnSerialized) {
+ private void mergeData(MergeData message, boolean returnSerialized) {
LOG.debug("mergeData at path : {}", message.getPath());
+ if (checkClosed()) {
+ return;
+ }
compositeModification.addModification(
new MergeModification(message.getPath(), message.getData()));
try {
- transaction.merge(message.getPath(), message.getData());
+ transaction.getSnapshot().merge(message.getPath(), message.getData());
MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
mergeDataReply, getSelf());
}
}
- private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
- boolean returnSerialized) {
+ private void deleteData(DeleteData message, boolean returnSerialized) {
LOG.debug("deleteData at path : {}", message.getPath());
+ if (checkClosed()) {
+ return;
+ }
compositeModification.addModification(new DeleteModification(message.getPath()));
try {
- transaction.delete(message.getPath());
+ transaction.getSnapshot().delete(message.getPath());
DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
deleteDataReply, getSelf());
- }catch(Exception e){
+ } catch(Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized,
- boolean doImmediateCommit) {
+ private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
String transactionID = getTransactionID();
LOG.debug("readyTransaction : {}", transactionID);
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* DeleteModification store all the parameters required to delete a path from the data tree
transaction.delete(getPath());
}
+ @Override
+ public void apply(DataTreeModification transaction) {
+ transaction.delete(getPath());
+ }
+
@Override
public byte getType() {
return DELETE;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* MergeModification stores all the parameters required to merge data into the specified path
transaction.merge(getPath(), getData());
}
+ @Override
+ public void apply(final DataTreeModification transaction) {
+ transaction.merge(getPath(), getData());
+ }
+
@Override
public byte getType() {
return MERGE;
import java.io.Externalizable;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* Represents a modification to the data store.
*/
void apply(DOMStoreWriteTransaction transaction);
+ /**
+ * Apply the modification to the specified transaction
+ *
+ * @param transaction
+ */
+ void apply(DataTreeModification transaction);
+
byte getType();
@Deprecated
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* MutableCompositeModification is just a mutable version of a
}
}
+ @Override
+ public void apply(DataTreeModification transaction) {
+ for (Modification modification : modifications) {
+ modification.apply(transaction);
+ }
+ }
+
@Override
public byte getType() {
return COMPOSITE;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* WriteModification stores all the parameters required to write data to the specified path
transaction.write(getPath(), data);
}
+ @Override
+ public void apply(final DataTreeModification transaction) {
+ transaction.write(getPath(), data);
+ }
+
public NormalizedNode<?, ?> getData() {
return data;
}
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
}
protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
- final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification) {
return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
}
protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
- final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification,
final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
- DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
- tx.write(path, data);
- DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit);
+ ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
+ tx.getSnapshot().write(path, data);
+ DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
modification.addModification(new WriteModification(path, data));
public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
- return readStore(shard.underlyingActor().getDataStore(), id);
+ return readStore(shard.underlyingActor().getDataStore().getDataTree(), id);
}
- public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
- throws ExecutionException, InterruptedException {
- DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
- transaction.read(id);
+ public static NormalizedNode<?,?> readStore(final DataTree store, final YangInstanceIdentifier id) {
+ DataTreeSnapshot transaction = store.takeSnapshot();
- Optional<NormalizedNode<?, ?>> optional = future.get();
+ Optional<NormalizedNode<?, ?>> optional = transaction.readNode(id);
NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
- transaction.close();
-
return node;
}
public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+ final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
writeToStore(shard.underlyingActor().getDataStore(), id, node);
}
- public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+ public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
+ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
- transaction.write(id, node);
+ transaction.getSnapshot().write(id, node);
+ DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+ cohort.canCommit().get();
+ cohort.preCommit().get();
+ cohort.commit();
+ }
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
+ public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
+ final NormalizedNode<?,?> node) throws DataValidationFailedException {
+ DataTreeModification transaction = store.takeSnapshot().newModification();
+
+ transaction.write(id, node);
+ transaction.ready();
+ store.validate(transaction);
+ final DataTreeCandidate candidate = store.prepare(transaction);
+ store.commit(candidate);
}
@SuppressWarnings("serial")
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+ DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ store.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Set up the InMemorySnapshotStore.
- InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
- testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+ DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+ testStore.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Setup 3 simulated transactions with mock cohorts backed by real cohorts.
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
waitUntilLeader(shard);
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
String transactionID = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
waitUntilLeader(shard);
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Setup a simulated transactions with a mock cohort.
waitUntilLeader(shard);
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final String transactionID = "tx1";
Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
import akka.actor.Props;
import akka.pattern.AskTimeoutException;
import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* @author Basheeruddin Ahmed <syedbahm@cisco.com>
*/
public class ShardTransactionFailureTest extends AbstractActorTest {
- private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-
private static final SchemaContext testSchemaContext =
- TestModel.createTestContext();
+ TestModel.createTestContext();
+ private static final TransactionType RO = TransactionType.READ_ONLY;
+ private static final TransactionType RW = TransactionType.READ_WRITE;
+ private static final TransactionType WO = TransactionType.WRITE_ONLY;
+
+ private static final ShardDataTree store = new ShardDataTree(testSchemaContext);
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.builder().memberName("member-1")
private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
- @BeforeClass
- public static void staticSetup() {
- store.onGlobalContextUpdated(testSchemaContext);
- }
-
private ActorRef createShard(){
return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
TestModel.createTestContext()));
throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
akka.pattern.Patterns.ask(subject, readData, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().close();
+ subject.underlyingActor().getDOMStoreTransaction().abort();
future = akka.pattern.Patterns.ask(subject, readData, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
akka.pattern.Patterns.ask(subject, readData, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().close();
+ subject.underlyingActor().getDOMStoreTransaction().abort();
future = akka.pattern.Patterns.ask(subject, readData, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
throws Throwable {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
akka.pattern.Patterns.ask(subject, dataExists, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().close();
+ subject.underlyingActor().getDOMStoreTransaction().abort();
future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ final Props props = ShardTransaction.props(WO, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class ShardTransactionTest extends AbstractActorTest {
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+ private static final TransactionType RO = TransactionType.READ_ONLY;
+ private static final TransactionType RW = TransactionType.READ_WRITE;
+ private static final TransactionType WO = TransactionType.WRITE_ONLY;
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.builder().memberName("member-1")
private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
- private final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ private final ShardDataTree store = new ShardDataTree(testSchemaContext);
- @Before
- public void setup() {
- store.onGlobalContextUpdated(testSchemaContext);
- }
+ private int txCounter = 0;
- private ActorRef createShard(){
+ private ActorRef createShard() {
return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
}
- private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
- return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
+ private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
+ return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
}
- private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
- return newTransactionActor(transaction, null, name, version);
+ private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
+ return newTransactionActor(type, transaction, null, name, version);
}
- private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
- return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
+ private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
+ return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
}
- private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
+ private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
short version) {
- Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
+ Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
datastoreContext, shardStats, "txn", version);
return getSystem().actorOf(props, name);
}
+ private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
+ return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
+ }
+
+ private ReadWriteShardDataTreeTransaction readWriteTransaction() {
+ return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
+ }
+
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
+ testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
- testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
+ testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
}
private void testOnReceiveReadData(final ActorRef transaction) {
final ActorRef shard = createShard();
testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
- store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
+ RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
- store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
+ RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
}
private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@Test
public void testOnReceiveReadDataHeliumR1() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
"testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
+ testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
"testDataExistsPositiveRO"));
- testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
+ testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
"testDataExistsPositiveRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
- testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
+ testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
"testDataExistsNegativeRO"));
- testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
+ testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
"testDataExistsNegativeRW"));
}
}
@Test
- public void testOnReceiveWriteData() throws Exception {
+ public void testOnReceiveWriteData() {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
}
@Test
- public void testOnReceiveHeliumR1WriteData() throws Exception {
+ public void testOnReceiveHeliumR1WriteData() {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
}
@Test
- public void testOnReceiveMergeData() throws Exception {
+ public void testOnReceiveMergeData() {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
"testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
@Test
public void testOnReceiveHeliumR1MergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
public void testOnReceiveBatchedModifications() throws Exception {
new JavaTestKit(getSystem()) {{
- DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
- final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+ ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+ DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+ final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
YangInstanceIdentifier writePath = TestModel.TEST_PATH;
NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
assertEquals("getPath", deletePath, delete.getPath());
- InOrder inOrder = Mockito.inOrder(mockWriteTx);
- inOrder.verify(mockWriteTx).write(writePath, writeData);
- inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
- inOrder.verify(mockWriteTx).delete(deletePath);
+ InOrder inOrder = Mockito.inOrder(mockModification);
+ inOrder.verify(mockModification).write(writePath, writeData);
+ inOrder.verify(mockModification).merge(mergePath, mergeData);
+ inOrder.verify(mockModification).delete(deletePath);
}};
}
public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
JavaTestKit watcher = new JavaTestKit(getSystem());
public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveBatchedModificationsReadyWithImmediateCommit");
JavaTestKit watcher = new JavaTestKit(getSystem());
public void testOnReceiveBatchedModificationsFailure() throws Throwable {
new JavaTestKit(getSystem()) {{
- DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
- final ActorRef transaction = newTransactionActor(mockWriteTx,
+ ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+ DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+ ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+ final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
"testOnReceiveBatchedModificationsFailure");
JavaTestKit watcher = new JavaTestKit(getSystem());
YangInstanceIdentifier path = TestModel.TEST_PATH;
ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doThrow(new TestException()).when(mockWriteTx).write(path, node);
+ doThrow(new TestException()).when(mockModification).write(path, node);
BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
batched.addModification(new WriteModification(path, node));
public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
JavaTestKit watcher = new JavaTestKit(getSystem());
@Test
public void testOnReceivePreLithiumReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
"testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
JavaTestKit watcher = new JavaTestKit(getSystem());
// test
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
"testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
JavaTestKit watcher = new JavaTestKit(getSystem());
@Test
public void testOnReceiveCreateSnapshot() throws Exception {
new JavaTestKit(getSystem()) {{
- ShardTest.writeToStore(store, TestModel.TEST_PATH,
+ ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
+ NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
YangInstanceIdentifier.builder().build());
- final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
"testOnReceiveCreateSnapshot");
watch(transaction);
@Test
public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
"testReadWriteTxOnReceiveCloseTransaction");
watch(transaction);
@Test
public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
"testWriteTxOnReceiveCloseTransaction");
watch(transaction);
@Test
public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+ final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
"testReadOnlyTxOnReceiveCloseTransaction");
watch(transaction);
@Test(expected=UnknownMessageException.class)
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
- final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
500, TimeUnit.MILLISECONDS).build();
new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+ final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
"testShardTransactionInactivity");
watch(transaction);
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+ DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ store.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@Test
public void testHelium2VersionRecovery() throws Exception {
- // Set up the InMemorySnapshotStore.
-
- InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
- testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+ DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+ testStore.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Setup 3 simulated transactions with mock cohorts backed by real cohorts.
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ ShardDataTree dataStore = shard.underlyingActor().getDataStore();
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();