From: Robert Varga Date: Fri, 24 Apr 2015 11:33:24 +0000 (+0200) Subject: CDS: Implement front-end support for local transactions X-Git-Tag: release/beryllium~587 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=daaef05cbf70e6cbec9af181258faead6d9620a6 CDS: Implement front-end support for local transactions Implemented support on the TransactionProxy front-end for optimizations where the Shard is local to the controller instance. In this mode, the Shard's DataTree obtained from the FindPrimaryShard message is used to prepare the modifications completely on the front-end. On ready, the DataTreeModification instance is passed to the Shard via the ReadyLocalTransaction message. The Shard then does a direct commit, eliding the 3PC from the front-end (it's a no-op as it is for remote write-only txns). TransactionContext instances are now obtained via an AbstractTransactionContextFactory passed to TransactionProxy of which there are 2 kinds: one for single, unchained txns and one for chained tnxs. Each creates a different DOM transaction instance to handle preperation of modifications. The DOM transacton is wrapped in a LocalTransactionContext which the TransactionProxy interfaces with. Change-Id: I0322b586f394e4b6c7793b8287ac804b41964378 Signed-off-by: Robert Varga Signed-off-by: Tom Pantelis (cherry picked from commit 2f7c93174d7834a4c4aedacc9b88aa53a5a0422c) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index d482e28401..0e2449e4b8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -36,13 +36,23 @@ import scala.concurrent.Future; */ public class InMemoryJournal extends AsyncWriteJournal { + private static class WriteMessagesComplete { + final CountDownLatch latch; + final Class ofType; + + public WriteMessagesComplete(int count, Class ofType) { + this.latch = new CountDownLatch(count); + this.ofType = ofType; + } + } + static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class); private static final Map> journals = new ConcurrentHashMap<>(); private static final Map deleteMessagesCompleteLatches = new ConcurrentHashMap<>(); - private static final Map writeMessagesCompleteLatches = new ConcurrentHashMap<>(); + private static final Map writeMessagesComplete = new ConcurrentHashMap<>(); private static final Map blockReadMessagesLatches = new ConcurrentHashMap<>(); @@ -107,7 +117,7 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void waitForWriteMessagesComplete(String persistenceId) { - if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) { throw new AssertionError("Journal write messages did not complete"); } } @@ -117,7 +127,11 @@ public class InMemoryJournal extends AsyncWriteJournal { } public static void addWriteMessagesCompleteLatch(String persistenceId, int count) { - writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count)); + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null)); + } + + public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class ofType) { + writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType)); } public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) { @@ -193,9 +207,11 @@ public class InMemoryJournal extends AsyncWriteJournal { journal.put(repr.sequenceNr(), repr.payload()); } - CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId()); - if(latch != null) { - latch.countDown(); + WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId()); + if(complete != null) { + if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) { + complete.latch.countDown(); + } } } diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index 8d65e59dfe..a4d554c2bf 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -17,10 +17,12 @@ odl-cluster-data { serializers { java = "akka.serialization.JavaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" } serialization-bindings { "com.google.protobuf.Message" = proto + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } default-dispatcher { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java new file mode 100644 index 0000000000..78e059c798 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.util.Try; + +/** + * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local + * transaction factories. + */ +abstract class AbstractTransactionContextFactory + implements ShardInfoListener, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class); + + protected static final AtomicLong TX_COUNTER = new AtomicLong(); + + private final ConcurrentMap knownLocal = new ConcurrentHashMap<>(); + private final ActorContext actorContext; + + protected AbstractTransactionContextFactory(final ActorContext actorContext) { + this.actorContext = Preconditions.checkNotNull(actorContext); + } + + final ActorContext getActorContext() { + return actorContext; + } + + private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) { + final LocalTransactionFactory local = knownLocal.get(shardName); + if (local != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} - Creating local component for shard {} using factory {}", + parent.getIdentifier(), shardName, local); + } + return createLocalTransactionContext(local, parent); + } + + return null; + } + + private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent, + String shardName, TransactionContextWrapper transactionContextAdapter) { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(), + primaryShardInfo.getPrimaryShardActor(), shardName); + } + + updateShardInfo(shardName, primaryShardInfo); + + TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName); + if(localContext != null) { + transactionContextAdapter.executePriorTransactionOperations(localContext); + } else { + RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter, + parent, shardName); + remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); + } + } + + private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent, + String shardName, TransactionContextWrapper transactionContextAdapter) { + LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure); + + transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure, + parent.getIdentifier(), parent.getLimiter())); + } + + final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) { + final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier()); + + Future findPrimaryFuture = findPrimaryShard(shardName); + if(findPrimaryFuture.isCompleted()) { + Try maybe = findPrimaryFuture.value().get(); + if(maybe.isSuccess()) { + onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter); + } else { + onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter); + } + } else { + findPrimaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { + if (failure == null) { + onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter); + } else { + onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter); + } + } + }, actorContext.getClientDispatcher()); + } + + return transactionContextAdapter; + } + + private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) { + final Optional maybeDataTree = primaryShardInfo.getLocalShardDataTree(); + if (maybeDataTree.isPresent()) { + knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get())); + LOG.debug("Shard {} resolved to local data tree", shardName); + } + } + + @Override + public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) { + final F existing = knownLocal.get(shardName); + if (existing != null) { + if (primaryShardInfo != null) { + final Optional maybeDataTree = primaryShardInfo.getLocalShardDataTree(); + if (maybeDataTree.isPresent()) { + final DataTree newDataTree = maybeDataTree.get(); + final DataTree oldDataTree = dataTreeForFactory(existing); + if (!oldDataTree.equals(newDataTree)) { + final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree); + knownLocal.replace(shardName, existing, newChain); + LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree); + } + + return; + } + } + if (knownLocal.remove(shardName, existing)) { + LOG.debug("Shard {} invalidated data tree {}", shardName, existing); + } else { + LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing); + } + } + } + + protected String getMemberName() { + String memberName = getActorContext().getCurrentMemberName(); + if (memberName == null) { + memberName = "UNKNOWN-MEMBER"; + } + + return memberName; + } + + /** + * Create an identifier for the next TransactionProxy attached to this component + * factory. + * @return Transaction identifier, may not be null. + */ + protected abstract TransactionIdentifier nextIdentifier(); + + /** + * Find the primary shard actor. + * + * @param shardName Shard name + * @return Future containing shard information. + */ + protected abstract Future findPrimaryShard(String shardName); + + /** + * Create local transaction factory for specified shard, backed by specified shard leader + * and data tree instance. + * + * @param shardName + * @param shardLeader + * @param dataTree Backing data tree instance. The data tree may only be accessed in + * read-only manner. + * @return Transaction factory for local use. + */ + protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree); + + /** + * Extract the backing data tree from a particular factory. + * + * @param factory Transaction factory + * @return Backing data tree + */ + protected abstract DataTree dataTreeForFactory(F factory); + + /** + * Callback invoked from child transactions to push any futures, which need to + * be waited for before the next transaction is allocated. + * @param cohortFutures Collection of futures + */ + protected abstract void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection> cohortFutures); + + private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) { + return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier())); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java deleted file mode 100644 index 9a800c1659..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import akka.dispatch.OnComplete; -import java.util.List; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.Promise; - -final class ChainedTransactionProxy extends TransactionProxy { - private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class); - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public AbstractThreePhaseCommitCohort ready() { - final AbstractThreePhaseCommitCohort ret = super.ready(); - readyFutures = (List)ret.getCohortFutures(); - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - return ret; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we initiate the next Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendFindPrimaryShardAsync(shardName); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", - previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getClientDispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - returnPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the FindPrimaryShard message and use the resulting Future to complete the - // returned Promise. - returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - - return returnPromise.future(); - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 8051f7d49b..18266658d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -63,6 +63,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, private final String type; + private final TransactionContextFactory txContextFactory; + public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -85,6 +87,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + this.txContextFactory = TransactionContextFactory.create(actorContext); datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType()); datastoreConfigMXBean.setContext(datastoreContext); @@ -96,10 +99,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, public DistributedDataStore(ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.txContextFactory = TransactionContextFactory.create(actorContext); this.type = UNKNOWN_TYPE; this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; - } public void setCloseable(AutoCloseable closeable) { @@ -144,24 +147,24 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, @Override public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(actorContext); + return txContextFactory.createTransactionChain(); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, TransactionType.READ_ONLY); + return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { actorContext.acquireTxCreationPermit(); - return new TransactionProxy(actorContext, TransactionType.WRITE_ONLY); + return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { actorContext.acquireTxCreationPermit(); - return new TransactionProxy(actorContext, TransactionType.READ_WRITE); + return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE); } @Override @@ -182,7 +185,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, datastoreConfigMXBean.unregisterMBean(); datastoreInfoMXBean.unregisterMBean(); - if(closeable != null) { + if (closeable != null) { try { closeable.close(); } catch (Exception e) { @@ -190,6 +193,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, } } + txContextFactory.close(); actorContext.shutdown(); DistributedDataStoreFactory.destroyInstance(this); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java new file mode 100644 index 0000000000..0ea1029a9d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM + * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to + * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message. + * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods + * are no-ops. + */ +abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort { + private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class); + + private final SnapshotBackedWriteTransaction transaction; + private final DataTreeModification modification; + private final ActorContext actorContext; + private final ActorSelection leader; + + protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader, + final SnapshotBackedWriteTransaction transaction, final DataTreeModification modification) { + this.actorContext = Preconditions.checkNotNull(actorContext); + this.leader = Preconditions.checkNotNull(leader); + this.transaction = Preconditions.checkNotNull(transaction); + this.modification = Preconditions.checkNotNull(modification); + } + + private Future initiateCommit(final boolean immediate) { + final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(), + modification, immediate); + return actorContext.executeOperationAsync(leader, message); + } + + Future initiateCoordinatedCommit() { + final Future messageFuture = initiateCommit(false); + final Future ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext, + transaction.getIdentifier()); + ret.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorSelection success) throws Throwable { + if (failure != null) { + LOG.info("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure); + transactionAborted(transaction); + return; + } + + LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success); + } + }, actorContext.getClientDispatcher()); + + return ret; + } + + Future initiateDirectCommit() { + final Future messageFuture = initiateCommit(true); + messageFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object message) throws Throwable { + if (failure != null) { + LOG.error("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure); + transactionAborted(transaction); + } else if (CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) { + LOG.debug("Transaction {} committed successfully", transaction.getIdentifier()); + transactionCommitted(transaction); + } else { + LOG.error("Transaction {} resulted in unhandled message type {}, aborting", message.getClass()); + transactionAborted(transaction); + } + } + }, actorContext.getClientDispatcher()); + + return messageFuture; + } + + @Override + public final ListenableFuture canCommit() { + // Intended no-op + throw new UnsupportedOperationException(); + } + + @Override + public final ListenableFuture preCommit() { + // Intended no-op + throw new UnsupportedOperationException(); + } + + @Override + public final ListenableFuture abort() { + // Intended no-op + throw new UnsupportedOperationException(); + } + + @Override + public final ListenableFuture commit() { + // Intended no-op + throw new UnsupportedOperationException(); + } + + protected abstract void transactionAborted(SnapshotBackedWriteTransaction transaction); + protected abstract void transactionCommitted(SnapshotBackedWriteTransaction transaction); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java new file mode 100644 index 0000000000..93e09db32a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; + +/** + * Transaction chain instantiated on top of a locally-available DataTree. It does not instantiate + * a transaction in the leader and rather chains transactions on top of themselves. + */ +final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain + implements LocalTransactionFactory { + private static final Throwable ABORTED = new Throwable("Transaction aborted"); + private final TransactionChainProxy parent; + private final ActorSelection leader; + private final DataTree tree; + + LocalTransactionChain(final TransactionChainProxy parent, final ActorSelection leader, final DataTree tree) { + this.parent = Preconditions.checkNotNull(parent); + this.leader = Preconditions.checkNotNull(leader); + this.tree = Preconditions.checkNotNull(tree); + } + + DataTree getDataTree() { + return tree; + } + + @Override + protected TransactionIdentifier nextTransactionIdentifier() { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean getDebugTransactions() { + return false; + } + + @Override + protected DataTreeSnapshot takeSnapshot() { + return tree.takeSnapshot(); + } + + @Override + protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction transaction, final DataTreeModification modification) { + return new LocalThreePhaseCommitCohort(parent.getActorContext(), leader, transaction, modification) { + @Override + protected void transactionAborted(final SnapshotBackedWriteTransaction transaction) { + onTransactionFailed(transaction, ABORTED); + } + + @Override + protected void transactionCommitted(final SnapshotBackedWriteTransaction transaction) { + onTransactionCommited(transaction); + } + }; + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) { + return super.newReadWriteTransaction(identifier); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java new file mode 100644 index 0000000000..01a778f8e4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Future; + +/** + * Processes front-end transaction operations locally before being committed to the destination shard. + * Instances of this class are used when the destination shard is local to the caller. + * + * @author Thomas Pantelis + */ +final class LocalTransactionContext extends AbstractTransactionContext { + private final DOMStoreReadWriteTransaction delegate; + + LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate) { + super(identifier); + this.delegate = delegate; + } + + @Override + public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + delegate.write(path, data); + } + + @Override + public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { + delegate.merge(path, data); + } + + @Override + public void deleteData(YangInstanceIdentifier path) { + delegate.delete(path); + } + + @Override + public void readData(YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { + Futures.addCallback(delegate.read(path), new FutureCallback>>() { + @Override + public void onSuccess(Optional> result) { + proxyFuture.set(result); + } + + @Override + public void onFailure(Throwable t) { + proxyFuture.setException(t); + } + }); + } + + @Override + public void dataExists(YangInstanceIdentifier path, final SettableFuture proxyFuture) { + Futures.addCallback(delegate.exists(path), new FutureCallback() { + @Override + public void onSuccess(Boolean result) { + proxyFuture.set(result); + } + + @Override + public void onFailure(Throwable t) { + proxyFuture.setException(t); + } + }); + } + + private LocalThreePhaseCommitCohort ready() { + return (LocalThreePhaseCommitCohort) delegate.ready(); + } + + @Override + public Future readyTransaction() { + return ready().initiateCoordinatedCommit(); + } + + @Override + public Future directCommit() { + return ready().initiateDirectCommit(); + } + + @Override + public boolean supportsDirectCommit() { + return true; + } + + @Override + public void closeTransaction() { + delegate.close(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java new file mode 100644 index 0000000000..8ca442498e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; + +/** + * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate + * transactions on shards which are co-located with the shard leader. + * + * @author Thomas Pantelis + */ +interface LocalTransactionFactory { + DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier); +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java new file mode 100644 index 0000000000..dce9b5c55b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link LocalTransactionFactory} for instantiating backing transactions which are + * disconnected from each other, ie not chained. These are used by {@link AbstractTransactionContextFactory} + * to instantiate transactions on shards which are co-located with the shard leader. + */ +final class LocalTransactionFactoryImpl extends TransactionReadyPrototype + implements LocalTransactionFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LocalTransactionFactoryImpl.class); + private final ActorSelection leader; + private final DataTree dataTree; + private final ActorContext actorContext; + + LocalTransactionFactoryImpl(final ActorContext actorContext, final ActorSelection leader, final DataTree dataTree) { + this.leader = Preconditions.checkNotNull(leader); + this.dataTree = Preconditions.checkNotNull(dataTree); + this.actorContext = actorContext; + } + + DataTree getDataTree() { + return dataTree; + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) { + return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this); + } + + @Override + protected void transactionAborted(final SnapshotBackedWriteTransaction tx) { + // No-op + } + + @Override + protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, + final DataTreeModification tree) { + return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree) { + @Override + protected void transactionAborted(final SnapshotBackedWriteTransaction transaction) { + // No-op + LOG.debug("Transaction {} aborted", transaction); + } + + @Override + protected void transactionCommitted(final SnapshotBackedWriteTransaction transaction) { + // No-op + LOG.debug("Transaction {} committed", transaction); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java similarity index 87% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index b6fe2c29bd..a25ddc8733 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; @@ -20,7 +19,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -34,8 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -public class TransactionContextImpl extends AbstractTransactionContext { - private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); +/** + * Redirects front-end transaction operations to a shard for processing. Instances of this class are used + * when the destination shard is remote to the caller. + * + * @author Thomas Pantelis + */ +public class RemoteTransactionContext extends AbstractTransactionContext { + private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); private final ActorContext actorContext; private final ActorSelection actor; @@ -46,7 +50,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; - protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); @@ -82,6 +86,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { @Override public void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); + TransactionContextCleanup.untrack(this); actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable()); } @@ -115,27 +120,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. - return readyReplyFuture.transform(new Mapper() { - @Override - public ActorSelection checkedApply(Object serializedReadyReply) { - LOG.debug("Tx {} readyTransaction", getIdentifier()); - - // At this point the ready operation succeeded and we need to extract the cohort - // actor path from the reply. - if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { - ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); - } - - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); - } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); - } - - protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { - return readyTxReply.getCohortPath(); + return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier()); } private BatchedModifications newBatchedModifications() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java similarity index 55% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 0fddd66c54..66562975cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -1,4 +1,5 @@ /* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the @@ -10,13 +11,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -28,46 +25,40 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** - * Implements a Future OnComplete callback for a CreateTransaction message. This class handles - * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a - * retry task after a short delay. + * Handles creation of TransactionContext instances for remote transactions. This class creates + * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit, + * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay. *

* The end result from a completed CreateTransaction message is a TransactionContext that is * used to perform transaction operations. Transaction operations that occur before the - * CreateTransaction completes are cache and executed once the CreateTransaction completes, - * successfully or not. + * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the + * CreateTransaction completes, successfully or not. */ -final class TransactionFutureCallback extends OnComplete { - private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class); +final class RemoteTransactionContextSupport { + private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class); /** * Time interval in between transaction create retries. */ private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS); - /** - * The list of transaction operations to execute once the CreateTransaction completes. - */ - @GuardedBy("txOperationsOnComplete") - private final List txOperationsOnComplete = Lists.newArrayList(); - private final TransactionProxy proxy; + private final TransactionProxy parent; private final String shardName; - /** - * The TransactionContext resulting from the CreateTransaction reply. - */ - private volatile TransactionContext transactionContext; - /** * The target primary shard. */ private volatile ActorSelection primaryShard; private volatile int createTxTries; - TransactionFutureCallback(final TransactionProxy proxy, final String shardName) { - this.proxy = Preconditions.checkNotNull(proxy); + private final TransactionContextWrapper transactionContextAdapter; + + RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextAdapter, final TransactionProxy parent, + final String shardName) { + this.parent = Preconditions.checkNotNull(parent); this.shardName = shardName; - createTxTries = (int) (proxy.getActorContext().getDatastoreContext(). + this.transactionContextAdapter = transactionContextAdapter; + createTxTries = (int) (parent.getActorContext().getDatastoreContext(). getShardLeaderElectionTimeout().duration().toMillis() / CREATE_TX_TRY_INTERVAL.toMillis()); } @@ -76,24 +67,20 @@ final class TransactionFutureCallback extends OnComplete { return shardName; } - TransactionContext getTransactionContext() { - return transactionContext; - } - private TransactionType getTransactionType() { - return proxy.getTransactionType(); - } - - private TransactionIdentifier getIdentifier() { - return proxy.getIdentifier(); + return parent.getType(); } private ActorContext getActorContext() { - return proxy.getActorContext(); + return parent.getActorContext(); } private Semaphore getOperationLimiter() { - return proxy.getOperationLimiter(); + return parent.getLimiter(); + } + + private TransactionIdentifier getIdentifier() { + return parent.getIdentifier(); } /** @@ -110,43 +97,13 @@ final class TransactionFutureCallback extends OnComplete { // For write-only Tx's we prepare the transaction modifications directly on the shard actor // to avoid the overhead of creating a separate transaction actor. // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. - executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard, + transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard, this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); } else { tryCreateTransaction(); } } - /** - * Adds a TransactionOperation to be executed after the CreateTransaction completes. - */ - private void addTxOperationOnComplete(TransactionOperation operation) { - boolean invokeOperation = true; - synchronized(txOperationsOnComplete) { - if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete", getIdentifier()); - - invokeOperation = false; - txOperationsOnComplete.add(operation); - } - } - - if(invokeOperation) { - operation.invoke(transactionContext); - } - } - - void enqueueTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - addTxOperationOnComplete(op); - } - } - /** * Performs a CreateTransaction try async. */ @@ -156,15 +113,19 @@ final class TransactionFutureCallback extends OnComplete { } Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable(); + getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable(); Future createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage); - createTxFuture.onComplete(this, getActorContext().getClientDispatcher()); + createTxFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + onCreateTransactionComplete(failure, response); + } + }, getActorContext().getClientDispatcher()); } - @Override - public void onComplete(Throwable failure, Object response) { + private void onCreateTransactionComplete(Throwable failure, Object response) { if(failure instanceof NoShardLeaderException) { // There's no leader for the shard yet - schedule and try again, unless we're out // of retries. Note: createTxTries is volatile as it may be written by different @@ -188,12 +149,7 @@ final class TransactionFutureCallback extends OnComplete { createTransactionContext(failure, response); } - void createTransactionContext(Throwable failure, Object response) { - // Mainly checking for state violation here to perform a volatile read of "initialized" to - // ensure updates to operationLimter et al are visible to this thread (ie we're doing - // "piggy-back" synchronization here). - proxy.ensureInitializied(); - + private void createTransactionContext(Throwable failure, Object response) { // Create the TransactionContext from the response or failure. Store the new // TransactionContext locally until we've completed invoking the // TransactionOperations. This avoids thread timing issues which could cause @@ -217,47 +173,33 @@ final class TransactionFutureCallback extends OnComplete { localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter()); } - executeTxOperatonsOnComplete(localTransactionContext); - } - - private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { - while(true) { - // Access to txOperationsOnComplete and transactionContext must be protected and atomic - // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing - // issues and ensure no TransactionOperation is missed and that they are processed - // in the order they occurred. - - // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy - // in case a TransactionOperation results in another transaction operation being - // queued (eg a put operation from a client read Future callback that is notified - // synchronously). - Collection operationsBatch = null; - synchronized(txOperationsOnComplete) { - if(txOperationsOnComplete.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; - break; - } - - operationsBatch = new ArrayList<>(txOperationsOnComplete); - txOperationsOnComplete.clear(); - } - - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. - for(TransactionOperation oper: operationsBatch) { - oper.invoke(localTransactionContext); - } - } + transactionContextAdapter.executePriorTransactionOperations(localTransactionContext); } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { LOG.debug("Tx {} Received {}", getIdentifier(), reply); - return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), + return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()), reply.getTransactionPath(), reply.getVersion()); } -} \ No newline at end of file + private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath, + short remoteTransactionVersion) { + // TxActor is always created where the leader of the shard is. + // Check if TxActor is created in the same node + boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath); + final TransactionContext ret; + + if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { + ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), + getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); + } else { + ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(), + isTxActorLocal, remoteTransactionVersion, parent.getCompleter()); + } + + TransactionContextCleanup.track(this, ret); + return ret; + } +} + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 57749a1a73..52e7a78e5b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -63,7 +63,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort< } return null; } - }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); + }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 97f3e7444d..b44f0b15b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -1,85 +1,128 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.Collections; -import java.util.List; +import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; +import scala.concurrent.Promise; /** - * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard + * A chain of {@link TransactionProxy}s. It allows a single open transaction to be open + * at a time. For remote transactions, it also tracks the outstanding readiness requests + * towards the shard and unblocks operations only after all have completed. */ -public class TransactionChainProxy implements DOMStoreTransactionChain { - - private interface State { - boolean isReady(); +final class TransactionChainProxy extends AbstractTransactionContextFactory implements DOMStoreTransactionChain { + private static abstract class State { + /** + * Check if it is okay to allocate a new transaction. + * @throws IllegalStateException if a transaction may not be allocated. + */ + abstract void checkReady(); - List> getPreviousReadyFutures(); + /** + * Return the future which needs to be waited for before shard information + * is returned (which unblocks remote transactions). + * @return Future to wait for, or null of no wait is necessary + */ + abstract Future previousFuture(); } - private static class Allocated implements State { - private final ChainedTransactionProxy transaction; + private static abstract class Pending extends State { + private final TransactionIdentifier transaction; + private final Future previousFuture; - Allocated(ChainedTransactionProxy transaction) { - this.transaction = transaction; + Pending(final TransactionIdentifier transaction, final Future previousFuture) { + this.previousFuture = previousFuture; + this.transaction = Preconditions.checkNotNull(transaction); } @Override - public boolean isReady() { - return transaction.isReady(); + final Future previousFuture() { + return previousFuture; + } + + final TransactionIdentifier getIdentifier() { + return transaction; + } + } + + private static final class Allocated extends Pending { + Allocated(final TransactionIdentifier transaction, final Future previousFuture) { + super(transaction, previousFuture); } @Override - public List> getPreviousReadyFutures() { - return transaction.getReadyFutures(); + void checkReady() { + throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier())); } } - private static abstract class AbstractDefaultState implements State { + private static final class Submitted extends Pending { + Submitted(final TransactionIdentifier transaction, final Future previousFuture) { + super(transaction, previousFuture); + } + @Override - public List> getPreviousReadyFutures() { - return Collections.emptyList(); + void checkReady() { + // Okay to allocate } } - private static final State IDLE_STATE = new AbstractDefaultState() { + private static abstract class DefaultState extends State { @Override - public boolean isReady() { - return true; + final Future previousFuture() { + return null; + } + } + + private static final State IDLE_STATE = new DefaultState() { + @Override + void checkReady() { + // Okay to allocate } }; - private static final State CLOSED_STATE = new AbstractDefaultState() { + private static final State CLOSED_STATE = new DefaultState() { @Override - public boolean isReady() { + void checkReady() { throw new TransactionChainClosedException("Transaction chain has been closed"); } }; - private static final AtomicInteger counter = new AtomicInteger(0); + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); + private static final AtomicInteger CHAIN_COUNTER = new AtomicInteger(); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState"); - private final ActorContext actorContext; private final String transactionChainId; + private final TransactionContextFactory parent; private volatile State currentState = IDLE_STATE; - public TransactionChainProxy(ActorContext actorContext) { - this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); + TransactionChainProxy(final TransactionContextFactory parent) { + super(parent.getActorContext()); + transactionChainId = parent.getActorContext().getCurrentMemberName() + "-txn-chain-" + CHAIN_COUNTER.incrementAndGet(); + this.parent = parent; } public String getTransactionChainId() { @@ -88,22 +131,19 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - State localState = currentState; - checkReadyState(localState); - - return new ChainedTransactionProxy(actorContext, TransactionType.READ_ONLY, - transactionChainId, localState.getPreviousReadyFutures()); + currentState.checkReady(); + return new TransactionProxy(this, TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - actorContext.acquireTxCreationPermit(); + getActorContext().acquireTxCreationPermit(); return allocateWriteTransaction(TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - actorContext.acquireTxCreationPermit(); + getActorContext().acquireTxCreationPermit(); return allocateWriteTransaction(TransactionType.WRITE_ONLY); } @@ -112,24 +152,106 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable()); + getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable()); + parent.removeTransactionChain(this); } - private ChainedTransactionProxy allocateWriteTransaction(TransactionType type) { + private TransactionProxy allocateWriteTransaction(final TransactionType type) { State localState = currentState; + localState.checkReady(); - checkReadyState(localState); + final TransactionProxy ret = new TransactionProxy(this, type); + currentState = new Allocated(ret.getIdentifier(), localState.previousFuture()); + return ret; + } - // Pass the ready Futures from the previous Tx. - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, - transactionChainId, localState.getPreviousReadyFutures()); + @Override + protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) { + final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree); + LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader); + return ret; + } - currentState = new Allocated(txProxy); + @Override + protected DataTree dataTreeForFactory(final LocalTransactionChain factory) { + return factory.getDataTree(); + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we initiate the next Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future findPrimaryShard(final String shardName) { + // Read current state atomically + final State localState = currentState; + + // There are no outstanding futures, shortcut + final Future previous = localState.previousFuture(); + if (previous == null) { + return parent.findPrimaryShard(shardName); + } + + LOG.debug("Waiting for ready futures for on chain {}", getTransactionChainId()); + + // Add a callback for completion of the combined Futures. + final Promise returnPromise = akka.dispatch.Futures.promise(); + + final OnComplete onComplete = new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object notUsed) { + if (failure != null) { + // A Ready Future failed so fail the returned Promise. + returnPromise.failure(failure); + } else { + LOG.debug("Previous Tx readied - proceeding to FindPrimaryShard on chain {}", + getTransactionChainId()); + + // Send the FindPrimaryShard message and use the resulting Future to complete the + // returned Promise. + returnPromise.completeWith(parent.findPrimaryShard(shardName)); + } + } + }; - return txProxy; + previous.onComplete(onComplete, getActorContext().getClientDispatcher()); + return returnPromise.future(); } - private void checkReadyState(State state) { - Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); + @Override + protected void onTransactionReady(final TransactionIdentifier transaction, final Collection> cohortFutures) { + final State localState = currentState; + Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction, localState); + final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier(); + Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction, currentTx); + + // Transaction ready and we are not waiting for futures -- go to idle + if (cohortFutures.isEmpty()) { + currentState = IDLE_STATE; + return; + } + + // Combine the ready Futures into 1 + final Future> combined = akka.dispatch.Futures.sequence( + cohortFutures, getActorContext().getClientDispatcher()); + + // Record the we have outstanding futures + final State newState = new Submitted(transaction, combined); + currentState = newState; + + // Attach a completion reset, but only if we do not allocate a transaction + // in-between + combined.onComplete(new OnComplete>() { + @Override + public void onComplete(final Throwable arg0, final Iterable arg1) { + STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE); + } + }, getActorContext().getClientDispatcher()); + } + + @Override + protected TransactionIdentifier nextIdentifier() { + return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), transactionChainId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java new file mode 100644 index 0000000000..8998f5c74c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.FinalizablePhantomReference; +import com.google.common.base.FinalizableReferenceQueue; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A PhantomReference that closes remote transactions for a TransactionContext when it's + * garbage collected. This is used for read-only transactions as they're not explicitly closed + * by clients. So the only way to detect that a transaction is no longer in use and it's safe + * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed + * but TransactionProxy instances should generally be short-lived enough to avoid being moved + * to the old generation space and thus should be cleaned up in a timely manner as the GC + * runs on the young generation (eden, swap1...) space much more frequently. + */ +final class TransactionContextCleanup extends FinalizablePhantomReference { + private static final Logger LOG = LoggerFactory.getLogger(TransactionContextCleanup.class); + /** + * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The + * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some + * trickery to clean up its internal thread when the bundle is unloaded. + */ + private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue(); + + /** + * This stores the TransactionProxyCleanupPhantomReference instances statically, This is + * necessary because PhantomReferences need a hard reference so they're not garbage collected. + * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map + * and thus becomes eligible for garbage collection. + */ + private static final Map CACHE = new ConcurrentHashMap<>(); + + private final TransactionContext cleanup; + + private TransactionContextCleanup(RemoteTransactionContextSupport referent, TransactionContext cleanup) { + super(referent, QUEUE); + this.cleanup = cleanup; + } + + static void track(final RemoteTransactionContextSupport referent, final TransactionContext cleanup) { + final TransactionContextCleanup ret = new TransactionContextCleanup(referent, cleanup); + CACHE.put(cleanup, ret); + } + + @Override + public void finalizeReferent() { + LOG.trace("Cleaning up {} Tx actors {}", cleanup); + + if (CACHE.remove(cleanup) != null) { + cleanup.closeTransaction(); + } + } + + static void untrack(final RemoteTransactionContext cleanup) { + CACHE.remove(cleanup); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java new file mode 100644 index 0000000000..8d7ca990dd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import java.util.ArrayList; +import java.util.Collection; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import scala.concurrent.Future; + +/** + * An {@link AbstractTransactionContextFactory} which produces TransactionContext instances for single + * transactions (ie not chained). + */ +final class TransactionContextFactory extends AbstractTransactionContextFactory { + + @GuardedBy("childChains") + private final Collection childChains = new ArrayList<>(); + + private final ShardInfoListenerRegistration reg; + + private TransactionContextFactory(final ActorContext actorContext) { + super(actorContext); + this.reg = actorContext.registerShardInfoListener(this); + } + + static TransactionContextFactory create(final ActorContext actorContext) { + return new TransactionContextFactory(actorContext); + } + + @Override + public void close() { + reg.close(); + } + + @Override + protected TransactionIdentifier nextIdentifier() { + return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), null); + } + + @Override + protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) { + return new LocalTransactionFactoryImpl(getActorContext(), shardLeader, dataTree); + } + + @Override + protected Future findPrimaryShard(final String shardName) { + return getActorContext().findPrimaryShardAsync(shardName); + } + + @Override + protected void onTransactionReady(final TransactionIdentifier transaction, final Collection> cohortFutures) { + // Transactions are disconnected, this is a no-op + } + + DOMStoreTransactionChain createTransactionChain() { + final TransactionChainProxy ret = new TransactionChainProxy(this); + + synchronized (childChains) { + childChains.add(ret); + } + + return ret; + } + + void removeTransactionChain(final TransactionChainProxy chain) { + synchronized (childChains) { + childChains.remove(chain); + } + } + + @Override + public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) { + synchronized (childChains) { + for (TransactionChainProxy chain : childChains) { + chain.onShardInfoUpdated(shardName, primaryShardInfo); + } + super.onShardInfoUpdated(shardName, primaryShardInfo); + } + } + + @Override + protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) { + return factory.getDataTree(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java new file mode 100644 index 0000000000..137f6529c7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target + * TransactionContext instance are cached until the TransactionContext instance becomes available at which + * time they are executed. + * + * @author Thomas Pantelis + */ +class TransactionContextWrapper { + private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class); + + /** + * The list of transaction operations to execute once the TransactionContext becomes available. + */ + @GuardedBy("queuedTxOperations") + private final List queuedTxOperations = Lists.newArrayList(); + + /** + * The resulting TransactionContext. + */ + private volatile TransactionContext transactionContext; + + private final TransactionIdentifier identifier; + + TransactionContextWrapper(TransactionIdentifier identifier) { + this.identifier = identifier; + } + + TransactionContext getTransactionContext() { + return transactionContext; + } + + TransactionIdentifier getIdentifier() { + return identifier; + } + + /** + * Adds a TransactionOperation to be executed once the TransactionContext becomes available. + */ + private void enqueueTransactionOperation(TransactionOperation operation) { + boolean invokeOperation = true; + synchronized(queuedTxOperations) { + if(transactionContext == null) { + LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); + + invokeOperation = false; + queuedTxOperations.add(operation); + } + } + + if(invokeOperation) { + operation.invoke(transactionContext); + } + } + + void maybeExecuteTransactionOperation(final TransactionOperation op) { + + if (transactionContext != null) { + op.invoke(transactionContext); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + enqueueTransactionOperation(op); + } + } + + void executePriorTransactionOperations(TransactionContext localTransactionContext) { + while(true) { + // Access to queuedTxOperations and transactionContext must be protected and atomic + // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing + // issues and ensure no TransactionOperation is missed and that they are processed + // in the order they occurred. + + // We'll make a local copy of the queuedTxOperations list to handle re-entrancy + // in case a TransactionOperation results in another transaction operation being + // queued (eg a put operation from a client read Future callback that is notified + // synchronously). + Collection operationsBatch = null; + synchronized(queuedTxOperations) { + if(queuedTxOperations.isEmpty()) { + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; + break; + } + + operationsBatch = new ArrayList<>(queuedTxOperations); + queuedTxOperations.clear(); + } + + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. + // A slight down-side is that we need to re-acquire the lock below but this should + // be negligible. + for(TransactionOperation oper: operationsBatch) { + oper.invoke(localTransactionContext); + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5081c9b4f8..82258b46a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -5,33 +5,28 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -42,265 +37,139 @@ import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

+ * A transaction potentially spanning multiple backend shards. */ public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - private static enum TransactionState { OPEN, READY, CLOSED, } - - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final SchemaContext schemaContext; + private final Map txContextAdapters = new HashMap<>(); + private final AbstractTransactionContextFactory txContextFactory; + private final TransactionType type; private TransactionState state = TransactionState.OPEN; + private volatile OperationCompleter operationCompleter; + private volatile Semaphore operationLimiter; - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; + @VisibleForTesting + public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { + super(txContextFactory.nextIdentifier(), false); + this.txContextFactory = txContextFactory; + this.type = Preconditions.checkNotNull(type); - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); + LOG.debug("New {} Tx - {}", type, getIdentifier()); } - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext, transactionChainId)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); - } + @Override + public CheckedFuture exists(final YangInstanceIdentifier path) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - private static TransactionIdentifier createIdentifier(ActorContext actorContext, String transactionChainId) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } + LOG.debug("Tx {} exists {}", getIdentifier(), path); - return TransactionIdentifier.create(memberName, counter.getAndIncrement(), transactionChainId); - } + throttleOperation(); - @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } - } - - return false; - } + }); - private static boolean isRootPath(YangInstanceIdentifier path) { - return !path.getPathArguments().iterator().hasNext(); + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); LOG.debug("Tx {} read {}", getIdentifier(), path); - final SettableFuture>> proxyFuture = SettableFuture.create(); - - if(isRootPath(path)){ - readAllData(path, proxyFuture); + if (YangInstanceIdentifier.EMPTY.equals(path)) { + return readAllData(); } else { throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - - } - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - private void readAllData(final YangInstanceIdentifier path, - final SettableFuture>> proxyFuture) { - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); - List>>> futures = new ArrayList<>(allShardNames.size()); - - for(String shardName : allShardNames){ - final SettableFuture>> subProxyFuture = SettableFuture.create(); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, subProxyFuture); - } - }); - - futures.add(subProxyFuture); + return singleShardRead(shardNameFromIdentifier(path), path); } - - final ListenableFuture>>> future = Futures.allAsList(futures); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), - future.get(), actorContext.getSchemaContext())); - } catch (DataValidationFailedException | InterruptedException | ExecutionException e) { - proxyFuture.setException(e); - } - } - }, actorContext.getActorSystem().dispatcher()); } - @Override - public CheckedFuture exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} exists {}", getIdentifier(), path); - - throttleOperation(); - - final SettableFuture proxyFuture = SettableFuture.create(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + private CheckedFuture>, ReadFailedException> singleShardRead( + final String shardName, final YangInstanceIdentifier path) { + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(shardName); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); + transactionContext.readData(path, proxyFuture); } }); return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); - } - - private void throttleOperation() { - throttleOperation(1); - } + private CheckedFuture>, ReadFailedException> readAllData() { + final Set allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames(); + final Collection>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size()); - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); - - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; + for (String shardName : allShardNames) { + futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY)); } - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + final ListenableFuture>>> listFuture = Futures.allAsList(futures); + final ListenableFuture>> aggregateFuture; + + aggregateFuture = Futures.transform(listFuture, new Function>>, Optional>>() { + @Override + public Optional> apply(final List>> input) { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, txContextFactory.getActorContext().getSchemaContext()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); + } } - } - } + }); - final void ensureInitializied() { - Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier()); + return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER); } @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - + public void delete(final YangInstanceIdentifier path) { checkModificationState(); - LOG.debug("Tx {} write {}", getIdentifier(), path); + LOG.debug("Tx {} delete {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); + transactionContext.deleteData(path); } }); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - checkModificationState(); LOG.debug("Tx {} merge {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -309,23 +178,29 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { checkModificationState(); - LOG.debug("Tx {} delete {}", getIdentifier(), path); + LOG.debug("Tx {} write {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); + transactionContext.writeData(path, data); } }); } + private void checkModificationState() { + Preconditions.checkState(type != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(state == TransactionState.OPEN, + "Transaction is sealed - further modifications are not allowed"); + } + private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) { state = newState; @@ -336,59 +211,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Read-only transactions cannot be readied"); + public final void close() { + if (!seal(TransactionState.CLOSED)) { + Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed", + getIdentifier()); + // Idempotent no-op as per AutoCloseable recommendation + return; + } + + for (TransactionContextWrapper contextAdapter : txContextAdapters.values()) { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.closeTransaction(); + } + }); + } + + + txContextAdapters.clear(); + } + + @Override + public final AbstractThreePhaseCommitCohort ready() { + Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); final boolean success = seal(TransactionState.READY); Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); - LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), - txFutureCallbackMap.size()); - - if (txFutureCallbackMap.isEmpty()) { - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextAdapters.size()); + + final AbstractThreePhaseCommitCohort ret; + switch (txContextAdapters.size()) { + case 0: + TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext()); + ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + break; + case 1: + final Entry e = Iterables.getOnlyElement(txContextAdapters.entrySet()); + ret = createSingleCommitCohort(e.getKey(), e.getValue()); + break; + default: + ret = createMultiCommitCohort(txContextAdapters.entrySet()); } - throttleOperation(txFutureCallbackMap.size()); - - final boolean isSingleShard = txFutureCallbackMap.size() == 1; - return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort(); + txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures()); + return ret; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private AbstractThreePhaseCommitCohort createSingleCommitCohort() { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + private AbstractThreePhaseCommitCohort createSingleCommitCohort(final String shardName, + final TransactionContextWrapper contextAdapter) { + throttleOperation(); - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), getTransactionChainId()); + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); final OperationCallback.Reference operationCallbackRef = new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); - } else { + if (transactionContext == null) { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); } }); future = promise.future(); + } else { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); } - return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(), + operationCallbackRef); } private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, OperationCallback.Reference operationCallbackRef) { - if(transactionContext.supportsDirectCommit()) { - TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); + if (transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorContext()); operationCallbackRef.set(rateLimitingCallback); rateLimitingCallback.run(); return transactionContext.directCommit(); @@ -397,153 +301,106 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort() { - List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + private AbstractThreePhaseCommitCohort createMultiCommitCohort( + final Set> txContextAdapterEntries) { - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), getTransactionChainId()); + throttleOperation(); + final List> cohortFutures = new ArrayList<>(txContextAdapterEntries.size()); + for (Entry e : txContextAdapterEntries) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; + TransactionContextWrapper contextAdapter = e.getValue(); + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); + Future future; if (transactionContext != null) { // avoid the creation of a promise and a TransactionOperation future = transactionContext.readyTransaction(); } else { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { promise.completeWith(transactionContext.readyTransaction()); } }); + future = promise.future(); } cohortFutures.add(future); } - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString()); + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); } - @Override - public void close() { - if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } - - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); - } - - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } - - txFutureCallbackMap.clear(); - - if(remoteTransactionActorsMB != null) { - remoteTransactionActors.clear(); - remoteTransactionActorsMB.set(true); - } - } - - private String shardNameFromIdentifier(YangInstanceIdentifier path){ + private static String shardNameFromIdentifier(final YangInstanceIdentifier path) { return ShardStrategyFactory.getStrategy(path).findShard(path); } - protected Future sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); + private TransactionContextWrapper getContextAdapter(final YangInstanceIdentifier path) { + return getContextAdapter(shardNameFromIdentifier(path)); } - final TransactionType getTransactionType() { - return transactionType; - } + private TransactionContextWrapper getContextAdapter(final String shardName) { + final TransactionContextWrapper existing = txContextAdapters.get(shardName); + if (existing != null) { + return existing; + } - final Semaphore getOperationLimiter() { - return operationLimiter; + final TransactionContextWrapper fresh = txContextFactory.newTransactionAdapter(this, shardName); + txContextAdapters.put(shardName, fresh); + return fresh; } - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - return getOrCreateTxFutureCallback(shardName); + TransactionType getType() { + return type; } - private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); + boolean isReady() { + return state != TransactionState.OPEN; + } - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); + ActorContext getActorContext() { + return txContextFactory.getActorContext(); + } - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); - } - } - }, actorContext.getClientDispatcher()); + OperationCompleter getCompleter() { + OperationCompleter ret = operationCompleter; + if (ret == null) { + final Semaphore s = getLimiter(); + ret = new OperationCompleter(s); + operationCompleter = ret; } - return txFutureCallback; + return ret; } - String getTransactionChainId() { - return getIdentifier().getChainId(); + Semaphore getLimiter() { + Semaphore ret = operationLimiter; + if (ret == null) { + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit()); + operationLimiter = ret; + } + return ret; } - protected ActorContext getActorContext() { - return actorContext; + void throttleOperation() { + throttleOperation(1); } - TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); + private void throttleOperation(int acquirePermits) { + try { + if (!getLimiter().tryAcquire(acquirePermits, + getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e); + } else { + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - actorContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), - actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java deleted file mode 100644 index 77834d9563..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import akka.actor.ActorSelection; -import com.google.common.base.FinalizablePhantomReference; -import com.google.common.base.FinalizableReferenceQueue; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A PhantomReference that closes remote transactions for a TransactionProxy when it's - * garbage collected. This is used for read-only transactions as they're not explicitly closed - * by clients. So the only way to detect that a transaction is no longer in use and it's safe - * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed - * but TransactionProxy instances should generally be short-lived enough to avoid being moved - * to the old generation space and thus should be cleaned up in a timely manner as the GC - * runs on the young generation (eden, swap1...) space much more frequently. - */ -final class TransactionProxyCleanupPhantomReference - extends FinalizablePhantomReference { - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxyCleanupPhantomReference.class); - /** - * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The - * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some - * trickery to clean up its internal thread when the bundle is unloaded. - */ - private static final FinalizableReferenceQueue phantomReferenceQueue = - new FinalizableReferenceQueue(); - - /** - * This stores the TransactionProxyCleanupPhantomReference instances statically, This is - * necessary because PhantomReferences need a hard reference so they're not garbage collected. - * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map - * and thus becomes eligible for garbage collection. - */ - private static final Map phantomReferenceCache = - new ConcurrentHashMap<>(); - - private final List remoteTransactionActors; - private final AtomicBoolean remoteTransactionActorsMB; - private final ActorContext actorContext; - private final TransactionIdentifier identifier; - - private TransactionProxyCleanupPhantomReference(TransactionProxy referent) { - super(referent, phantomReferenceQueue); - - // Note we need to cache the relevant fields from the TransactionProxy as we can't - // have a hard reference to the TransactionProxy instance itself. - - remoteTransactionActors = referent.remoteTransactionActors; - remoteTransactionActorsMB = referent.remoteTransactionActorsMB; - actorContext = referent.actorContext; - identifier = referent.getIdentifier(); - } - - static void track(TransactionProxy referent) { - final TransactionProxyCleanupPhantomReference ret = new TransactionProxyCleanupPhantomReference(referent); - phantomReferenceCache.put(ret, ret); - } - - @Override - public void finalizeReferent() { - LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}", - remoteTransactionActors.size(), identifier); - - phantomReferenceCache.remove(this); - - // Access the memory barrier volatile to ensure all previous updates to the - // remoteTransactionActors list are visible to this thread. - - if(remoteTransactionActorsMB.get()) { - for(ActorSelection actor : remoteTransactionActors) { - LOG.trace("Sending CloseTransaction to {}", actor); - actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable()); - } - } - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java new file mode 100644 index 0000000000..22bfbb1316 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import akka.dispatch.Mapper; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which + * is backing a particular transaction. + * + * This class is not for general consumption. It is public only to support the pre-lithium compatibility + * package. + * + * TODO: once we remove compatibility, make this class package-private and final. + */ +public class TransactionReadyReplyMapper extends Mapper { + protected static final Mapper SAME_FAILURE_TRANSFORMER = new Mapper() { + @Override + public Throwable apply(final Throwable failure) { + return failure; + } + }; + private static final Logger LOG = LoggerFactory.getLogger(TransactionReadyReplyMapper.class); + private final TransactionIdentifier identifier; + private final ActorContext actorContext; + + protected TransactionReadyReplyMapper(final ActorContext actorContext, final TransactionIdentifier identifier) { + this.actorContext = Preconditions.checkNotNull(actorContext); + this.identifier = Preconditions.checkNotNull(identifier); + } + + protected final ActorContext getActorContext() { + return actorContext; + } + + protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) { + return readyTxReply.getCohortPath(); + } + + @Override + public final ActorSelection checkedApply(final Object serializedReadyReply) { + LOG.debug("Tx {} readyTransaction", identifier); + + // At this point the ready operation succeeded and we need to extract the cohort + // actor path from the reply. + if (ReadyTransactionReply.isSerializedType(serializedReadyReply)) { + ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); + } + + // Throwing an exception here will fail the Future. + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + identifier, serializedReadyReply.getClass())); + } + + static Future transform(final Future readyReplyFuture, final ActorContext actorContext, + final TransactionIdentifier identifier) { + return readyReplyFuture.transform(new TransactionReadyReplyMapper(actorContext, identifier), + SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index 249a115588..dc82565bc0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorSelection; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.OperationCompleter; -import org.opendaylight.controller.cluster.datastore.TransactionContextImpl; +import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -30,7 +29,8 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -public class PreLithiumTransactionContextImpl extends TransactionContextImpl { +@Deprecated +public class PreLithiumTransactionContextImpl extends RemoteTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class); private final String transactionPath; @@ -69,7 +69,7 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { } @Override - protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { + protected Future transformReadyReply(final Future readyReplyFuture) { // In base Helium we used to return the local path of the actor which represented // a remote ThreePhaseCommitCohort. The local path would then be converted to // a remote path using this resolvePath method. To maintain compatibility with @@ -77,11 +77,11 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { // At some point in the future when upgrades from Helium are not supported // we could remove this code to resolvePath and just use the cohortPath as the // resolved cohortPath - if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { - return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath()); + if (getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { + return PreLithiumTransactionReadyReplyMapper.transform(readyReplyFuture, getActorContext(), getIdentifier(), transactionPath); + } else { + return super.transformReadyReply(readyReplyFuture); } - - return readyTxReply.getCohortPath(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java new file mode 100644 index 0000000000..c8fab0822a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.compat; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.datastore.TransactionReadyReplyMapper; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import scala.concurrent.Future; +import akka.actor.ActorSelection; + +/** + * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which + * is backing a particular transaction. This class supports the Helium base release + * behavior. + */ +@Deprecated +public final class PreLithiumTransactionReadyReplyMapper extends TransactionReadyReplyMapper { + private final String transactionPath; + + private PreLithiumTransactionReadyReplyMapper(ActorContext actorContext, TransactionIdentifier identifier, final String transactionPath) { + super(actorContext, identifier); + this.transactionPath = Preconditions.checkNotNull(transactionPath); + } + + @Override + protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) { + return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath()); + } + + public static Future transform(final Future readyReplyFuture, final ActorContext actorContext, + final TransactionIdentifier identifier, final String transactionPath) { + return readyReplyFuture.transform(new PreLithiumTransactionReadyReplyMapper(actorContext, identifier, transactionPath), + SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java index 6d952f9a4a..f8cd18ced2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java @@ -13,14 +13,16 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification /** * Message notifying the shard leader to apply modifications which have been * prepared locally against its DataTree. This message is not directly serializable, - * simply because the leader and sender need to be on the same system. + * simply because the leader and sender need to be on the same system. When it needs + * to be sent out to a remote system, it needs to be intercepted by {@link ReadyLocalTransactionSerializer} + * and turned into {@link BatchedModifications}. */ public final class ReadyLocalTransaction { private final DataTreeModification modification; private final String transactionID; private final boolean doCommitOnReady; - public ReadyLocalTransaction(final String transactionID, DataTreeModification modification, boolean doCommitOnReady) { + public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) { this.transactionID = Preconditions.checkNotNull(transactionID); this.modification = Preconditions.checkNotNull(modification); this.doCommitOnReady = doCommitOnReady; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 18a8798c47..ad05a1ca71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -28,8 +28,13 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.RateLimiter; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; @@ -65,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration; * easily. An ActorContext can be freely passed around to local object instances * but should not be passed to actors especially remote actors */ -public class ActorContext { +public class ActorContext implements RemovalListener> { private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class); private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store"; private static final String METRIC_RATE = "rate"; @@ -104,6 +109,8 @@ public class ActorContext { private volatile SchemaContext schemaContext; private volatile boolean updated; private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry(); + @GuardedBy("shardInfoListeners") + private final Collection> shardInfoListeners = new ArrayList<>(); public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -146,6 +153,7 @@ public class ActorContext { primaryShardInfoCache = CacheBuilder.newBuilder() .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .removalListener(this) .build(); } @@ -236,6 +244,12 @@ public class ActorContext { ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath); PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree)); primaryShardInfoCache.put(shardName, Futures.successful(info)); + + synchronized (shardInfoListeners) { + for (ShardInfoListenerRegistration reg : shardInfoListeners) { + reg.getInstance().onShardInfoUpdated(shardName, info); + } + } return info; } @@ -566,4 +580,28 @@ public class ActorContext { Cache> getPrimaryShardInfoCache() { return primaryShardInfoCache; } + + public ShardInfoListenerRegistration registerShardInfoListener(final T listener) { + final ShardInfoListenerRegistration reg = new ShardInfoListenerRegistration(listener, this); + + synchronized (shardInfoListeners) { + shardInfoListeners.add(reg); + } + return reg; + } + + protected void removeShardInfoListener(final ShardInfoListenerRegistration registration) { + synchronized (shardInfoListeners) { + shardInfoListeners.remove(registration); + } + } + + @Override + public void onRemoval(final RemovalNotification> notification) { + synchronized (shardInfoListeners) { + for (ShardInfoListenerRegistration reg : shardInfoListeners) { + reg.getInstance().onShardInfoUpdated(notification.getKey(), null); + } + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java new file mode 100644 index 0000000000..83c5d3716f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; + +/** + * Listener interface used to register for primary shard information changes. + * Implementations of this interface can be registered with {@link ActorContext} + * to receive notifications about shard information changes. + */ +public interface ShardInfoListener { + /** + * Update {@link PrimaryShardInfo} for a particular shard. + * @param shardName Shard name + * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information + * became unavailable. + */ + void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java new file mode 100644 index 0000000000..3dca66d49d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Preconditions; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; + +/** + * Registration of a {@link ShardInfoListener} instance. + * + * @param Type of listener + */ +public class ShardInfoListenerRegistration extends AbstractObjectRegistration { + private final ActorContext parent; + + protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) { + super(instance); + this.parent = Preconditions.checkNotNull(parent); + } + + @Override + protected void removeRegistration() { + parent.removeShardInfoListener(this); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index c22b7acd38..0b2d3ce108 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -66,6 +66,8 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener; +import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; @@ -111,6 +113,8 @@ public abstract class AbstractTransactionProxyTest { @Mock protected ActorContext mockActorContext; + protected TransactionContextFactory mockComponentFactory; + private SchemaContext schemaContext; @Mock @@ -150,6 +154,10 @@ public abstract class AbstractTransactionProxyTest { doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener( + any(ShardInfoListener.class)); + + mockComponentFactory = TransactionContextFactory.create(mockActorContext); Timer timer = new MetricRegistry().timer("test"); doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class)); @@ -260,6 +268,7 @@ public abstract class AbstractTransactionProxyTest { return Futures.successful(new BatchedModificationsReply(count)); } + @SuppressWarnings("unchecked") protected Future incompleteFuture() { return mock(Future.class); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 94cc6e5e59..f3d93b896d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -2,30 +2,32 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; -import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -37,19 +39,38 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class DistributedDataStoreIntegrationTest extends AbstractActorTest { +public class DistributedDataStoreIntegrationTest { + + private static ActorSystem system; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + protected ActorSystem getSystem() { + return system; + } + @Test public void testWriteTransactionWithSingleShard() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); @@ -65,47 +86,59 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); - YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH; - NormalizedNode nodeToWrite1 = CarsModel.emptyContainer(); - writeTx.write(nodePath1, nodeToWrite1); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH; - NormalizedNode nodeToWrite2 = PeopleModel.emptyContainer(); - writeTx.write(nodePath2, nodeToWrite2); + doCommit(writeTx.ready()); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + writeTx = dataStore.newWriteOnlyTransaction(); - doCommit(cohort); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(writeTx.ready()); + + writeTx = dataStore.newWriteOnlyTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); + + doCommit(writeTx.ready()); // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(nodePath1).get(); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite1, optional.get()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(nodePath2).get(); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite2, optional.get()); + assertEquals("Data node", person, optional.get()); cleanup(dataStore); }}; } @Test - public void testReadWriteTransaction() throws Exception{ + public void testReadWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransaction", "test-1"); + setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1"); // 1. Create a read-write Tx @@ -147,9 +180,65 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testReadWriteTransactionWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1"); + + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + doCommit(readWriteTx.ready()); + + readWriteTx = dataStore.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); + + Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + doCommit(readWriteTx.ready()); + + // Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + cleanup(dataStore); + }}; + } + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) throws Exception { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String shardName = "test-1"; // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't @@ -251,7 +340,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -324,7 +413,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -394,7 +483,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test(expected=NotInitializedException.class) public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionReadFailureWithShardNotInitialized"; String shardName = "test-1"; @@ -466,7 +555,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; String shardName = "default"; @@ -548,7 +637,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); @@ -571,9 +660,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionChain() throws Exception{ - new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1"); + public void testTransactionChainWithSingleShard() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -658,9 +747,74 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testTransactionChainWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", + "cars-1", "people-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); + + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); + + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + + writeTx = txChain.newWriteOnlyTransaction(); + + //writeTx.delete(personPath); + + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + + doCommit(cohort1); + doCommit(cohort2); + doCommit(cohort3); + + txChain.close(); + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); + + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + //assertEquals("isPresent", false, optional.isPresent()); + assertEquals("isPresent", true, optional.isPresent()); + + cleanup(dataStore); + }}; + } + @Test public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionsInQuickSuccession", "test-1"); @@ -691,7 +845,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); @@ -714,7 +868,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); @@ -734,7 +888,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testCreateChainedTransactionAfterClose() throws Throwable { - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( "testCreateChainedTransactionAfterClose", "test-1"); @@ -750,7 +904,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testChangeListenerRegistration() throws Exception{ - new IntegrationTestKit(getSystem()) {{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); @@ -796,129 +950,4 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { cleanup(dataStore); }}; } - - class IntegrationTestKit extends ShardTestKit { - - IntegrationTestKit(ActorSystem actorSystem) { - super(actorSystem); - } - - DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { - return setupDistributedDataStore(typeName, true, shardNames); - } - - DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, - String... shardNames) { - MockClusterWrapper cluster = new MockClusterWrapper(); - Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); - ShardStrategyFactory.setConfiguration(config); - - datastoreContextBuilder.dataStoreType(typeName); - - DatastoreContext datastoreContext = datastoreContextBuilder.build(); - - DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, - config, datastoreContext); - - SchemaContext schemaContext = SchemaContextHelper.full(); - dataStore.onGlobalContextUpdated(schemaContext); - - if(waitUntilLeader) { - for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); - } - } - - assertNotNull("Shard was not created", shard); - - waitUntilLeader(shard); - } - } - - return dataStore; - } - - void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, - NormalizedNode nodeToWrite) throws Exception { - - // 1. Create a write-only Tx - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - // 2. Write some data - - writeTx.write(nodePath, nodeToWrite); - - // 3. Ready the Tx for commit - - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - - // 4. Commit the Tx - - doCommit(cohort); - - // 5. Verify the data in the store - - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - - Optional> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - } - - void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { - Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); - } - - void cleanup(DistributedDataStore dataStore) { - dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); - } - - void assertExceptionOnCall(Callable callable, Class expType) - throws Exception { - try { - callable.call(); - fail("Expected " + expType.getSimpleName()); - } catch(Exception e) { - assertEquals("Exception type", expType, e.getClass()); - } - } - - void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, - Class expType) throws Exception { - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newWriteOnlyTransaction(); - return null; - } - }, expType); - - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newReadWriteTransaction(); - return null; - } - }, expType); - - assertExceptionOnCall(new Callable() { - @Override - public Void call() throws Exception { - txChain.newReadOnlyTransaction(); - return null; - } - }, expType); - } - } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java new file mode 100644 index 0000000000..7cef2fd743 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -0,0 +1,327 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; +import com.typesafe.config.ConfigFactory; +import java.math.BigInteger; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; + +/** + * End-to-end distributed data store tests that exercise remote shards and transactions. + * + * @author Thomas Pantelis + */ +public class DistributedDataStoreRemotingIntegrationTest { + + private static final String[] SHARD_NAMES = {"cars", "people"}; + + private ActorSystem leaderSystem; + private ActorSystem followerSystem; + + private final DatastoreContext.Builder leaderDatastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1); + + private final DatastoreContext.Builder followerDatastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200); + + private DistributedDataStore followerDistributedDataStore; + private DistributedDataStore leaderDistributedDataStore; + private IntegrationTestKit followerTestKit; + private IntegrationTestKit leaderTestKit; + + @Before + public void setUpClass() { + leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(leaderSystem).join(member1Address); + + followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + Cluster.get(followerSystem).join(member1Address); + } + + @After + public void tearDownClass() { + JavaTestKit.shutdownActorSystem(leaderSystem); + JavaTestKit.shutdownActorSystem(followerSystem); + } + + private void initDatastores(String type) { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); + + String moduleShardsConfig = "module-shards-member1-and-2.conf"; + + followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); + followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES); + + leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES); + + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES); + } + + private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception { + Optional> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + CollectionNodeBuilder listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME); + for(NormalizedNode entry: entries) { + listBuilder.withChild((MapEntryNode) entry); + } + + assertEquals("Car list node", listBuilder.build(), optional.get()); + } + + private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode expNode) + throws Exception { + Optional> optional = readTx.read(path).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", expNode, optional.get()); + } + + private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception { + Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); + } + + @Test + public void testWriteTransactionWithSingleShard() throws Exception { + String testName = "testWriteTransactionWithSingleShard"; + initDatastores(testName); + + String followerCarShardName = "member-2-shard-cars-" + testName; + InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class ); + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); + writeTx.merge(car1Path, car1); + + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); + writeTx.merge(car2Path, car2); + + followerTestKit.doCommit(writeTx.ready()); + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2); + + // Test delete + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + + writeTx.delete(car1Path); + + followerTestKit.doCommit(writeTx.ready()); + + verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path); + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2); + + // Re-instate the follower member 2 as a single-node to verify replication and recovery. + + InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName); + + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + JavaTestKit.shutdownActorSystem(followerSystem, null, true); + + ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2")); + + DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). + setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES); + + verifyCars(member2Datastore.newReadOnlyTransaction(), car2); + + JavaTestKit.shutdownActorSystem(newSystem); + } + + @Test + public void testReadWriteTransactionWithSingleShard() throws Exception { + initDatastores("testReadWriteTransactionWithSingleShard"); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", rwTx); + + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + rwTx.merge(CarsModel.newCarPath("optima"), car1); + + verifyCars(rwTx, car1); + + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); + rwTx.merge(car2Path, car2); + + verifyExists(rwTx, car2Path); + + followerTestKit.doCommit(rwTx.ready()); + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2); + } + + @Test + public void testWriteTransactionWithMultipleShards() throws Exception { + initDatastores("testWriteTransactionWithMultipleShards"); + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; + NormalizedNode carsNode = CarsModel.emptyContainer(); + writeTx.write(carsPath, carsNode); + + YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; + NormalizedNode peopleNode = PeopleModel.emptyContainer(); + writeTx.write(peoplePath, peopleNode); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + + verifyNode(readTx, carsPath, carsNode); + verifyNode(readTx, peoplePath, peopleNode); + } + + @Test + public void testReadWriteTransactionWithMultipleShards() throws Exception { + initDatastores("testReadWriteTransactionWithMultipleShards"); + + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", rwTx); + + YangInstanceIdentifier carsPath = CarsModel.BASE_PATH; + NormalizedNode carsNode = CarsModel.emptyContainer(); + rwTx.write(carsPath, carsNode); + + YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH; + NormalizedNode peopleNode = PeopleModel.emptyContainer(); + rwTx.write(peoplePath, peopleNode); + + followerTestKit.doCommit(rwTx.ready()); + + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + + verifyNode(readTx, carsPath, carsNode); + verifyNode(readTx, peoplePath, peopleNode); + } + + @Test + public void testTransactionChain() throws Exception { + initDatastores("testTransactionChain"); + + DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + // Add the top-level cars container with write-only. + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + writeTx.ready(); + + // Verify the top-level cars container with read-only. + + verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + // Perform car operations with read-write. + + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); + rwTx.write(car1Path, car1); + + verifyExists(rwTx, car1Path); + + verifyCars(rwTx, car1); + + MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + rwTx.merge(CarsModel.newCarPath("sportage"), car2); + + rwTx.delete(car1Path); + + followerTestKit.doCommit(rwTx.ready()); + + txChain.close(); + + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2); + } + + @Test + public void testReadyLocalTransactionForwardedToLeader() throws Exception { + initDatastores("testReadyLocalTransactionForwardedToLeader"); + + Optional carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars"); + assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); + + TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); + dataTree.setSchemaContext(SchemaContextHelper.full()); + DataTreeModification modification = dataTree.takeSnapshot().newModification(); + + new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); + new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); + + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification); + + String transactionID = "tx-1"; + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true); + + carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); + followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + + verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java new file mode 100644 index 0000000000..109e77c523 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +class IntegrationTestKit extends ShardTestKit { + + DatastoreContext.Builder datastoreContextBuilder; + + IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) { + super(actorSystem); + this.datastoreContextBuilder = datastoreContextBuilder; + } + + DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { + return setupDistributedDataStore(typeName, true, shardNames); + } + + DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, + String... shardNames) { + return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader, shardNames); + } + + DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig, boolean waitUntilLeader, + String... shardNames) { + ClusterWrapper cluster = new ClusterWrapperImpl(getSystem()); + Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf"); + ShardStrategyFactory.setConfiguration(config); + + datastoreContextBuilder.dataStoreType(typeName); + + DatastoreContext datastoreContext = datastoreContextBuilder.build(); + + DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext); + + SchemaContext schemaContext = SchemaContextHelper.full(); + dataStore.onGlobalContextUpdated(schemaContext); + + if(waitUntilLeader) { + waitUntilLeader(dataStore.getActorContext(), shardNames); + } + + return dataStore; + } + + void waitUntilLeader(ActorContext actorContext, String... shardNames) { + for(String shardName: shardNames) { + ActorRef shard = null; + for(int i = 0; i < 20 * 5 && shard == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = actorContext.findLocalShard(shardName); + if(shardReply.isPresent()) { + shard = shardReply.get(); + } + } + + assertNotNull("Shard was not created", shard); + + waitUntilLeader(shard); + } + } + + void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath, + NormalizedNode nodeToWrite) throws Exception { + + // 1. Create a write-only Tx + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); + + // 2. Write some data + + writeTx.write(nodePath, nodeToWrite); + + // 3. Ready the Tx for commit + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + + // 4. Commit the Tx + + doCommit(cohort); + + // 5. Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } + + void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + } + + void cleanup(DistributedDataStore dataStore) { + if(dataStore != null) { + dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); + } + } + + void assertExceptionOnCall(Callable callable, Class expType) + throws Exception { + try { + callable.call(); + fail("Expected " + expType.getSimpleName()); + } catch(Exception e) { + assertEquals("Exception type", expType, e.getClass()); + } + } + + void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, + Class expType) throws Exception { + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newWriteOnlyTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadWriteTransaction(); + return null; + } + }, expType); + + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newReadOnlyTransaction(); + return null; + } + }, expType); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 76f299b548..11ca195311 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -16,7 +16,6 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -32,7 +31,6 @@ import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; @@ -48,7 +46,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @Test public void testNewReadOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadTransaction); } @@ -56,7 +54,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("resource") @Test public void testNewReadWriteTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction); } @@ -64,29 +62,29 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("resource") @Test public void testNewWriteOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreWriteTransaction); } @Test public void testClose() throws Exception { - new TransactionChainProxy(mockActorContext).close(); + new TransactionChainProxy(mockComponentFactory).close(); verify(mockActorContext, times(1)).broadcast(anyObject()); } @Test public void testTransactionChainsHaveUniqueId(){ - TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class)); - TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class)); + TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory); + TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory); Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId()); } @Test public void testRateLimitingUsedInReadWriteTxCreation(){ - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); txChainProxy.newReadWriteTransaction(); @@ -95,7 +93,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @Test public void testRateLimitingUsedInWriteOnlyTxCreation(){ - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); txChainProxy.newWriteOnlyTransaction(); @@ -105,7 +103,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @Test public void testRateLimitingNotUsedInReadOnlyTxCreation(){ - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); txChainProxy.newReadOnlyTransaction(); @@ -120,7 +118,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { public void testChainedWriteOnlyTransactions() throws Exception { dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem()); @@ -186,7 +184,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { */ @Test public void testChainedReadWriteTransactions() throws Exception { - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -257,7 +255,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); - TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory); DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index d37a9db394..6cf63157e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -78,7 +78,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); @@ -107,7 +107,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -119,7 +119,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } @@ -138,7 +138,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), any()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); } @@ -179,7 +179,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.write(TestModel.TEST_PATH, expectedNode); @@ -199,7 +199,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test(expected=IllegalStateException.class) public void testReadPreConditionCheck() { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.read(TestModel.TEST_PATH); } @@ -216,7 +216,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } @@ -225,7 +225,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { public void testExists() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); @@ -259,8 +259,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -272,7 +271,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.failed(new TestException())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); } @@ -288,7 +287,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -306,7 +305,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test(expected=IllegalStateException.class) public void testExistsPreConditionCheck() { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.exists(TestModel.TEST_PATH); } @@ -319,7 +318,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -342,7 +341,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); final CountDownLatch readComplete = new CountDownLatch(1); final AtomicReference caughtEx = new AtomicReference<>(); @@ -382,13 +381,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test(expected=IllegalStateException.class) public void testWritePreConditionCheck() { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } @Test(expected=IllegalStateException.class) public void testWriteAfterReadyPreConditionCheck() { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.ready(); @@ -404,7 +403,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); @@ -418,7 +417,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); @@ -436,7 +435,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, 1); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -464,7 +463,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef, true); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -494,7 +493,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef, true); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -519,7 +518,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef1); expectBatchedModificationsReady(actorRef2); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -542,7 +541,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef, true); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -571,7 +570,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef, true); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -603,7 +602,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectFailedBatchedModifications(actorRef); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); @@ -617,7 +616,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -661,7 +660,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef2); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -677,8 +676,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - TransactionType.READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); Object id = transactionProxy.getIdentifier(); assertNotNull("getIdentifier returned null", id); @@ -692,7 +690,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); @@ -720,7 +718,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); // negative test case with null as the reply doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( @@ -759,7 +757,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModificationsReady(actorRef, true); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); @@ -811,7 +809,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(true).when(mockActorContext).isPathLocal(actorPath); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); long start = System.nanoTime(); @@ -860,7 +858,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(true).when(mockActorContext).isPathLocal(anyString()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); long start = System.nanoTime(); @@ -1198,7 +1196,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH; YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH; - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type); transactionProxy.write(writePath1, writeNode1); transactionProxy.write(writePath2, writeNode2); @@ -1276,7 +1274,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); transactionProxy.write(writePath1, writeNode1); transactionProxy.write(writePath2, writeNode2); @@ -1352,7 +1350,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); Optional> readOptional = transactionProxy.read( YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 5681b76092..a6656b2681 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -156,7 +156,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). get(5, TimeUnit.SECONDS); @@ -228,7 +228,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, testNode); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java index 93b552a6da..468e2da310 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java @@ -23,13 +23,13 @@ public class CarsModel { public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); - public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME); - public static final QName CARS_QNAME = QName.create(BASE_QNAME, "cars"); public static final QName CAR_QNAME = QName.create(CARS_QNAME, "car"); public static final QName CAR_NAME_QNAME = QName.create(CAR_QNAME, "name"); public static final QName CAR_PRICE_QNAME = QName.create(CAR_QNAME, "price"); + public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME); + public static final YangInstanceIdentifier CAR_LIST_PATH = BASE_PATH.node(CAR_QNAME); public static NormalizedNode create(){ @@ -69,4 +69,17 @@ public class CarsModel { .build(); } + public static NormalizedNode newCarMapNode() { + return ImmutableNodes.mapNodeBuilder(CAR_QNAME).build(); + } + + public static MapEntryNode newCarEntry(String name, BigInteger price) { + return ImmutableNodes.mapEntryBuilder(CAR_QNAME, CAR_NAME_QNAME, name) + .withChild(ImmutableNodes.leafNode(CAR_NAME_QNAME, name)) + .withChild(ImmutableNodes.leafNode(CAR_PRICE_QNAME, price)).build(); + } + + public static YangInstanceIdentifier newCarPath(String name) { + return YangInstanceIdentifier.builder(CAR_LIST_PATH).nodeWithKey(CAR_QNAME, CAR_NAME_QNAME, name).build(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java index a7cb14f0b3..fbe3df9faa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java @@ -22,13 +22,13 @@ public class PeopleModel { public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people", "2014-03-13", "people"); - public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME); public static final QName PEOPLE_QNAME = QName.create(BASE_QNAME, "people"); public static final QName PERSON_QNAME = QName.create(PEOPLE_QNAME, "person"); public static final QName PERSON_NAME_QNAME = QName.create(PERSON_QNAME, "name"); public static final QName PERSON_AGE_QNAME = QName.create(PERSON_QNAME, "age"); - + public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME); + public static final YangInstanceIdentifier PERSON_LIST_PATH = BASE_PATH.node(PERSON_QNAME); public static NormalizedNode create(){ @@ -69,4 +69,16 @@ public class PeopleModel { .build(); } + public static NormalizedNode newPersonMapNode() { + return ImmutableNodes.mapNodeBuilder(PERSON_QNAME).build(); + } + + public static MapEntryNode newPersonEntry(String name) { + return ImmutableNodes.mapEntryBuilder(PERSON_QNAME, PERSON_NAME_QNAME, name) + .withChild(ImmutableNodes.leafNode(PERSON_NAME_QNAME, name)).build(); + } + + public static YangInstanceIdentifier newPersonPath(String name) { + return YangInstanceIdentifier.builder(PERSON_LIST_PATH).nodeWithKey(PERSON_QNAME, PERSON_NAME_QNAME, name).build(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 03634627d6..8f5550fe00 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -63,10 +63,12 @@ Member1 { serializers { java = "akka.serialization.JavaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" } serialization-bindings { "com.google.protobuf.Message" = proto + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } } remote { @@ -113,10 +115,12 @@ Member2 { serializers { java = "akka.serialization.JavaSerializer" proto = "akka.remote.serialization.ProtobufSerializer" + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" } serialization-bindings { "com.google.protobuf.Message" = proto + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal } } remote { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf new file mode 100644 index 0000000000..61c3538dd4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf @@ -0,0 +1,26 @@ +module-shards = [ + { + name = "people" + shards = [ + { + name="people" + replicas = [ + "member-1", + "member-2" + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1", + "member-2" + ] + } + ] + } +] diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf new file mode 100644 index 0000000000..fd996bddbe --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf @@ -0,0 +1,24 @@ +module-shards = [ + { + name = "people" + shards = [ + { + name="people" + replicas = [ + "member-2" + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-2" + ] + } + ] + } +] \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java index b7776b2a39..7683937ce2 100644 --- a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java +++ b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java @@ -120,19 +120,26 @@ public abstract class AbstractSnapshotBackedTransactionChain extends Transact @Override public final DOMStoreReadTransaction newReadOnlyTransaction() { + return newReadOnlyTransaction(nextTransactionIdentifier()); + } + + protected DOMStoreReadTransaction newReadOnlyTransaction(T transactionId) { final Entry entry = getSnapshot(); - return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue()); + return SnapshotBackedTransactions.newReadTransaction(transactionId, getDebugTransactions(), entry.getValue()); } @Override public final DOMStoreReadWriteTransaction newReadWriteTransaction() { + return newReadWriteTransaction(nextTransactionIdentifier()); + } + + protected DOMStoreReadWriteTransaction newReadWriteTransaction(T transactionId) { Entry entry; DOMStoreReadWriteTransaction ret; do { entry = getSnapshot(); - ret = new SnapshotBackedReadWriteTransaction(nextTransactionIdentifier(), - getDebugTransactions(), entry.getValue(), this); + ret = new SnapshotBackedReadWriteTransaction(transactionId, getDebugTransactions(), entry.getValue(), this); } while (!recordTransaction(entry.getKey(), ret)); return ret; @@ -140,13 +147,16 @@ public abstract class AbstractSnapshotBackedTransactionChain extends Transact @Override public final DOMStoreWriteTransaction newWriteOnlyTransaction() { + return newWriteOnlyTransaction(nextTransactionIdentifier()); + } + + protected DOMStoreWriteTransaction newWriteOnlyTransaction(T transactionId) { Entry entry; DOMStoreWriteTransaction ret; do { entry = getSnapshot(); - ret = new SnapshotBackedWriteTransaction(nextTransactionIdentifier(), - getDebugTransactions(), entry.getValue(), this); + ret = new SnapshotBackedWriteTransaction(transactionId, getDebugTransactions(), entry.getValue(), this); } while (!recordTransaction(entry.getKey(), ret)); return ret;