package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
private final EventSourceService eventSourceService;
private final RpcProviderRegistry rpcRegistry;
- private final ExecutorService executorService;
public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
this.dataBroker = dataBroker;
- this.executorService = Executors.newCachedThreadPool();
this.rpcRegistry = rpcRegistry;
aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+
}
private <T extends DataObject> void putData(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path, final T data) {
+ final InstanceIdentifier<T> path,
+ final T data){
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(store, path, data, true);
tx.submit();
+
}
private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
}
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
- executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+
+ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+
+ final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
+
+ Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
+
+ @Override
+ public void onSuccess(Optional<Topology> data) {
+ if(data.isPresent()) {
+ final List<Node> nodes = data.get().getNode();
+ for (final Node node : nodes) {
+ if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+ eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+ tx.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not notify existing nodes {}", t);
+ tx.close();
+ }
+
+ });
+
}
@Override
// FIXME: Return registration object.
}
- private class NotifyAllNodeExecutor implements Runnable {
-
- private final EventSourceTopic topic;
- private final DataBroker dataBroker;
- private final Pattern nodeIdPatternRegex;
-
- public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
- this.topic = topic;
- this.dataBroker = dataBroker;
- this.nodeIdPatternRegex = nodeIdPatternRegex;
- }
-
- @Override
- public void run() {
- //# Code reader note: Context of Node type is NetworkTopology
- final List<Node> nodes = snapshot();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- }
-
- private List<Node> snapshot() {
- try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
- final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
- if(data.isPresent()) {
- final List<Node> nodeList = data.get().getNode();
- if(nodeList != null) {
- return nodeList;
- }
- }
- return Collections.emptyList();
- } catch (final ReadFailedException e) {
- LOG.error("Unable to retrieve node list.", e);
- return Collections.emptyList();
- }
- }
- }
}
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
private int currentRecoveryBatchCount;
+ private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
currentBehavior = newBehavior;
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
@Override public void handleCommand(Object message) {
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else {
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
+
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
}
}
- private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
if (oldBehavior != currentBehavior){
onStateChanged();
}
- String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
- String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+ String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+ String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
// it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ }
- if (getRoleChangeNotifier().isPresent() &&
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ }
+
+ if (roleChangeNotifier.isPresent() &&
(oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
- getRoleChangeNotifier().get().tell(
- new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
- getSelf());
+ roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ currentBehavior.state().name()), getSelf());
}
}
return currentBehavior;
}
+ private static class BehaviorStateHolder {
+ private RaftActorBehavior behavior;
+ private String leaderId;
+
+ void init(RaftActorBehavior behavior) {
+ this.behavior = behavior;
+ this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ }
+
+ RaftActorBehavior getBehavior() {
+ return behavior;
+ }
+
+ String getLeaderId() {
+ return leaderId;
+ }
+ }
}
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@Test
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("notifier-");
- factory.createTestActor(MockRaftActor.props(persistenceId,
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- List<RoleChanged> matches = null;
- for(int i = 0; i < 5000 / heartBeatInterval; i++) {
- matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
- if(matches.size() == 3) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
- }
-
- assertEquals(3, matches.size());
+ List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+ notifierActor.underlyingActor().clear();
+
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ leaderId = newLeaderId;
+ return this;
+ }
+ };
+
+ raftActor.changeCurrentBehavior(follower);
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
+
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ notifierActor.underlyingActor().clear();
+
+ raftActor.handleCommand("any");
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
}};
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String memberId;
+ private final String leaderId;
+
+ public LeaderStateChanged(String memberId, String leaderId) {
+ this.memberId = memberId;
+ this.leaderId = leaderId;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+ .append("]");
+ return builder.toString();
+ }
+}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
/**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
* the listeners (within the same node), which are registered with it.
* <p/>
* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
*/
public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
- private String memberId;
- private Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
+ private final String memberId;
+ private final Map<ActorPath, ActorRef> registeredListeners = Maps.newHashMap();
private RoleChangeNotification latestRoleChangeNotification = null;
+ private LeaderStateChanged latestLeaderStateChanged;
public RoleChangeNotifier(String memberId) {
this.memberId = memberId;
getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+ if(latestLeaderStateChanged != null) {
+ getSender().tell(latestLeaderStateChanged, getSelf());
+ }
+
if (latestRoleChangeNotification != null) {
getSender().tell(latestRoleChangeNotification, getSelf());
}
for (ActorRef listener: registeredListeners.values()) {
listener.tell(latestRoleChangeNotification, getSelf());
}
+ } else if (message instanceof LeaderStateChanged) {
+ latestLeaderStateChanged = (LeaderStateChanged)message;
+
+ for (ActorRef listener: registeredListeners.values()) {
+ listener.tell(latestLeaderStateChanged, getSelf());
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.slf4j.Logger;
+
+/**
+ * A factory for creating DOM transactions, either normal or chained.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMTransactionFactory {
+
+ private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final InMemoryDOMDataStore store;
+ private final ShardStats shardMBean;
+ private final Logger log;
+ private final String name;
+
+ public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
+ this.store = store;
+ this.shardMBean = shardMBean;
+ this.log = log;
+ this.name = name;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
+ String transactionID, String transactionChainID) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainID.isEmpty()) {
+ factory = transactionChains.get(transactionChainID);
+ if(factory == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
+ transactionChainID);
+ }
+
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainID, transactionChain);
+ factory = transactionChain;
+ }
+ } else {
+ log.debug("{}: Creating transaction with ID {}", name, transactionID);
+ }
+
+ T transaction = null;
+ switch(type) {
+ case READ_ONLY:
+ transaction = (T) factory.newReadOnlyTransaction();
+ shardMBean.incrementReadOnlyTransactionCount();
+ break;
+ case READ_WRITE:
+ transaction = (T) factory.newReadWriteTransaction();
+ shardMBean.incrementReadWriteTransactionCount();
+ break;
+ case WRITE_ONLY:
+ transaction = (T) factory.newWriteOnlyTransaction();
+ shardMBean.incrementWriteOnlyTransactionCount();
+ break;
+ }
+
+ return transaction;
+ }
+
+ public void closeTransactionChain(String transactionChainID) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(transactionChainID);
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ public void closeAllTransactionChains() {
+ for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
+}
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+ private boolean writeOnlyTransactionOptimizationsEnabled = false;
private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreType = other.dataStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+ this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return shardBatchedModificationCount;
}
+ public boolean isWriteOnlyTransactionOptimizationsEnabled() {
+ return writeOnlyTransactionOptimizationsEnabled;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
+ public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final InMemoryDOMDataStore store;
/// The name of this shard
- private final ShardIdentifier name;
+ private final String name;
private final ShardStats shardMBean;
private ShardRecoveryCoordinator recoveryCoordinator;
private List<Object> currentLogRecoveryBatch;
- private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
- protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses),
- Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
- this.name = name;
+ this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent())
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
+ transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+
+ commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
- private static Map<String, String> mapPeerAddresses(
- final Map<ShardIdentifier, String> peerAddresses) {
- Map<String, String> map = new HashMap<>();
-
- for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
- .entrySet()) {
- map.put(entry.getKey().toString(), entry.getValue());
- }
-
- return map;
- }
-
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses,
+ final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
try {
if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
+ } else if (BatchedModifications.class.isInstance(message)) {
+ handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+ sender().tell(reply, self());
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+ }
+
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
ready.getTransactionID(), ready.getTxnClientVersion());
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// the front-end.
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- ready.getModification());
+ (MutableCompositeModification) ready.getModification());
// Return our actor path as we'll handle the three phase commit, except if the Tx client
// version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- DOMStoreTransactionChain chain =
- transactionChains.remove(closeTransactionChain.getTransactionChainId());
-
- if(chain != null) {
- chain.close();
- }
+ transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransactionFactory factory = store;
-
- if(!transactionChainId.isEmpty()) {
- factory = transactionChains.get(transactionChainId);
- if(factory == null){
- DOMStoreTransactionChain transactionChain = store.createTransactionChain();
- transactionChains.put(transactionChainId, transactionChain);
- factory = transactionChain;
- }
- }
-
- if(this.schemaContext == null) {
- throw new IllegalStateException("SchemaContext is not set");
- }
-
- if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-
- shardMBean.incrementWriteOnlyTransactionCount();
-
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-
- shardMBean.incrementReadWriteTransactionCount();
-
- return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ DOMStoreTransaction transaction = transactionFactory.newTransaction(
+ TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
+ transactionChainId);
- shardMBean.incrementReadOnlyTransactionCount();
-
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
-
- } else {
- throw new IllegalArgumentException(
- "Shard="+name + ":CreateTransaction message has unidentified transaction type="
- + transactionType);
- }
+ return createShardTransaction(transaction, transactionId, clientVersion);
}
private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
}
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader){
- for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- persistenceId(), entry.getKey(), getId());
- }
- entry.getValue().close();
+ if(!isLeader) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
+ persistenceId(), getId());
}
- transactionChains.clear();
+ transactionFactory.closeAllTransactionChains();
}
}
}
@Override public String persistenceId() {
- return this.name.toString();
+ return this.name;
}
@VisibleForTesting
return dataPersistenceProvider;
}
+ @VisibleForTesting
+ ShardCommitCoordinator getCommitCoordinator() {
+ return commitCoordinator;
+ }
+
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
final ShardIdentifier name;
- final Map<ShardIdentifier, String> peerAddresses;
+ final Map<String, String> peerAddresses;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.slf4j.Logger;
/**
*/
public class ShardCommitCoordinator {
+ // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ public interface CohortDecorator {
+ DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+ }
+
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
+ private final DOMTransactionFactory transactionFactory;
+
private final Queue<CohortEntry> queuedCohortEntries;
private int queueCapacity;
private final String name;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
- String name) {
- cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
- cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
+ private final String shardActorPath;
+
+ private final RemovalListener<String, CohortEntry> cacheRemovalListener =
+ new RemovalListener<String, CohortEntry>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
+ if(notification.getCause() == RemovalCause.EXPIRED) {
+ log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
+ }
+ }
+ };
+
+ // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
+ private CohortDecorator cohortDecorator;
+
+ public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+ long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
this.queueCapacity = queueCapacity;
this.log = log;
this.name = name;
+ this.transactionFactory = transactionFactory;
+
+ shardActorPath = Serialization.serializedActorPath(shardActor);
+
+ cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
+ removalListener(cacheRemovalListener).build();
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
}
/**
- * This method caches a cohort entry for the given transactions ID in preparation for the
- * subsequent 3-phase commit.
+ * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
+ * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
*
* @param transactionID the ID of the transaction
* @param cohort the cohort to participate in the transaction commit
- * @param modification the modification made by the transaction
+ * @param modification the modifications made by the transaction
*/
public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification modification) {
cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
}
+ /**
+ * This method handles a BatchedModifications message for a transaction being prepared directly on the
+ * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
+ * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
+ * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
+ *
+ * @param batched the BatchedModifications
+ * @param shardActor the transaction's shard actor
+ *
+ * @throws ExecutionException if an error occurs loading the cache
+ */
+ public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+ throws ExecutionException {
+ CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+ if(cohortEntry == null) {
+ cohortEntry = new CohortEntry(batched.getTransactionID(),
+ transactionFactory.<DOMStoreWriteTransaction>newTransaction(
+ TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+ batched.getTransactionChainID()));
+ cohortCache.put(batched.getTransactionID(), cohortEntry);
+ }
+
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Applying {} batched modifications for Tx {}", name,
+ batched.getModifications().size(), batched.getTransactionID());
+ }
+
+ cohortEntry.applyModifications(batched.getModifications());
+
+ String cohortPath = null;
+ if(batched.isReady()) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Readying Tx {}, client version {}", name,
+ batched.getTransactionID(), batched.getVersion());
+ }
+
+ cohortEntry.ready(cohortDecorator);
+ cohortPath = shardActorPath;
+ }
+
+ return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+ }
+
/**
* This method handles the canCommit phase for a transaction.
*
}
}
+ @VisibleForTesting
+ void setCohortDecorator(CohortDecorator cohortDecorator) {
+ this.cohortDecorator = cohortDecorator;
+ }
+
+
static class CohortEntry {
private final String transactionID;
- private final DOMStoreThreePhaseCommitCohort cohort;
- private final Modification modification;
+ private DOMStoreThreePhaseCommitCohort cohort;
+ private final MutableCompositeModification compositeModification;
+ private final DOMStoreWriteTransaction transaction;
private ActorRef canCommitSender;
private ActorRef shard;
private long lastAccessTime;
+ CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+ this.compositeModification = new MutableCompositeModification();
+ this.transaction = transaction;
+ this.transactionID = transactionID;
+ }
+
CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification) {
+ MutableCompositeModification compositeModification) {
this.transactionID = transactionID;
this.cohort = cohort;
- this.modification = modification;
+ this.compositeModification = compositeModification;
+ this.transaction = null;
}
void updateLastAccessTime() {
return cohort;
}
- Modification getModification() {
- return modification;
+ MutableCompositeModification getModification() {
+ return compositeModification;
+ }
+
+ void applyModifications(Iterable<Modification> modifications) {
+ for(Modification modification: modifications) {
+ compositeModification.addModification(modification);
+ modification.apply(transaction);
+ }
+ }
+
+ void ready(CohortDecorator cohortDecorator) {
+ Preconditions.checkState(cohort == null, "cohort was already set");
+
+ cohort = transaction.ready();
+
+ if(cohortDecorator != null) {
+ // Call the hook for unit tests.
+ cohort = cohortDecorator.decorate(transactionID, cohort);
+ }
}
ActorRef getCanCommitSender() {
}
boolean hasModifications(){
- if(modification instanceof CompositeModification){
- return ((CompositeModification) modification).getModifications().size() > 0;
- }
- return true;
+ return compositeModification.getModifications().size() > 0;
}
}
}
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
+import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else{
+ } else if(message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
+ } else if(message instanceof LeaderStateChanged) {
+ onLeaderStateChanged((LeaderStateChanged)message);
+ } else {
unknownMessage(message);
}
}
+ private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+ } else {
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ }
+ }
+
+ private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ ShardInformation shardInfo = message.getShardInfo();
+
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ shardInfo.getShardId());
+
+ shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+ if(!shardInfo.isShardInitialized()) {
+ message.getSender().tell(new ActorNotInitialized(), getSelf());
+ } else {
+ message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ }
+ }
+
private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
- LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(),
- status.isInitialSyncDone());
+ LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+ status.getName(), status.isInitialSyncDone());
ShardInformation shardInformation = findShardInformation(status.getName());
}
private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
- LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+ LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
shardInformation.setRole(roleChanged.getNewRole());
if (isReady()) {
- LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
- waitTillReadyCountdownLatch.getCount());
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
waitTillReadyCountdownLatch.countDown();
}
private boolean isReady() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+ if(!info.isShardReady()){
isReady = false;
break;
}
if (shardId.getShardName() == null) {
return;
}
+
markShardAsInitialized(shardId.getShardName());
}
private void markShardAsInitialized(String shardName) {
LOG.debug("Initializing shard [{}]", shardName);
+
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
shardInformation.setActorInitialized();
+
+ shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
- final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized()) {
- if(waitUntilInitialized) {
+ private void sendResponse(ShardInformation shardInformation, boolean doWait,
+ boolean wantShardReady, final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
+ if(doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
- shardInformation.addRunnableOnInitialized(new Runnable() {
+
+ Runnable replyRunnable = new Runnable() {
@Override
public void run() {
sender.tell(messageSupplier.get(), self);
}
- });
- } else {
+ };
+
+ OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
+ new OnShardInitialized(replyRunnable);
+
+ shardInformation.addOnShardInitialized(onShardInitialized);
+
+ Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
+ datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+ new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
+ getContext().dispatcher(), getSelf());
+
+ onShardInitialized.setTimeoutSchedule(timeoutSchedule);
+
+ } else if (!shardInformation.isShardInitialized()) {
getSender().tell(new ActorNotInitialized(), getSelf());
+ } else {
+ getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
+ private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+ return new NoShardLeaderException(String.format(
+ "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
+ "recovering and a leader is being elected. Try again later.", shardId));
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
memberNameToAddress.remove(message.member().roles().head());
}
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
- info.updatePeerAddress(getShardIdentifier(memberName, shardName),
- getShardActorPath(shardName, memberName));
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
+ getShardActorPath(shardName, memberName), getSelf());
}
}
LOG.debug("Sending new SchemaContext to Shards");
for (ShardInformation info : localShards.values()) {
if (info.getActor() == null) {
- info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
+ info.setActor(newShardActor(schemaContext, info));
} else {
info.getActor().tell(message, getSelf());
}
- info.getActor().tell(new RegisterRoleChangeListener(), self());
}
}
}
+ @VisibleForTesting
+ protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+ return getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ }
+
private void findPrimary(FindPrimary message) {
- String shardName = message.getShardName();
+ final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+ Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", shardName, found);
+ }
+
+ return found;
}
});
List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
- Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
}
* @param shardName
* @return
*/
- private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+ private Map<String, String> getPeerAddresses(String shardName){
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+ Map<String, String> peerAddresses = new HashMap<>();
- List<String> members =
- this.configuration.getMembersFromShardName(shardName);
+ List<String> members = this.configuration.getMembersFromShardName(shardName);
String currentMemberName = this.cluster.getCurrentMemberName();
for(String memberName : members){
if(!currentMemberName.equals(memberName)){
- ShardIdentifier shardId = getShardIdentifier(memberName,
- shardName);
- String path =
- getShardActorPath(shardName, currentMemberName);
- peerAddresses.put(shardId, path);
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ String path = getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId.toString(), path);
}
}
return peerAddresses;
return mBean;
}
- private class ShardInformation {
+ @VisibleForTesting
+ protected static class ShardInformation {
private final ShardIdentifier shardId;
private final String shardName;
private ActorRef actor;
private ActorPath actorPath;
- private final Map<ShardIdentifier, String> peerAddresses;
+ private final Map<String, String> peerAddresses;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
private boolean followerSyncStatus = false;
- private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+ private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
private String role ;
+ private String leaderId;
private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<ShardIdentifier, String> peerAddresses) {
+ Map<String, String> peerAddresses) {
this.shardName = shardName;
this.shardId = shardId;
this.peerAddresses = peerAddresses;
return shardId;
}
- Map<ShardIdentifier, String> getPeerAddresses() {
+ Map<String, String> getPeerAddresses() {
return peerAddresses;
}
- void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+ void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId,
peerAddress);
if(peerAddresses.containsKey(peerId)){
peerId, peerAddress, actor.path());
}
- actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
+ actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
}
+
+ notifyOnShardInitializedCallbacks();
}
}
+ boolean isShardReady() {
+ return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
+ }
+
+ boolean isShardReadyWithLeaderId() {
+ return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId));
+ }
+
boolean isShardInitialized() {
return getActor() != null && actorInitialized;
}
+ boolean isLeader() {
+ return Objects.equal(leaderId, shardId.toString());
+ }
+
+ String getSerializedLeaderActor() {
+ if(isLeader()) {
+ return Serialization.serializedActorPath(getActor());
+ } else {
+ return peerAddresses.get(leaderId);
+ }
+ }
+
void setActorInitialized() {
+ LOG.debug("Shard {} is initialized", shardId);
+
this.actorInitialized = true;
- for(Runnable runnable: runnablesOnInitialized) {
- runnable.run();
+ notifyOnShardInitializedCallbacks();
+ }
+
+ private void notifyOnShardInitializedCallbacks() {
+ if(onShardInitializedSet.isEmpty()) {
+ return;
}
- runnablesOnInitialized.clear();
+ boolean ready = isShardReadyWithLeaderId();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+ ready ? "ready" : "initialized", onShardInitializedSet.size());
+ }
+
+ Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+ while(iter.hasNext()) {
+ OnShardInitialized onShardInitialized = iter.next();
+ if(!(onShardInitialized instanceof OnShardReady) || ready) {
+ iter.remove();
+ onShardInitialized.getTimeoutSchedule().cancel();
+ onShardInitialized.getReplyRunnable().run();
+ }
+ }
}
- void addRunnableOnInitialized(Runnable runnable) {
- runnablesOnInitialized.add(runnable);
+ void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.add(onShardInitialized);
}
- public void setRole(String newRole) {
- this.role = newRole;
+ void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.remove(onShardInitialized);
}
- public String getRole(){
- return this.role;
+ void setRole(String newRole) {
+ this.role = newRole;
+
+ notifyOnShardInitializedCallbacks();
}
- public void setFollowerSyncStatus(boolean syncStatus){
+ void setFollowerSyncStatus(boolean syncStatus){
this.followerSyncStatus = syncStatus;
}
- public boolean isInSync(){
+ boolean isInSync(){
if(RaftState.Follower.name().equals(this.role)){
return followerSyncStatus;
} else if(RaftState.Leader.name().equals(this.role)){
return false;
}
+ void setLeaderId(String leaderId) {
+ this.leaderId = leaderId;
+
+ notifyOnShardInitializedCallbacks();
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
}
}
+ private static class OnShardInitialized {
+ private final Runnable replyRunnable;
+ private Cancellable timeoutSchedule;
+
+ OnShardInitialized(Runnable replyRunnable) {
+ this.replyRunnable = replyRunnable;
+ }
+
+ Runnable getReplyRunnable() {
+ return replyRunnable;
+ }
+
+ Cancellable getTimeoutSchedule() {
+ return timeoutSchedule;
+ }
+
+ void setTimeoutSchedule(Cancellable timeoutSchedule) {
+ this.timeoutSchedule = timeoutSchedule;
+ }
+ }
+
+ private static class OnShardReady extends OnShardInitialized {
+ OnShardReady(Runnable replyRunnable) {
+ super(replyRunnable);
+ }
+ }
+
+ private static class ShardNotInitializedTimeout {
+ private final ActorRef sender;
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
+
static class SchemaContextModules implements Serializable {
private static final long serialVersionUID = -8884620101025936590L;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
public class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+ private final String transactionChainId;
private final ActorContext actorContext;
- private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
- protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
- this.transactionPath = transactionPath;
this.actor = actor;
+ this.transactionChainId = transactionChainId;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
return actor;
}
+ protected ActorContext getActorContext() {
+ return actorContext;
+ }
+
protected short getRemoteTransactionVersion() {
return remoteTransactionVersion;
}
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ return combineRecordedOperationsFutures(readyReplyFuture);
+ }
+
+ protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
// Future will fail. We need all prior operations and the ready operation to succeed
// in order to attempt commit.
- List<Future<Object>> futureList =
- Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
futureList.addAll(recordedOperationFutures);
- futureList.add(replyFuture);
+ futureList.add(withLastReplyFuture);
Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
actorContext.getClientDispatcher());
// de-serializing each reply.
// Note the Future get call here won't block as it's complete.
- Object serializedReadyReply = replyFuture.value().get().get();
+ Object serializedReadyReply = withLastReplyFuture.value().get().get();
if (serializedReadyReply instanceof ReadyTransactionReply) {
return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-
+ } else if(serializedReadyReply instanceof BatchedModificationsReply) {
+ return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = reply.getCohortPath();
-
- // In Helium we used to return the local path of the actor which represented
- // a remote ThreePhaseCommitCohort. The local path would then be converted to
- // a remote path using this resolvePath method. To maintain compatibility with
- // a Helium node we need to continue to do this conversion.
- // At some point in the future when upgrades from Helium are not supported
- // we could remove this code to resolvePath and just use the cohortPath as the
- // resolved cohortPath
- if(TransactionContextImpl.this.remoteTransactionVersion <
- DataStoreVersions.HELIUM_1_VERSION) {
- cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
- }
-
+ String cohortPath = deserializeCohortPath(reply.getCohortPath());
return actorContext.actorSelection(cohortPath);
-
} else {
// Throwing an exception here will fail the Future.
throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
+ protected String deserializeCohortPath(String cohortPath) {
+ return cohortPath;
+ }
+
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(remoteTransactionVersion);
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
batchedModifications.addModification(modification);
if(batchedModifications.getModifications().size() >=
actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
}
}
- private void sendBatchedModifications() {
+ private void sendAndRecordBatchedModifications() {
+ Future<Object> sentFuture = sendBatchedModifications();
+ if(sentFuture != null) {
+ recordedOperationFutures.add(sentFuture);
+ }
+ }
+
+ protected Future<Object> sendBatchedModifications() {
+ return sendBatchedModifications(false);
+ }
+
+ protected Future<Object> sendBatchedModifications(boolean ready) {
+ Future<Object> sent = null;
if(batchedModifications != null) {
- LOG.debug("Tx {} sending {} batched modifications", identifier,
- batchedModifications.getModifications().size());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+ batchedModifications.getModifications().size(), ready);
+ }
- recordedOperationFutures.add(executeOperationAsync(batchedModifications));
- batchedModifications = null;
+ batchedModifications.setReady(ready);
+ sent = executeOperationAsync(batchedModifications);
+
+ batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+ transactionChainId);
}
+
+ return sent;
}
@Override
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
// Send the remaining batched modifications if any.
- sendBatchedModifications();
+ sendAndRecordBatchedModifications();
// If there were any previous recorded put/merge/delete operation reply Futures then we
// must wait for them to successfully complete. This is necessary to honor the read
public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
- READ_WRITE
+ READ_WRITE;
+
+ public static TransactionType fromInt(int type) {
+ if(type == WRITE_ONLY.ordinal()) {
+ return WRITE_ONLY;
+ } else if(type == READ_WRITE.ordinal()) {
+ return READ_WRITE;
+ } else if(type == READ_ONLY.ordinal()) {
+ return READ_ONLY;
+ } else {
+ throw new IllegalArgumentException("In TransactionType enum value" + type);
+ }
+ }
}
static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
@Override
public void onComplete(Throwable failure, ActorSelection primaryShard) {
if(failure != null) {
- newTxFutureCallback.onComplete(failure, null);
+ newTxFutureCallback.createTransactionContext(failure, null);
} else {
newTxFutureCallback.setPrimaryShard(primaryShard);
}
* Sets the target primary shard and initiates a CreateTransaction try.
*/
void setPrimaryShard(ActorSelection primaryShard) {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
this.primaryShard = primaryShard;
- tryCreateTransaction();
+
+ if(transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+ identifier, primaryShard);
+
+ // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+ // to avoid the overhead of creating a separate transaction actor.
+ // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+ executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+ this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+ } else {
+ tryCreateTransaction();
+ }
}
/**
boolean invokeOperation = true;
synchronized(txOperationsOnComplete) {
if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete {}", identifier);
+ LOG.debug("Tx {} Adding operation on complete", identifier);
invokeOperation = false;
txOperationsOnComplete.add(operation);
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+ }
+
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable();
}
}
+ createTransactionContext(failure, response);
+ }
+
+ private void createTransactionContext(Throwable failure, Object response) {
// Mainly checking for state violation here to perform a volatile read of "initialized" to
// ensure updates to operationLimter et al are visible to this thread (ie we're doing
// "piggy-back" synchronization here).
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
- failure.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- String transactionPath = reply.getTransactionPath();
-
LOG.debug("Tx {} Received {}", identifier, reply);
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+ return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+ reply.getTransactionPath(), reply.getVersion());
+ }
+
+ private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+ String transactionPath, short remoteTransactionVersion) {
if (transactionType == TransactionType.READ_ONLY) {
// Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
- return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
- } else {
+ if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ operationCompleter);
+ } else if (transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ } else {
+ return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Context for a write-only transaction.
+ *
+ * @author Thomas Pantelis
+ */
+public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
+
+ public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ short remoteTransactionVersion, OperationCompleter operationCompleter) {
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ }
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the remaining batched modifications if any.
+
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true);
+
+ return combineRecordedOperationsFutures(lastModificationsFuture);
+ }
+}
package org.opendaylight.controller.cluster.datastore.compat;
import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationCompleter;
import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't
* @author Thomas Pantelis
*/
public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
+
+ private final String transactionPath;
public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal,
- remoteTransactionVersion, operationCompleter);
+ super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
+ remoteTransactionVersion, operationCompleter);
+ this.transactionPath = transactionPath;
}
@Override
recordedOperationFutures.add(executeOperationAsync(
new WriteData(path, data, getRemoteTransactionVersion())));
}
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+ identifier, recordedOperationFutures.size());
+
+ // Send the ReadyTransaction message to the Tx actor.
+
+ Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+
+ return combineRecordedOperationsFutures(lastReplyFuture);
+ }
+
+ @Override
+ protected String deserializeCohortPath(String cohortPath) {
+ // In base Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ return getActorContext().resolvePath(transactionPath, cohortPath);
+ }
+
+ return cohortPath;
+ }
}
public class ShardTransactionIdentifier {
private final String remoteTransactionId;
+ private final String stringRepresentation;
public ShardTransactionIdentifier(String remoteTransactionId) {
this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId,
"remoteTransactionId should not be null");
+
+ stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-").
+ append(remoteTransactionId).toString();
}
public String getRemoteTransactionId() {
}
@Override public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("shard-").append(remoteTransactionId);
- return sb.toString();
+ return stringRepresentation;
}
}
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
/**
public class BatchedModifications extends MutableCompositeModification implements SerializableMessage {
private static final long serialVersionUID = 1L;
+ private boolean ready;
+ private String transactionID;
+ private String transactionChainID;
+
public BatchedModifications() {
}
- public BatchedModifications(short version) {
+ public BatchedModifications(String transactionID, short version, String transactionChainID) {
super(version);
+ this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null");
+ this.transactionChainID = transactionChainID != null ? transactionChainID : "";
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
+ public String getTransactionID() {
+ return transactionID;
+ }
+
+ public String getTransactionChainID() {
+ return transactionChainID;
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ transactionID = in.readUTF();
+ transactionChainID = in.readUTF();
+ ready = in.readBoolean();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ out.writeUTF(transactionID);
+ out.writeUTF(transactionChainID);
+ out.writeBoolean(ready);
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
+ .append(", modifications size=").append(getModifications().size()).append("]");
+ return builder.toString();
+ }
}
public class BatchedModificationsReply extends VersionedExternalizableMessage {
private static final long serialVersionUID = 1L;
+ private static final byte COHORT_PATH_NOT_PRESENT = 0;
+ private static final byte COHORT_PATH_PRESENT = 1;
+
private int numBatched;
+ private String cohortPath;
public BatchedModificationsReply() {
}
this.numBatched = numBatched;
}
+ public BatchedModificationsReply(int numBatched, String cohortPath) {
+ this.numBatched = numBatched;
+ this.cohortPath = cohortPath;
+ }
public int getNumBatched() {
return numBatched;
}
+ public String getCohortPath() {
+ return cohortPath;
+ }
+
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
numBatched = in.readInt();
+
+ if(in.readByte() == COHORT_PATH_PRESENT) {
+ cohortPath = in.readUTF();
+ }
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(numBatched);
+
+ if(cohortPath != null) {
+ out.writeByte(COHORT_PATH_PRESENT);
+ out.writeUTF(cohortPath);
+ } else {
+ out.writeByte(COHORT_PATH_NOT_PRESENT);
+ }
}
@Override
public Object toSerializable() {
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
+ .append(cohortPath).append("]");
+ return builder.toString();
+ }
}
public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
private final String shardName;
- private final boolean waitUntilInitialized;
+ private final boolean waitUntilReady;
- public FindPrimary(String shardName, boolean waitUntilInitialized){
+ public FindPrimary(String shardName, boolean waitUntilReady){
Preconditions.checkNotNull(shardName, "shardName should not be null");
this.shardName = shardName;
- this.waitUntilInitialized = waitUntilInitialized;
+ this.waitUntilReady = waitUntilReady;
}
public String getShardName() {
return shardName;
}
- public boolean isWaitUntilInitialized() {
- return waitUntilInitialized;
+ public boolean isWaitUntilReady() {
+ return waitUntilReady;
}
@Override
public static FindPrimary fromSerializable(Object message){
return (FindPrimary) message;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady)
+ .append("]");
+ return builder.toString();
+ }
}
package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
public class PeerAddressResolved {
- private final ShardIdentifier peerId;
+ private final String peerId;
private final String peerAddress;
- public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
+ public PeerAddressResolved(String peerId, String peerAddress) {
this.peerId = peerId;
this.peerAddress = peerAddress;
}
- public ShardIdentifier getPeerId() {
+ public String getPeerId() {
return peerId;
}
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
/**
* Abstract base class for a versioned Externalizable message.
public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage {
private static final long serialVersionUID = 1L;
- private short version;
+ private short version = DataStoreVersions.CURRENT_VERSION;
public VersionedExternalizableMessage() {
}
import akka.actor.PoisonPill;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
import akka.pattern.AskTimeoutException;
import akka.util.Timeout;
import com.codahale.metrics.JmxReporter;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
+ private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private final Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
this.dispatchers = new Dispatchers(actorSystem.dispatchers());
setCachedProperties();
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
- .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
- .build();
-
- operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
- operationTimeout = new Timeout(operationDuration);
- transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
- TimeUnit.SECONDS));
Address selfAddress = clusterWrapper.getSelfAddress();
if (selfAddress != null && !selfAddress.host().isEmpty()) {
transactionCommitOperationTimeout = new Timeout(Duration.create(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
+
+ shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+ primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+ .build();
}
public DatastoreContext getDatastoreContext() {
return schemaContext;
}
- /**
- * Finds the primary shard for the given shard name
- *
- * @param shardName
- * @return
- */
- public Optional<ActorSelection> findPrimaryShard(String shardName) {
- String path = findPrimaryPathOrNull(shardName);
- if (path == null){
- return Optional.absent();
- }
- return Optional.of(actorSystem.actorSelection(path));
- }
-
public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
- new FindPrimary(shardName, true).toSerializable(),
- datastoreContext.getShardInitializationTimeout());
+ new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
return future.transform(new Mapper<Object, ActorSelection>() {
@Override
} else if(response instanceof PrimaryNotFound) {
throw new PrimaryNotFoundException(
String.format("No primary shard found for %S.", shardName));
+ } else if(response instanceof NoShardLeaderException) {
+ throw (NoShardLeaderException)response;
}
throw new UnknownMessageException(String.format(
*/
public Future<ActorRef> findLocalShardAsync( final String shardName) {
Future<Object> future = executeOperationAsync(shardManager,
- new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
+ new FindLocalShard(shardName, true), shardInitializationTimeout);
return future.map(new Mapper<Object, ActorRef>() {
@Override
}, getClientDispatcher());
}
- private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
-
- if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
- PrimaryFound found = PrimaryFound.fromSerializable(result);
-
- LOG.debug("Primary found {}", found.getPrimaryPath());
- return found.getPrimaryPath();
-
- } else if (result.getClass().equals(ActorNotInitialized.class)){
- throw new NotInitializedException(
- String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
- );
-
- } else {
- return null;
- }
- }
-
-
/**
* Executes an operation on a local actor and wait for it's response
*
*
* @param message
*/
- public void broadcast(Object message){
- for(String shardName : configuration.getAllShardNames()){
-
- Optional<ActorSelection> primary = findPrimaryShard(shardName);
- if (primary.isPresent()) {
- primary.get().tell(message, ActorRef.noSender());
- } else {
- LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
- message.getClass().getSimpleName(), shardName);
- }
+ public void broadcast(final Object message){
+ for(final String shardName : configuration.getAllShardNames()){
+
+ Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ if(failure != null) {
+ LOG.warn("broadcast failed to send message {} to shard {}: {}",
+ message.getClass().getSimpleName(), shardName, failure);
+ } else {
+ primaryShard.tell(message, ActorRef.noSender());
+ }
+ }
+ }, getClientDispatcher());
}
}
}
protected Props newShardProps() {
- return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return Shard.props(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
protected void onRecoveryComplete() {
protected final String memberName = "mock-member";
- protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
- shardBatchedModificationCount(1);
+ protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
@BeforeClass
public static void setUpClass() throws IOException {
eq(actorSelection(actorRef)), isA(BatchedModifications.class));
}
+ protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
+ Future<BatchedModificationsReply> replyFuture = Futures.successful(
+ new BatchedModificationsReply(count, actorRef.path().toString()));
+ doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(BatchedModifications.class));
+ }
+
protected void expectBatchedModifications(int count) {
doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) {
- ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- log.info("Created mock shard Tx actor {}", txActorRef);
+ ActorRef txActorRef;
+ if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION &&
+ dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
+ txActorRef = shardActorRef;
+ } else {
+ txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ log.info("Created mock shard Tx actor {}", txActorRef);
- doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection(
- txActorRef.path().toString());
+ doReturn(actorSystem.actorSelection(txActorRef.path())).
+ when(mockActorContext).actorSelection(txActorRef.path().toString());
- doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type));
+ doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(prefix, type));
+ }
return txActorRef;
}
return captured;
}
- protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
+ protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), expected);
+ verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected);
}
- protected void verifyBatchedModifications(Object message, Modification... expected) {
+ protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
assertEquals("Message type", BatchedModifications.class, message.getClass());
BatchedModifications batchedModifications = (BatchedModifications)message;
assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
+ assertEquals("isReady", expIsReady, batchedModifications.isReady());
for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
Modification actual = batchedModifications.getModifications().get(i);
assertEquals("Modification type", expected[i].getClass(), actual.getClass());
}};
}
- @Test
- public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
+ private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionWritesWithShardNotInitiallyReady";
String shardName = "test-1";
// Create the write Tx
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modification operations and ready the Tx on a separate thread.
}
@Test
- public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
+ public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionWritesWithShardNotInitiallyReady(true);
+ }
+
+ @Test
+ public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
+ testTransactionWritesWithShardNotInitiallyReady(false);
+ }
+
+ @Test
+ public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionReadsWithShardNotInitiallyReady";
String shardName = "test-1";
}};
}
- @Test(expected=NoShardLeaderException.class)
- public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
+ private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
new IntegrationTestKit(getSystem()) {{
String testName = "testTransactionCommitFailureWithNoShardLeader";
String shardName = "test-1";
// by setting the election timeout, which is based on the heartbeat interval, really high.
datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
+ datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
// Set the leader election timeout low for the test.
// Create the write Tx.
- final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+ final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
+ dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", writeTx);
// Do some modifications and ready the Tx on a separate thread.
}};
}
+ @Test(expected=NoShardLeaderException.class)
+ public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testTransactionCommitFailureWithNoShardLeader(true);
+ }
+
+ @Test(expected=NoShardLeaderException.class)
+ public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
+ testTransactionCommitFailureWithNoShardLeader(false);
+ }
+
@Test
public void testTransactionAbort() throws Exception{
System.setProperty("shard.persistent", "true");
package org.opendaylight.controller.cluster.datastore;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.RaftState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
public class RoleChangeNotifierTest extends AbstractActorTest {
TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
getSystem(), RoleChangeNotifier.getProps(memberId), memberId);
- RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor();
-
notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor);
// no notification should be sent as listener has not yet registered
}};
}
+
+ @Test
+ public void testHandleLeaderStateChanged() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String actorId = "testHandleLeaderStateChanged";
+ TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(
+ getSystem(), RoleChangeNotifier.getProps(actorId), actorId);
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender());
+
+ // listener registers after the sate has been changed, ensure we sent the latest state change after a reply
+ notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef());
+
+ expectMsgClass(RegisterRoleChangeListenerReply.class);
+
+ LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
+
+ notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender());
+
+ leaderStateChanged = expectMsgClass(LeaderStateChanged.class);
+ assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
+ assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
+ }};
+ }
}
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
public class ShardManagerTest extends AbstractActorTest {
private static int ID_COUNTER = 1;
@Mock
private static CountDownLatch ready;
- private static ActorRef mockShardActor;
+ private static TestActorRef<MessageCollectorActor> mockShardActor;
+
+ private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+ dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
@Before
public void setUp() {
InMemoryJournal.clear();
if(mockShardActor == null) {
- String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
- mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name);
+ String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString();
+ mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name);
}
+
+ mockShardActor.underlyingActor().clear();
}
@After
}
private Props newShardMgrProps() {
- DatastoreContext.Builder builder = DatastoreContext.newBuilder();
- builder.dataStoreType(shardMrgIDSuffix);
return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- builder.build(), ready);
+ datastoreContextBuilder.build(), ready);
+ }
+
+ private Props newPropsShardMgrWithMockShardActor() {
+ Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+ datastoreContextBuilder.build(), ready) {
+ @Override
+ protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+ return mockShardActor;
+ }
+ };
+ }
+ };
+
+ return Props.create(new DelegatingShardManagerCreator(creator));
}
@Test
public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
- expectMsgEquals(duration("5 seconds"),
- new PrimaryNotFound("non-existent").toSerializable());
+ expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
}};
}
@Test
- public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+ public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef());
+
+ MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class);
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), mockShardActor);
+
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
- expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
+
+ String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId1,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+ shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-2-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
}
@Test
- public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// We're passing waitUntilInitialized = true to FindPrimary so the response should be
- // delayed until we send ActorInitialized.
- Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
- new Timeout(5, TimeUnit.SECONDS));
+ // delayed until we send ActorInitialized and RoleChangeNotification.
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
shardManager.tell(new ActorInitialized(), mockShardActor);
- Object resp = Await.result(future, duration("5 seconds"));
- assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.tell(new RoleChangeNotification(memberId,
+ RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
+
+ shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
+
+ PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+ assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(),
+ primaryFound.getPrimaryPath().contains("member-1-shard-default"));
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+ shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+ null, RaftState.Candidate.name()), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+
+ expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
}};
}
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
@Test
public void testOnReceiveFindLocalShardForExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
@Test
public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
@Test
public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
new JavaTestKit(getSystem()) {{
- final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
public void testRoleChangeNotificationReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, times(1)).countDown();
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
{
- final Props persistentProps = ShardManager.props(
- new MockClusterWrapper(),
- new MockConfiguration(),
- DatastoreContext.newBuilder().persistent(true).build(), ready);
- final TestActorRef<ShardManager> shardManager =
- TestActorRef.create(getSystem(), persistentProps);
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
- shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+ shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+ "unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
verify(ready, never()).countDown();
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.mockito.InOrder;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
+
@Test
public void testRegisterChangeListener() throws Exception {
new ShardTestKit(getSystem()) {{
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
@Override
public void onReceiveCommand(final Object message) throws Exception {
final CountDownLatch recoveryComplete = new CountDownLatch(1);
class TestShard extends Shard {
TestShard() {
- super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+ super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
newDatastoreContext(), SCHEMA_CONTEXT);
}
Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
String address = "akka://foobar";
- shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+ shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
assertEquals("getPeerAddresses", address,
((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
waitUntilLeader(shard);
- // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+ final String transactionID1 = "tx1";
+ final String transactionID2 = "tx2";
+ final String transactionID3 = "tx3";
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
+ if(transactionID.equals(transactionID1)) {
+ mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
+ return mockCohort1.get();
+ } else if(transactionID.equals(transactionID2)) {
+ mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
+ return mockCohort2.get();
+ } else {
+ mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
+ return mockCohort3.get();
+ }
+ }
+ };
- String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification3);
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
long timeoutSec = 5;
final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
- // by the ShardTransaction.
+ // Send a BatchedModifications message for the first transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
- assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
+ assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
+ assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
// Send the CanCommitTransaction message for the first Tx.
expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Can commit", true, canCommitReply.getCanCommit());
- // Send the ForwardedReadyTransaction for the next 2 Tx's.
+ // Send BatchedModifications for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// processed after the first Tx completes.
assertEquals("Commits complete", true, done);
- InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
- inOrder.verify(cohort1).canCommit();
- inOrder.verify(cohort1).preCommit();
- inOrder.verify(cohort1).commit();
- inOrder.verify(cohort2).canCommit();
- inOrder.verify(cohort2).preCommit();
- inOrder.verify(cohort2).commit();
- inOrder.verify(cohort3).canCommit();
- inOrder.verify(cohort3).preCommit();
- inOrder.verify(cohort3).commit();
+ InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
+ inOrder.verify(mockCohort1.get()).canCommit();
+ inOrder.verify(mockCohort1.get()).preCommit();
+ inOrder.verify(mockCohort1.get()).commit();
+ inOrder.verify(mockCohort2.get()).canCommit();
+ inOrder.verify(mockCohort2.get()).preCommit();
+ inOrder.verify(mockCohort2.get()).commit();
+ inOrder.verify(mockCohort3.get()).canCommit();
+ inOrder.verify(mockCohort3.get()).preCommit();
+ inOrder.verify(mockCohort3.get()).commit();
// Verify data in the data store.
}};
}
+ private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data, boolean ready) {
+ return newBatchedModifications(transactionID, null, path, data, ready);
+ }
+
+ private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+ YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
+ BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
+ batched.addModification(new WriteModification(path, data));
+ batched.setReady(ready);
+ return batched;
+ }
+
+ @SuppressWarnings("unchecked")
@Test
- public void testCommitWithPersistenceDisabled() throws Throwable {
- dataStoreContextBuilder.persistent(false);
+ public void testMultipleBatchedModifications() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testCommitPhaseFailure");
+ "testMultipleBatchedModifications");
waitUntilLeader(shard);
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+ final String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
- // Setup a simulated transactions with a mock cohort.
+ final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ if(mockCohort.get() == null) {
+ mockCohort.set(createDelegatingMockCohort("cohort", actual));
+ }
- String transactionID = "tx";
- MutableCompositeModification modification = new MutableCompositeModification();
- NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
- TestModel.TEST_PATH, containerNode, modification);
+ return mockCohort.get();
+ }
+ };
- FiniteDuration duration = duration("5 seconds");
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send a BatchedModifications to start a transaction.
+
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send a couple more BatchedModifications.
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
- InOrder inOrder = inOrder(cohort);
- inOrder.verify(cohort).canCommit();
- inOrder.verify(cohort).preCommit();
- inOrder.verify(cohort).commit();
+ InOrder inOrder = inOrder(mockCohort.get());
+ inOrder.verify(mockCohort.get()).canCommit();
+ inOrder.verify(mockCohort.get()).preCommit();
+ inOrder.verify(mockCohort.get()).commit();
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+ outerList.getValue() instanceof Iterable);
+ Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+ assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+ entry instanceof MapEntryNode);
+ MapEntryNode mapEntry = (MapEntryNode)entry;
+ Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+ mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+ assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+ assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testBatchedModificationsOnTransactionChain() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsOnTransactionChain");
+
+ waitUntilLeader(shard);
+
+ String transactionChainID = "txChain";
+ String transactionID1 = "tx1";
+ String transactionID2 = "tx2";
+
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a chained write transaction and ready it.
+
+ ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
+ containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Create a read Tx on the same chain.
+
+ shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+ transactionChainID).toSerializable(), getRef());
+
+ CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
+
+ getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
+ ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
+ assertEquals("Read node", containerNode, readReply.getNormalizedNode());
+
+ // Commit the write transaction.
+
+ shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Verify data in the data store.
+
+ NormalizedNode<?, ?> actualNode = readStore(shard, path);
+ assertEquals("Stored node", containerNode, actualNode);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testOnBatchedModificationsWhenNotLeader() {
+ final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
+ new ShardTestKit(getSystem()) {{
+ Creator<Shard> creator = new Creator<Shard>() {
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
+ newDatastoreContext(), SCHEMA_CONTEXT) {
+ @Override
+ protected boolean isLeader() {
+ return overrideLeaderCalls.get() ? false : super.isLeader();
+ }
+
+ @Override
+ protected ActorSelection getLeader() {
+ return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
+ super.getLeader();
+ }
+ };
+ }
+ };
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
+
+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testCommitWithPersistenceDisabled() throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx";
+ FiniteDuration duration = duration("5 seconds");
+
+ // Send a BatchedModifications to start a transaction.
+
+ NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
waitUntilLeader(shard);
- // Setup 2 simulated transactions with mock cohorts. The first one fails in the
- // commit phase.
+ // Setup 2 mock cohorts. The first one fails in the commit phase.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready each transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
waitUntilLeader(shard);
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
FiniteDuration duration = duration("5 seconds");
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
final FiniteDuration duration = duration("5 seconds");
String transactionID = "tx1";
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return cohort;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+
+ // Send BatchedModifications to start and ready a transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message.
}
};
- MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- modification, preCommit);
-
- shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
- cohort, modification, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
writeToStore(shard, TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- // Create 1st Tx - will timeout
+ // Create and ready the 1st Tx - will timeout
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
- modification1);
+ shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
+ TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- // Create 2nd Tx
+ // Create and ready the 2nd Tx
- String transactionID2 = "tx3";
- MutableCompositeModification modification2 = new MutableCompositeModification();
+ String transactionID2 = "tx2";
YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
- listNodePath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
- modification2);
-
- // Ready the Tx's
-
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+ shard.tell(newBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx. We don't send the commit so it should timeout.
final FiniteDuration duration = duration("5 seconds");
- InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
- TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
- modification2);
-
String transactionID3 = "tx3";
- MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
- TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
- // Ready the Tx's
+ // Send a BatchedModifications to start transactions and ready them.
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
- shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
- cohort3, modification3, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// canCommit 1st Tx.
// Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
- String transactionID1 = "tx1";
- MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ final String transactionID1 = "tx1";
+ final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
- String transactionID2 = "tx2";
- MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ final String transactionID2 = "tx2";
+ final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
final Timeout timeout = new Timeout(duration);
- // Simulate the ForwardedReadyTransaction messages that would be sent
- // by the ShardTransaction.
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
+ @Override
+ public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
+ DOMStoreThreePhaseCommitCohort actual) {
+ return transactionID1.equals(transactionID) ? cohort1 : cohort2;
+ }
+ };
+
+ shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
- shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
- cohort1, modification1, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ // Send BatchedModifications to start and ready each transaction.
- shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
- cohort2, modification2, true), getRef());
- expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+ shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
+
+ shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ expectMsgClass(duration, BatchedModificationsReply.class);
// Send the CanCommitTransaction message for the first Tx.
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<String,String>emptyMap(),
newDatastoreContext(), SCHEMA_CONTEXT) {
DelegatingPersistentDataProvider delegating;
final DatastoreContext persistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
- final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
persistentContext, SCHEMA_CONTEXT);
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
- final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+ final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
nonPersistentContext, SCHEMA_CONTEXT);
new ShardTestKit(getSystem()) {{
shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
+
}
@Test
}
private ActorRef createShard(){
- return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
+ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
TestModel.createTestContext()));
}
private ActorRef createShard(){
return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+ Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
}
private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
verify(mockActorContext, times(0)).acquireTxCreationPermit();
}
+ /**
+ * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
+ * initiated until the first one completes its read future.
+ */
+ @Test
+ public void testChainedWriteOnlyTransactions() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+ ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
+ doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
+
+ DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
+
+ NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx1.write(TestModel.TEST_PATH, writeNode1);
+
+ writeTx1.ready();
+
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
+
+ ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ expectBatchedModifications(txActorRef2, 1);
+
+ final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
+
+ final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
+
+ final AtomicReference<Exception> caughtEx = new AtomicReference<>();
+ final CountDownLatch write2Complete = new CountDownLatch(1);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
+ }
+ }
+ }.start();
+
+ assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
+
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ try {
+ verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ } catch (AssertionError e) {
+ fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
+ }
+
+ batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+
+ // Tx 2 should've proceeded to find the primary shard.
+ verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ }
+
/**
* Tests 2 successive chained read-write transactions and verifies the second transaction isn't
* initiated until the first one completes its read future.
writeTx1.ready();
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
+ verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
String tx2MemberName = "tx2MemberName";
doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@Test
public void testWrite() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
// This sends the batched modification.
transactionProxy.ready();
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+ verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
BatchedModificationsReply.class);
@Test
public void testMerge() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
}
@Test
public void testDelete() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
expectBatchedModifications(actorRef, 1);
- expectReadyTransaction(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH));
-
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
+ verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
}
@Test
- public void testReady() throws Exception {
+ public void testReadyWithReadWrite() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
isA(BatchedModifications.class));
+
+ verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ expectBatchedModificationsReady(actorRef, 1);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), false,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ verifyBatchedModifications(batchedModifications.get(1), true);
+
+ verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
}
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
+ dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
expectFailedBatchedModifications(actorRef);
- expectReadyTransaction(actorRef);
-
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
@Test
public void testReadyWithReplyFailure() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ expectFailedBatchedModifications(actorRef);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
verifyCohortFutures(proxy, TestException.class);
}
- @Test
- public void testReadyWithInitialCreateTransactionFailure() throws Exception {
-
- doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when(
- mockActorContext).findPrimaryShardAsync(anyString());
+ private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
+ doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, toThrow.getClass());
+ }
+
+ @Test
+ public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNotInitializedException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
+ }
+
+ @Test
+ public void testWriteOnlyTxWithNoShardLeaderException() throws Exception {
+ testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
}
@Test
public void testReadyWithInvalidReplyMessageType() throws Exception {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- expectBatchedModifications(actorRef, 1);
+ //expectBatchedModifications(actorRef, 1);
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS));
+ isA(BatchedModifications.class));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
verifyCohortFutures(proxy, IllegalArgumentException.class);
}
- @Test
- public void testUnusedTransaction() throws Exception {
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertEquals("canCommit", true, ready.canCommit().get());
- ready.preCommit().get();
- ready.commit().get();
- }
-
@Test
public void testGetIdentifier() {
setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
*/
@Test
public void testLocalTxActorRead() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
- .setTransactionId("txn-1").setTransactionActorPath(actorPath).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, READ_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
@Test
public void testLocalTxActorReady() throws Exception {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).
- when(mockActorContext).actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
- when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
- CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
- setTransactionId("txn-1").setTransactionActorPath(actorPath).
- setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, WRITE_ONLY));
-
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class));
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class);
-
// testing ready
- doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(ReadyTransaction.class));
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
private static interface TransactionProxyOperation {
doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
} else {
- doReturn(Futures.failed(new Exception("not found")))
+ doReturn(Futures.failed(new PrimaryNotFoundException("test")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
}
- String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+ String actorPath = txActorRef.path().toString();
CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
setTransactionId("txn-1").setTransactionActorPath(actorPath).
setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
+ doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
+
doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
eqCreateTransaction(memberName, READ_WRITE));
- doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+ doReturn(true).when(mockActorContext).isPathLocal(anyString());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
@Test
public void testWriteThrottlingWhenShardFound(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
-
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
}, 2, true);
}
- @Test
- public void testModificationOperationBatching() throws Throwable {
+ private void testModificationOperationBatching(TransactionType type) throws Exception {
int shardBatchedModificationCount = 3;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
expectBatchedModifications(actorRef, shardBatchedModificationCount);
YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
transactionProxy.write(writePath1, writeNode1);
transactionProxy.write(writePath2, writeNode2);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
- verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3),
+ boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
+ verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
- verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
- BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ if(optimizedWriteOnly) {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class);
+ } else {
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+ }
+ }
+
+ @Test
+ public void testReadWriteModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(READ_WRITE);
+ }
+
+ @Test
+ public void testWriteOnlyModificationOperationBatching() throws Throwable {
+ testModificationOperationBatching(WRITE_ONLY);
+ }
+
+ @Test
+ public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable {
+ dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+ testModificationOperationBatching(WRITE_ONLY);
}
@Test
public void testModificationOperationBatchingWithInterleavedReads() throws Throwable {
+
int shardBatchedModificationCount = 10;
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()).
- when(mockActorContext).getDatastoreContext();
+ dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
- verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1),
+ verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
new WriteModification(writePath2, writeNode2));
- verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1),
+ verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
new MergeModification(mergePath2, mergeNode2));
- verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath));
+ verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
InOrder inOrder = Mockito.inOrder(mockActorContext);
inOrder.verify(mockActorContext).executeOperationAsync(
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()),
eq(actorRef.path().toString()));
}
+
+ @Test
+ @Ignore
+ // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip
+ // creating transaction actors for write-only Tx's.
+ public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception {
+ short version = DataStoreVersions.HELIUM_2_VERSION;
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version);
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, testNode);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ }
}
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
SchemaContext schemaContext = TestModel.createTestContext();
Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
shardName("inventory").type("config").build(),
- Collections.<ShardIdentifier,String>emptyMap(),
+ Collections.<String,String>emptyMap(),
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
- BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain");
batched.addModification(new WriteModification(writePath, writeData));
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
+ batched.setReady(true);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
(Serializable) batched.toSerializable());
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion());
+ assertEquals("getTransactionID", "tx1", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
+ assertEquals("isReady", true, clone.isReady());
assertEquals("getModifications size", 3, clone.getModifications().size());
DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
+
+ // Test with different params.
+
+ batched = new BatchedModifications("tx2", (short)10, null);
+
+ clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable());
+
+ assertEquals("getVersion", 10, clone.getVersion());
+ assertEquals("getTransactionID", "tx2", clone.getTransactionID());
+ assertEquals("getTransactionChainID", "", clone.getTransactionChainID());
+ assertEquals("isReady", false, clone.isReady());
+
+ assertEquals("getModifications size", 0, clone.getModifications().size());
+
}
@Test
BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
(Serializable) new BatchedModificationsReply(100).toSerializable());
assertEquals("getNumBatched", 100, clone.getNumBatched());
+ assertEquals("getCohortPath", null, clone.getCohortPath());
+
+ clone = (BatchedModificationsReply) SerializationUtils.clone(
+ (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
+ assertEquals("getNumBatched", 50, clone.getNumBatched());
+ assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
}
}
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
public class ActorContextTest extends AbstractActorTest{
+ static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
+
+ private static class TestMessage {
+ }
+
private static class MockShardManager extends UntypedActor {
private final boolean found;
private final ActorRef actorRef;
+ private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
private MockShardManager(boolean found, ActorRef actorRef){
}
@Override public void onReceive(Object message) throws Exception {
+ if(message instanceof FindPrimary) {
+ FindPrimary fp = (FindPrimary)message;
+ Object resp = findPrimaryResponses.get(fp.getShardName());
+ if(resp == null) {
+ log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+ } else {
+ getSender().tell(resp, getSelf());
+ }
+
+ return;
+ }
+
if(found){
getSender().tell(new LocalShardFound(actorRef), getSelf());
} else {
}
}
+ void addFindPrimaryResp(String shardName, Object resp) {
+ findPrimaryResponses.put(shardName, resp);
+ }
+
private static Props props(final boolean found, final ActorRef actorRef){
return Props.create(new MockShardManagerCreator(found, actorRef) );
}
+ private static Props props(){
+ return Props.create(new MockShardManagerCreator() );
+ }
+
@SuppressWarnings("serial")
private static class MockShardManagerCreator implements Creator<MockShardManager> {
final boolean found;
final ActorRef actorRef;
+ MockShardManagerCreator() {
+ this.found = false;
+ this.actorRef = null;
+ }
+
MockShardManagerCreator(boolean found, ActorRef actorRef) {
this.found = found;
this.actorRef = actorRef;
@Test
public void testRateLimiting(){
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ transactionCreationInitialRateLimit(155L).build();
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), dataStoreContext);
// Check that the initial value is being picked up from DataStoreContext
- assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+ assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
actorContext.setTxCreationLimit(1.0);
@Test
public void testClientDispatcherIsGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
@Test
public void testClientDispatcherIsNotGlobalDispatcher(){
-
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
-
ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext);
+ mock(Configuration.class), DatastoreContext.newBuilder().build());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryNotFound("foobar"));
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
-
}
@Test
TestActorRef<MessageCollectorActor> shardManager =
TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
- DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
-
- doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
- doReturn("config").when(mockDataStoreContext).getDataStoreType();
- doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), mockDataStoreContext) {
+ mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new ActorNotInitialized());
Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
assertNull(cached);
+ }
+
+ @Test
+ public void testBroadcast() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+ when(mockConfig).getAllShardNames();
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+ actorContext.broadcast(new TestMessage());
+
+ expectFirstMatching(shardActorRef1, TestMessage.class);
+ expectFirstMatching(shardActorRef2, TestMessage.class);
+ }};
}
+ private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
-
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* </p>
*/
public class MessageCollectorActor extends UntypedActor {
- private List<Object> messages = new ArrayList<>();
+ private final List<Object> messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List<Object> getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
return output;
}
+ public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
return executeRemoteOperationResponse;
}
- @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
- return Optional.absent();
- }
-
public void setExecuteShardOperationResponse(Object response){
executeShardOperationResponse = response;
}