import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
* transaction factories.
*/
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
- implements ShardInfoListener, AutoCloseable {
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
protected static final AtomicLong TX_COUNTER = new AtomicLong();
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
- LOG.debug("Shard {} resolved to local data tree", shardName);
- }
- }
-
- @Override
- public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
- final F existing = knownLocal.get(shardName);
- if (existing != null) {
- if (primaryShardInfo != null) {
- final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
- if (maybeDataTree.isPresent()) {
- final DataTree newDataTree = maybeDataTree.get();
- final DataTree oldDataTree = dataTreeForFactory(existing);
- if (!oldDataTree.equals(newDataTree)) {
- final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
- knownLocal.replace(shardName, existing, newChain);
- LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
- }
+ if(!knownLocal.containsKey(shardName)) {
+ LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
- return;
- }
- }
- if (knownLocal.remove(shardName, existing)) {
- LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
- } else {
- LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+ F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
+ knownLocal.putIfAbsent(shardName, factory);
}
+ } else if(knownLocal.containsKey(shardName)) {
+ LOG.debug("Shard {} invalidating local data tree", shardName);
+
+ knownLocal.remove(shardName);
}
}
*/
protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
- /**
- * Extract the backing data tree from a particular factory.
- *
- * @param factory Transaction factory
- * @return Backing data tree
- */
- protected abstract DataTree dataTreeForFactory(F factory);
-
/**
* Callback invoked from child transactions to push any futures, which need to
* be waited for before the next transaction is allocated.
protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
- return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
+ switch(parent.getType()) {
+ case READ_ONLY:
+ final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getCompleter()) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ return readOnly;
+ }
+ };
+ case READ_WRITE:
+ final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getCompleter()) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ return readWrite;
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ return readWrite;
+ }
+ };
+ case WRITE_ONLY:
+ final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
+ return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getCompleter()) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ return writeOnly;
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ }
}
}
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
String shardDispatcher =
new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+ PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
- datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext);
+ datastoreContext, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
+ configuration, datastoreContext, primaryShardInfoCache);
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
datastoreInfoMXBean.registerMBean();
}
- public DistributedDataStore(ActorContext actorContext) {
+ @VisibleForTesting
+ DistributedDataStore(ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
this.txContextFactory = TransactionContextFactory.create(actorContext);
this.type = UNKNOWN_TYPE;
}
private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){
+ DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId,
+ PrimaryShardInfoFutureCache primaryShardInfoCache){
Exception lastException = null;
for(int i=0;i<100;i++) {
try {
return actorSystem.actorOf(
- ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
- .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId);
+ ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch,
+ primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
+ ActorContext.MAILBOX), shardManagerId);
} catch (Exception e){
lastException = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
};
}
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+ return super.newReadOnlyTransaction(identifier);
+ }
+
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
return super.newReadWriteTransaction(identifier);
}
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+ return super.newWriteOnlyTransaction(identifier);
+ }
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Future;
*
* @author Thomas Pantelis
*/
-final class LocalTransactionContext extends AbstractTransactionContext {
- private final DOMStoreReadWriteTransaction delegate;
+abstract class LocalTransactionContext extends AbstractTransactionContext {
+
+ private final DOMStoreTransaction txDelegate;
private final OperationCompleter completer;
- LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) {
+ LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationCompleter completer) {
super(identifier);
- this.delegate = Preconditions.checkNotNull(delegate);
+ this.txDelegate = Preconditions.checkNotNull(txDelegate);
this.completer = Preconditions.checkNotNull(completer);
}
+ protected abstract DOMStoreWriteTransaction getWriteDelegate();
+
+ protected abstract DOMStoreReadTransaction getReadDelegate();
+
@Override
- public void writeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- delegate.write(path, data);
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ getWriteDelegate().write(path, data);
completer.onComplete(null, null);
}
@Override
- public void mergeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- delegate.merge(path, data);
+ public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ getWriteDelegate().merge(path, data);
completer.onComplete(null, null);
}
@Override
- public void deleteData(final YangInstanceIdentifier path) {
- delegate.delete(path);
+ public void deleteData(YangInstanceIdentifier path) {
+ getWriteDelegate().delete(path);
completer.onComplete(null, null);
}
@Override
- public void readData(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-
- Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+ Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
}
@Override
- public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
- Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
+ public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+ Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
proxyFuture.set(result);
}
private LocalThreePhaseCommitCohort ready() {
- LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready();
+ LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
completer.onComplete(null, null);
return ready;
}
@Override
public void closeTransaction() {
- delegate.close();
+ txDelegate.close();
}
}
package org.opendaylight.controller.cluster.datastore;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
/**
* A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
* @author Thomas Pantelis
*/
interface LocalTransactionFactory {
+ DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
+
DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
+
+ DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier);
}
\ No newline at end of file
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
return dataTree;
}
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier) {
+ return SnapshotBackedTransactions.newReadTransaction(identifier, false, dataTree.takeSnapshot());
+ }
+
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
}
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier) {
+ return SnapshotBackedTransactions.newWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
+ }
+
@Override
protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
// No-op
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
private final CountDownLatch waitTillReadyCountdownLatch;
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
/**
*/
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+ DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+ PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = primaryShardInfoCache;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContext datastoreContext,
- final CountDownLatch waitTillReadyCountdownLatch) {
+ final CountDownLatch waitTillReadyCountdownLatch,
+ final PrimaryShardInfoFutureCache primaryShardInfoCache) {
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+ Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+ waitTillReadyCountdownLatch, primaryShardInfoCache));
}
@Override
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
- shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ primaryShardInfoCache.remove(shardInformation.getShardName());
+ }
+
checkReady();
} else {
LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
return false;
}
- void setLeaderId(String leaderId) {
+ boolean setLeaderId(String leaderId) {
+ boolean changed = !Objects.equal(this.leaderId, leaderId);
this.leaderId = leaderId;
notifyOnShardInitializedCallbacks();
+
+ return changed;
}
}
final Configuration configuration;
final DatastoreContext datastoreContext;
private final CountDownLatch waitTillReadyCountdownLatch;
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- ShardManagerCreator(ClusterWrapper cluster,
- Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+ ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+ CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = cluster;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = primaryShardInfoCache;
}
@Override
public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+ primaryShardInfoCache);
}
}
// Send a close transaction chain request to each and every shard
getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
- parent.removeTransactionChain(this);
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
return ret;
}
- @Override
- protected DataTree dataTreeForFactory(final LocalTransactionChain factory) {
- return factory.getDataTree();
- }
-
/**
* This method is overridden to ensure the previous Tx's ready operations complete
* before we initiate the next Tx in the chain to avoid creation failures if the
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import java.util.ArrayList;
import java.util.Collection;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Future;
*/
final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
- @GuardedBy("childChains")
- private final Collection<TransactionChainProxy> childChains = new ArrayList<>();
-
- private final ShardInfoListenerRegistration<TransactionContextFactory> reg;
-
private TransactionContextFactory(final ActorContext actorContext) {
super(actorContext);
- this.reg = actorContext.registerShardInfoListener(this);
}
static TransactionContextFactory create(final ActorContext actorContext) {
@Override
public void close() {
- reg.close();
}
@Override
}
DOMStoreTransactionChain createTransactionChain() {
- final TransactionChainProxy ret = new TransactionChainProxy(this);
-
- synchronized (childChains) {
- childChains.add(ret);
- }
-
- return ret;
- }
-
- void removeTransactionChain(final TransactionChainProxy chain) {
- synchronized (childChains) {
- childChains.remove(chain);
- }
- }
-
- @Override
- public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
- synchronized (childChains) {
- for (TransactionChainProxy chain : childChains) {
- chain.onShardInfoUpdated(shardName, primaryShardInfo);
- }
- super.onShardInfoUpdated(shardName, primaryShardInfo);
- }
- }
-
- @Override
- protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) {
- return factory.getDataTree();
+ return new TransactionChainProxy(this);
}
}
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
-import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
* easily. An ActorContext can be freely passed around to local object instances
* but should not be passed to actors especially remote actors
*/
-public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
+public class ActorContext {
private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
private static final String METRIC_RATE = "rate";
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
- @GuardedBy("shardInfoListeners")
- private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
+
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration) {
this(actorSystem, shardManager, clusterWrapper, configuration,
- DatastoreContext.newBuilder().build());
+ DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
}
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper, Configuration configuration,
- DatastoreContext datastoreContext) {
+ DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
this.clusterWrapper = clusterWrapper;
this.configuration = configuration;
this.datastoreContext = datastoreContext;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
+ this.primaryShardInfoCache = primaryShardInfoCache;
setCachedProperties();
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
-
- primaryShardInfoCache = CacheBuilder.newBuilder()
- .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
- .removalListener(this)
- .build();
}
public DatastoreContext getDatastoreContext() {
DataTree localShardDataTree) {
ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
- primaryShardInfoCache.put(shardName, Futures.successful(info));
-
- synchronized (shardInfoListeners) {
- for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
- reg.getInstance().onShardInfoUpdated(shardName, info);
- }
- }
+ primaryShardInfoCache.putSuccessful(shardName, info);
return info;
}
return ask(actorRef, message, timeout);
}
- @VisibleForTesting
- Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+ public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
return primaryShardInfoCache;
}
-
- public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
- final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
-
- synchronized (shardInfoListeners) {
- shardInfoListeners.add(reg);
- }
- return reg;
- }
-
- protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
- synchronized (shardInfoListeners) {
- shardInfoListeners.remove(registration);
- }
- }
-
- @Override
- public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
- synchronized (shardInfoListeners) {
- for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
- reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
- }
- }
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import scala.concurrent.Future;
+
+/**
+ * Maintains a cache of PrimaryShardInfo Future instances per shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCache {
+ private final Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache = CacheBuilder.newBuilder().build();
+
+ public @Nullable Future<PrimaryShardInfo> getIfPresent(@Nonnull String shardName) {
+ return primaryShardInfoCache.getIfPresent(shardName);
+ }
+
+ public void putSuccessful(@Nonnull String shardName, @Nonnull PrimaryShardInfo info) {
+ primaryShardInfoCache.put(shardName, Futures.successful(info));
+ }
+
+ public void remove(@Nonnull String shardName) {
+ primaryShardInfoCache.invalidate(shardName);
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-
-/**
- * Listener interface used to register for primary shard information changes.
- * Implementations of this interface can be registered with {@link ActorContext}
- * to receive notifications about shard information changes.
- */
-public interface ShardInfoListener {
- /**
- * Update {@link PrimaryShardInfo} for a particular shard.
- * @param shardName Shard name
- * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information
- * became unavailable.
- */
- void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo);
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-
-/**
- * Registration of a {@link ShardInfoListener} instance.
- *
- * @param <T> Type of listener
- */
-public class ShardInfoListenerRegistration<T extends ShardInfoListener> extends AbstractObjectRegistration<T> {
- private final ActorContext parent;
-
- protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) {
- super(instance);
- this.parent = Preconditions.checkNotNull(parent);
- }
-
- @Override
- protected void removeRegistration() {
- parent.removeShardInfoListener(this);
- }
-}
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
-import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
- doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener(
- any(ShardInfoListener.class));
mockComponentFactory = TransactionContextFactory.create(mockActorContext);
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
}};
}
+ @Test
+ public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testSingleTransactionsWritesInQuickSuccession", "cars-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ doCommit(writeTx.ready());
+
+ writeTx = txChain.newWriteOnlyTransaction();
+
+ int nCars = 5;
+ for(int i = 0; i < nCars; i++) {
+ writeTx.write(CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ }
+
+ doCommit(writeTx.ready());
+
+ Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+ CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
+ cleanup(dataStore);
+ }};
+ }
+
private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
@Test
public void testTransactionAbort() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
DistributedDataStore dataStore =
setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
DistributedDataStore dataStore = setupDistributedDataStore(
- "testCreateChainedTransactionsInQuickSuccession", "test-1");
+ "testCreateChainedTransactionsInQuickSuccession", "cars-1");
- DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
- NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
- int nTxs = 20;
- List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
- for(int i = 0; i < nTxs; i++) {
- DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ futures.add(writeTx.submit());
- rwTx.merge(TestModel.TEST_PATH, testNode);
+ int nCars = 100;
+ for(int i = 0; i < nCars; i++) {
+ DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
- cohorts.add(rwTx.ready());
+ rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ futures.add(rwTx.submit());
}
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- doCommit(cohort);
+ for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
+ f.checkedGet();
}
+ Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+ LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
txChain.close();
+ broker.close();
+
cleanup(dataStore);
}};
}
private static final String[] SHARD_NAMES = {"cars", "people"};
+ private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
+
+ private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
+
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
private final DatastoreContext.Builder followerDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200);
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
private DistributedDataStore followerDistributedDataStore;
private DistributedDataStore leaderDistributedDataStore;
@Before
public void setUpClass() {
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
- Cluster.get(leaderSystem).join(member1Address);
+ Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
- Cluster.get(followerSystem).join(member1Address);
+ Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
}
@After
private void initDatastores(String type) {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- String moduleShardsConfig = "module-shards-member1-and-2.conf";
-
followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+ followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
- leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES);
+ leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
}
assertEquals("isPresent", false, optional.isPresent());
}
+ @Test
+ public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
+ String testName = "testSingleShardTransactionsWithLeaderChanges";
+ initDatastores(testName);
+
+ String followerCarShardName = "member-2-shard-cars-" + testName;
+ InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
+
+ // Write top-level car container from the follower so it uses a remote Tx.
+
+ DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+
+ // Switch the leader to the follower
+
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
+ followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+ followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+ leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
+
+ DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+ IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+ DistributedDataStore newMember1Datastore = newMember1TestKit.
+ setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+
+ followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
+
+ // Write a car entry to the new leader - should switch to local Tx
+
+ writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+ MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
+ writeTx.merge(car1Path, car1);
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
+ }
+
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastores("testReadyLocalTransactionForwardedToLeader");
void waitUntilLeader(ActorContext actorContext, String... shardNames) {
for(String shardName: shardNames) {
- ActorRef shard = null;
- for(int i = 0; i < 20 * 5 && shard == null; i++) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
- if(shardReply.isPresent()) {
- shard = shardReply.get();
- }
- }
+ ActorRef shard = findLocalShard(actorContext, shardName);
assertNotNull("Shard was not created", shard);
}
}
+ void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
+ for(String shardName: shardNames) {
+ ActorRef shard = findLocalShard(actorContext, shardName);
+ assertNotNull("No local shard found", shard);
+
+ waitUntilNoLeader(shard);
+ }
+ }
+
+ private ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+ ActorRef shard = null;
+ for(int i = 0; i < 20 * 5 && shard == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+ if(shardReply.isPresent()) {
+ shard = shardReply.get();
+ }
+ }
+ return shard;
+ }
+
void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
NormalizedNode<?, ?> nodeToWrite) throws Exception {
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter));
+ localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter)) {
+ @Override
+ protected DOMStoreWriteTransaction getWriteDelegate() {
+ return readWriteTransaction;
+ }
+
+ @Override
+ protected DOMStoreReadTransaction getReadDelegate() {
+ return readWriteTransaction;
+ }
+ };
}
@Test
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
}
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
InMemoryJournal.clear();
}
- private Props newShardMgrProps() {
+ private Props newShardMgrProps(boolean persistent) {
return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- datastoreContextBuilder.build(), ready);
+ datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
}
private Props newPropsShardMgrWithMockShardActor() {
@Override
public ShardManager create() throws Exception {
return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
- ready, name, shardActor);
+ ready, name, shardActor, primaryShardInfoCache);
}
};
throws Exception {
new JavaTestKit(getSystem()) {{
final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef.create(getSystem(), newShardMgrProps(true));
assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
throws Exception {
new JavaTestKit(getSystem()) {{
final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef.create(getSystem(), newShardMgrProps(true));
SchemaContext schemaContext = mock(SchemaContext.class);
Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
public void testRecoveryApplicable(){
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final Props persistentProps = newShardMgrProps(true);
final TestActorRef<ShardManager> persistentShardManager =
TestActorRef.create(getSystem(), persistentProps);
assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
- final Props nonPersistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(false).build(), ready);
+ final Props nonPersistentProps = newShardMgrProps(false);
final TestActorRef<ShardManager> nonPersistentShardManager =
TestActorRef.create(getSystem(), nonPersistentProps);
private static final long serialVersionUID = 1L;
@Override
public ShardManager create() throws Exception {
- return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
+ ready, new PrimaryShardInfoFutureCache()) {
@Override
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
DataPersistenceProviderMonitor dataPersistenceProviderMonitor
public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
"unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
@Test
public void testByDefaultSyncStatusIsFalse() throws Exception{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final Props persistentProps = newShardMgrProps(true);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
final Props persistentProps = ShardManager.props(
new MockClusterWrapper(),
new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
@Test
public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ final Props persistentProps = newShardMgrProps(true);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
final Props persistentProps = ShardManager.props(
new MockClusterWrapper(),
new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
return Arrays.asList("default", "astronauts");
}
},
- DatastoreContext.newBuilder().persistent(true).build(), ready);
+ DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache);
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
TestShardManager(String shardMrgIDSuffix) {
super(new MockClusterWrapper(), new MockConfiguration(),
- DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
+ DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready,
+ new PrimaryShardInfoFutureCache());
}
@Override
protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
- ActorRef shardActor) {
- super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+ ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache);
this.shardActor = shardActor;
this.name = name;
}
Assert.fail("Leader not found for shard " + shard.path());
}
+ protected void waitUntilNoLeader(ActorRef shard) {
+ FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+ for(int i = 0; i < 20 * 5; i++) {
+ Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
+ try {
+ FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
+ if(resp.getLeaderActor() == null) {
+ return;
+ }
+ } catch(TimeoutException e) {
+ } catch(Exception e) {
+ System.err.println("FindLeader threw ex");
+ e.printStackTrace();
+ }
+
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Unexpected leader found for shard " + shard.path());
+ }
}
\ No newline at end of file
public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
new JavaTestKit(getSystem()) {{
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().
- operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 7,
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryNotFoundException("not found"));
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new NotInitializedException("not iniislized"));
ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mockConfig,
- DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
actorContext.broadcast(new TestMessage());
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import scala.concurrent.Future;
+
+/**
+ * Unit tests for PrimaryShardInfoFutureCache.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfoFutureCacheTest {
+
+ @Test
+ public void testOperations() {
+ PrimaryShardInfoFutureCache cache = new PrimaryShardInfoFutureCache();
+
+ assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+
+ PrimaryShardInfo shardInfo = new PrimaryShardInfo(mock(ActorSelection.class), Optional.<DataTree>absent());
+ cache.putSuccessful("foo", shardInfo);
+
+ Future<PrimaryShardInfo> future = cache.getIfPresent("foo");
+ assertNotNull("Null future", future);
+ assertEquals("getIfPresent", shardInfo, future.value().get().get());
+
+ cache.remove("foo");
+
+ assertEquals("getIfPresent", null, cache.getIfPresent("foo"));
+ }
+}