*/
package org.opendaylight.controller.sal.compatibility;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
private static final short OPENFLOWV10_TABLE_ID = 0;
+ private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+ private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
public void startAdapter() {
inventoryNotificationProvider.setDataProviderService(getDataProviderService());
inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+ txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
// inventoryNotificationProvider.start();
}
@Override
public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
- final ArrayList<FlowOnNode> output = new ArrayList<>();
- final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
- if (table != null) {
- final List<Flow> flows = table.getFlow();
- LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+ final ArrayList<FlowOnNode> ret= new ArrayList<>();
+ if (cached) {
+ final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+ if (table != null) {
+ final List<Flow> flows = table.getFlow();
+ LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+ for (final Flow flow : flows) {
+ final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+ if (statsFromDataStore != null) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+ ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ }
+ }
+ }
+ } else {
+ LOG.debug("readAllFlow cached:{}", cached);
+ GetAllFlowStatisticsFromFlowTableInput input =
+ new GetAllFlowStatisticsFromFlowTableInputBuilder()
+ .setNode(NodeMapping.toNodeRef(node))
+ .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+ .build();
+
+ Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+ getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
- for (final Flow flow : flows) {
- final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
- if (statsFromDataStore != null) {
- final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
- output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+ try {
+ // having a blocking call is fine here, as we need to join
+ // the notifications and return the result
+ result = future.get();
+ } catch (Exception e) {
+ LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+ return ret;
+ }
+
+ GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+ if (output == null) {
+ return ret;
+ }
+
+ TransactionId transactionId = output.getTransactionId();
+ String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+ LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
+
+ // insert an entry in tempcache, will get updated when notification is received
+ txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+ transactionId, node.getNodeIDString()));
+
+ TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+ (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+ // this loop would not be infinite as the cache will remove an entry
+ // after defined time if not written to
+ while (txnList != null && !txnList.areAllNotificationsGathered()) {
+ LOG.debug("readAllFlow waiting for notification...");
+ waitForNotification();
+ txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+ }
+
+ if (txnList == null) {
+ return ret;
+ }
+
+ List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+ for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+ List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+ if (flowAndStatisticsMapList != null) {
+ for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+ ret.add(addFlowStats(it, flowAndStatistics));
+ }
}
}
}
+ return ret;
+ }
+
+ private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+ return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+ }
- return output;
+ private void waitForNotification() {
+ try {
+ // going for a simple sleep approach,as wait-notify on a monitor would require
+ // us to maintain monitors per txn-node combo
+ Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+ LOG.trace("statCollector is waking up from a wait stat Response sleep");
+ } catch (final InterruptedException e) {
+ LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+ }
}
@Override
for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
}
+
+ updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
}
/**
private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
}
+
+ private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+ String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+ TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+ final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+ if (optional.isPresent()) {
+ LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+ TransactionNotificationList<T> txn = optional.get();
+ txn.addNotification(notification);
+ txn.setAllNotificationsGathered(lastNotification);
+ }
+ }
+
+ private class TransactionNotificationList<T extends TransactionAware> {
+ private TransactionId id;
+ private String nId;
+ private List<T> notifications;
+ private boolean allNotificationsGathered;
+
+ public TransactionNotificationList(TransactionId id, String nId) {
+ this.nId = nId;
+ this.id = id;
+ notifications = new ArrayList<T>();
+ }
+
+ public void addNotification(T notification) {
+ notifications.add(notification);
+ }
+
+ public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+ this.allNotificationsGathered = allNotificationsGathered;
+ }
+
+ public boolean areAllNotificationsGathered() {
+ return allNotificationsGathered;
+ }
+
+ public List<T> getNotifications() {
+ return notifications;
+ }
+
+ }
+
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
-import java.math.BigInteger;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
import org.opendaylight.controller.sal.common.util.Arguments;
import org.opendaylight.controller.sal.core.AdvertisedBandwidth;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
public final class NodeMapping {
private static final Logger LOG = LoggerFactory
* @param aDNode
* @return
*/
- private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
+ public static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
String targetPrefix = null;
if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
targetPrefix = OPENFLOW_ID_PREFIX;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DataChangeListener extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
+
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private boolean notificationsEnabled = false;
this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
}
- @Override public void handleReceive(Object message) throws Exception {
+ @Override
+ public void handleReceive(Object message) throws Exception {
if(message instanceof DataChanged){
dataChanged(message);
} else if(message instanceof EnableNotification){
private void enableNotification(EnableNotification message) {
notificationsEnabled = message.isEnabled();
+ LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ listener);
}
private void dataChanged(Object message) {
// Do nothing if notifications are not enabled
- if(!notificationsEnabled){
+ if(!notificationsEnabled) {
+ LOG.debug("Notifications not enabled for listener {} - dropping change notification",
+ listener);
return;
}
DataChanged reply = (DataChanged) message;
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change = reply.getChange();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = reply.getChange();
+
+ LOG.debug("Sending change notification {} to listener {}", change, listener);
+
this.listener.onDataChanged(change);
// It seems the sender is never null but it doesn't hurt to check. If the caller passes in
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import scala.concurrent.Future;
/**
* ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
* The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
* </p>
*/
+@SuppressWarnings("rawtypes")
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+ public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+
private volatile ActorSelection listenerRegistrationActor;
- private final AsyncDataChangeListener listener;
- private final ActorRef dataChangeListenerActor;
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private ActorRef dataChangeListenerActor;
+ private final String shardName;
+ private final ActorContext actorContext;
private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy(
- ActorSelection listenerRegistrationActor,
- L listener, ActorRef dataChangeListenerActor) {
- this.listenerRegistrationActor = listenerRegistrationActor;
+ DataChangeListenerRegistrationProxy (
+ String shardName, ActorContext actorContext, L listener) {
+ this.shardName = shardName;
+ this.actorContext = actorContext;
this.listener = listener;
- this.dataChangeListenerActor = dataChangeListenerActor;
}
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy(
- L listener, ActorRef dataChangeListenerActor) {
- this(null, listener, dataChangeListenerActor);
+ @VisibleForTesting
+ ActorSelection getListenerRegistrationActor() {
+ return listenerRegistrationActor;
+ }
+
+ @VisibleForTesting
+ ActorRef getDataChangeListenerActor() {
+ return dataChangeListenerActor;
}
@Override
return listener;
}
- public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ if(listenerRegistrationActor == null) {
+ return;
+ }
+
boolean sendCloseMessage = false;
synchronized(this) {
if(closed) {
this.listenerRegistrationActor = listenerRegistrationActor;
}
}
+
if(sendCloseMessage) {
listenerRegistrationActor.tell(new
CloseDataChangeListenerRegistration().toSerializable(), null);
}
+ }
- this.listenerRegistrationActor = listenerRegistrationActor;
+ public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+ dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props(listener));
+
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable failure, ActorRef shard) {
+ if(failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+ "cannot be registered", shardName, listener, path);
+ } else if(failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+ "cannot be registered: {}", shardName, listener, path, failure);
+ } else {
+ doRegistration(shard, path, scope);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
}
- public ActorSelection getListenerRegistrationActor() {
- return listenerRegistrationActor;
+ private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+ DataChangeScope scope) {
+
+ Future<Object> future = actorContext.executeOperationAsync(shard,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ REGISTER_TIMEOUT);
+
+ future.onComplete(new OnComplete<Object>(){
+ @Override
+ public void onComplete(Throwable failure, Object result) {
+ if(failure != null) {
+ LOG.error("Failed to register DataChangeListener {} at path {}",
+ listener, path.toString(), failure);
+ } else {
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ setListenerRegistrationActor(actorContext.actorSelection(
+ reply.getListenerRegistrationPath()));
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
}
@Override
sendCloseMessage = !closed && listenerRegistrationActor != null;
closed = true;
}
+
if(sendCloseMessage) {
- listenerRegistrationActor.tell(new
- CloseDataChangeListenerRegistration().toSerializable(), null);
+ listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+ ActorRef.noSender());
+ listenerRegistrationActor = null;
}
- dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+ if(dataChangeListenerActor != null) {
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ dataChangeListenerActor = null;
+ }
}
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
/**
*
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
- Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
-
- //if shard is NOT local
- if (!shard.isPresent()) {
- LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
- return new NoOpDataChangeListenerRegistration(listener);
- }
- //if shard is local
- ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
- Future future = actorContext.executeOperationAsync(shard.get(),
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
-
final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
- new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
-
- future.onComplete(new OnComplete() {
-
- @Override
- public void onComplete(Throwable failure, Object result)
- throws Throwable {
- if (failure != null) {
- LOG.error("Failed to register listener at path " + path.toString(), failure);
- return;
- }
- RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- listenerRegistrationProxy.setListenerRegistrationActor(actorContext
- .actorSelection(reply.getListenerRegistrationPath()));
- }
- }, actorContext.getActorSystem().dispatcher());
+ new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+ listenerRegistrationProxy.init(path, scope);
return listenerRegistrationProxy;
-
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.UntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-
-public class NoOpCohort extends UntypedActor {
-
- @Override public void onReceive(Object message) throws Exception {
- if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new CanCommitTransactionReply(false).toSerializable(), getSelf());
- } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(
- new PreCommitTransactionReply().toSerializable(),
- getSelf());
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new CommitTransactionReply().toSerializable(), getSelf());
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new AbortTransactionReply().toSerializable(), getSelf());
- } else {
- throw new Exception ("Not recognized message received,message="+message);
- }
-
- }
-}
-
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * When a consumer registers a data change listener and no local shard is
- * available to register that listener with then we return an instance of
- * NoOpDataChangeListenerRegistration
- *
- * <p>
- *
- * The NoOpDataChangeListenerRegistration as it's name suggests does
- * nothing when an operation is invoked on it
- */
-public class NoOpDataChangeListenerRegistration
- implements ListenerRegistration {
-
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener;
-
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-
- this.listener = listener;
- }
-
- @Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return listener;
- }
-
- @Override public void close() {
-
- }
-}
import scala.concurrent.duration.FiniteDuration;
import javax.annotation.Nonnull;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
private final ShardStats shardMBean;
- private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();
+
+ private final List<DelayedListenerRegistration> delayedListenerRegistrations =
+ Lists.newArrayList();
private final DatastoreContext datastoreContext;
if (message instanceof RecoveryFailure){
LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+ // Even though recovery failed, we still need to finish our recovery, eg send the
+ // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+ onRecoveryComplete();
} else {
super.onReceiveRecover(message);
}
store.onGlobalContextUpdated(message.getSchemaContext());
}
- @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+ @VisibleForTesting
+ void updateSchemaContext(SchemaContext schemaContext) {
store.onGlobalContextUpdated(schemaContext);
}
- private void registerChangeListener(
- RegisterChangeListener registerChangeListener) {
+ private void registerChangeListener(RegisterChangeListener registerChangeListener) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener
- .getPath());
+ LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration;
+ if(isLeader()) {
+ registration = doChangeListenerRegistration(registerChangeListener);
+ } else {
+ LOG.debug("Shard is not the leader - delaying registration");
+
+ DelayedListenerRegistration delayedReg =
+ new DelayedListenerRegistration(registerChangeListener);
+ delayedListenerRegistrations.add(delayedReg);
+ registration = delayedReg;
}
+ ActorRef listenerRegistration = getContext().actorOf(
+ DataChangeListenerRegistration.props(registration));
- ActorSelection dataChangeListenerPath = getContext()
- .system().actorSelection(
- registerChangeListener.getDataChangeListenerPath());
+ LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ listenerRegistration.path());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+ }
+
+ private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> doChangeListenerRegistration(
+ RegisterChangeListener registerChangeListener) {
+
+ ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+ registerChangeListener.getDataChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
// it will not
- dataChangeListenerPath
- .tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
dataChangeListeners.add(dataChangeListenerPath);
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
-
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- registration = store.registerChangeListener(registerChangeListener.getPath(),
- listener, registerChangeListener.getScope());
- ActorRef listenerRegistration =
- getContext().actorOf(
- DataChangeListenerRegistration.props(registration));
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+ new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
- }
+ LOG.debug("Registering for path {}", registerChangeListener.getPath());
- getSender()
- .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
- getSelf());
+ return store.registerChangeListener(registerChangeListener.getPath(), listener,
+ registerChangeListener.getScope());
}
private boolean isMetricsCaptureEnabled(){
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
- // Schedule a message to be periodically sent to check if the current in-progress
- // transaction should be expired and aborted.
- FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
- txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
- period, period, getSelf(),
- TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+ // Being paranoid here - this method should only be called once but just in case...
+ if(txCommitTimeoutCheckSchedule == null) {
+ // Schedule a message to be periodically sent to check if the current in-progress
+ // transaction should be expired and aborted.
+ FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+ txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+ period, period, getSelf(),
+ TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+ }
}
@Override
}
}
- @Override protected void onStateChanged() {
+ @Override
+ protected void onStateChanged() {
+ boolean isLeader = isLeader();
for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener
- .tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+ }
+
+ if(isLeader) {
+ for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+ if(!reg.isClosed()) {
+ reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+ }
+ }
+
+ delayedListenerRegistrations.clear();
}
shardMBean.setRaftState(getRaftState().name());
shardMBean.setCurrentTerm(getCurrentTerm());
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader()){
+ if(!isLeader){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
ShardStats getShardMBean() {
return shardMBean;
}
+
+ private static class DelayedListenerRegistration implements
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+ private volatile boolean closed;
+
+ private final RegisterChangeListener registerChangeListener;
+
+ private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> delegate;
+
+ DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+ this.registerChangeListener = registerChangeListener;
+ }
+
+ void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration) {
+ this.delegate = registration;
+ }
+
+ boolean isClosed() {
+ return closed;
+ }
+
+ RegisterChangeListener getRegisterChangeListener() {
+ return registerChangeListener;
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return delegate != null ? delegate.getInstance() : null;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ if(delegate != null) {
+ delegate.close();
+ }
+ }
+ }
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
LOG.debug("Initializing shard [{}]", shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
- shardInformation.setShardInitialized(true);
+ shardInformation.setActorInitialized();
}
}
return;
}
- sendResponse(shardInformation, new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, Supplier<Object> messageSupplier) {
- if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+ final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized()) {
+ if(waitUntilInitialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+ shardInformation.addRunnableOnInitialized(new Runnable() {
+ @Override
+ public void run() {
+ sender.tell(messageSupplier.get(), self);
+ }
+ });
+ } else {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ }
+
return;
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new PrimaryFound(info.getActorPath().toString()).toSerializable();
private ActorRef actor;
private ActorPath actorPath;
private final Map<ShardIdentifier, String> peerAddresses;
- private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+ // flag that determines if the actor is ready for business
+ private boolean actorInitialized = false;
+
+ private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<ShardIdentifier, String> peerAddresses) {
}
boolean isShardInitialized() {
- return shardInitialized;
+ return getActor() != null && actorInitialized;
+ }
+
+ void setActorInitialized() {
+ this.actorInitialized = true;
+
+ for(Runnable runnable: runnablesOnInitialized) {
+ runnable.run();
+ }
+
+ runnablesOnInitialized.clear();
}
- void setShardInitialized(boolean shardInitialized) {
- this.shardInitialized = shardInitialized;
+ void addRunnableOnInitialized(Runnable runnable) {
+ runnablesOnInitialized.add(runnable);
}
}
}
static class SchemaContextModules implements Serializable {
- private static final long serialVersionUID = 1L;
-
private final Set<String> modules;
SchemaContextModules(Set<String> modules){
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception thrown when attempting to find a local shard but it doesn't exist.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalShardNotFoundException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LocalShardNotFoundException(String message){
+ super(message);
+ }
+}
*/
public class FindLocalShard {
private final String shardName;
+ private final boolean waitUntilInitialized;
- public FindLocalShard(String shardName) {
+ public FindLocalShard(String shardName, boolean waitUntilInitialized) {
this.shardName = shardName;
+ this.waitUntilInitialized = waitUntilInitialized;
}
public String getShardName() {
return shardName;
}
+
+ public boolean isWaitUntilInitialized() {
+ return waitUntilInitialized;
+ }
}
*
*/
public class FindPrimary implements SerializableMessage{
- public static final Class SERIALIZABLE_CLASS = FindPrimary.class;
+ public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+
private final String shardName;
+ private final boolean waitUntilInitialized;
- public FindPrimary(String shardName){
+ public FindPrimary(String shardName, boolean waitUntilInitialized){
Preconditions.checkNotNull(shardName, "shardName should not be null");
this.shardName = shardName;
+ this.waitUntilInitialized = waitUntilInitialized;
}
public String getShardName() {
return shardName;
}
- @Override
- public Object toSerializable() {
- return this;
- }
+ public boolean isWaitUntilInitialized() {
+ return waitUntilInitialized;
+ }
- public static FindPrimary fromSerializable(Object message){
- return (FindPrimary) message;
- }
+ @Override
+ public Object toSerializable() {
+ return this;
+ }
+
+ public static FindPrimary fromSerializable(Object message){
+ return (FindPrimary) message;
+ }
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
}
/**
- * Finds a local shard given it's shard name and return it's ActorRef
+ * Finds a local shard given its shard name and return it's ActorRef
*
* @param shardName the name of the local shard that needs to be found
* @return a reference to a local shard actor which represents the shard
* specified by the shardName
*/
public Optional<ActorRef> findLocalShard(String shardName) {
- Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+ Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
return Optional.absent();
}
+ /**
+ * Finds a local shard async given its shard name and return a Future from which to obtain the
+ * ActorRef.
+ *
+ * @param shardName the name of the local shard that needs to be found
+ */
+ public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindLocalShard(shardName, true), timeout);
+
+ return future.map(new Mapper<Object, ActorRef>() {
+ @Override
+ public ActorRef checkedApply(Object response) throws Throwable {
+ if(response instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound)response;
+ LOG.debug("Local shard found {}", found.getPath());
+ return found.getPath();
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found local shard for %s but it's not initialized yet.",
+ shardName));
+ } else if(response instanceof LocalShardNotFound) {
+ throw new LocalShardNotFoundException(
+ String.format("Local shard for %s does not exist.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindLocalShard returned unkown response: %s", response));
+ }
+ }, getActorSystem().dispatcher());
+ }
private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+ Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Props;
-import junit.framework.Assert;
+import akka.actor.Terminated;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
-import java.util.List;
+/**
+ * Unit tests for DataChangeListenerRegistrationProxy.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
+ @SuppressWarnings("unchecked")
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
+ Mockito.mock(AsyncDataChangeListener.class);
-public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+ @Test
+ public void testGetInstance() throws Exception {
+ DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard", Mockito.mock(ActorContext.class), mockListener);
+
+ Assert.assertEquals(mockListener, proxy.getInstance());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testSuccessfulRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+
+ reply(new RegisterChangeListenerReply(getRef().path()));
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
- private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+ proxy.close();
- private static class MockDataChangeListener implements
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
- @Override public void onDataChanged(
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
}
- @Test
- public void testGetInstance() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ @Test(timeout=10000)
+ public void testLocalShardNotFound() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
- MockDataChangeListener listener =
- new MockDataChangeListener();
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- getSystem().actorSelection(actorRef.path()),
- listener, dataChangeListenerActor);
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
- Assert.assertEquals(listener, proxy.getInstance());
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardNotFound("shard-1"));
+
+ expectNoMsg(duration("1 seconds"));
+ }};
}
- @Test
- public void testClose() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ @Test(timeout=10000)
+ public void testLocalShardNotInitialized() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- getSystem().actorSelection(actorRef.path()),
- new MockDataChangeListener(), dataChangeListenerActor);
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
- proxy.close();
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new ActorNotInitialized());
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testFailedRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorSystem mockActorSystem = mock(ActorSystem.class);
- //Check if it was received by the remote actor
- ActorContext
- testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
- Object messages = testContext
- .executeOperation(actorRef, "messages");
+ ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+ "testFailedRegistration");
+ doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+ MoreExecutors.sameThreadExecutor());
+ doReturn(executor).when(mockActorSystem).dispatcher();
- assertNotNull(messages);
+ ActorContext actorContext = mock(ActorContext.class);
- assertTrue(messages instanceof List);
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
- List<Object> listMessages = (List<Object>) messages;
+ doReturn(mockActorSystem).when(actorContext).getActorSystem();
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+ any(Timeout.class));
+ doReturn(Futures.failed(new RuntimeException("mock"))).
+ when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
- assertEquals(1, listMessages.size());
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
- assertTrue(listMessages.get(0).getClass()
- .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
}
+ @SuppressWarnings("unchecked")
@Test
- public void testCloseWhenRegistrationIsNull() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testCloseBeforeRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = mock(ActorContext.class);
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- new MockDataChangeListener(), dataChangeListenerActor);
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
- proxy.close();
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().actorSelection(getRef().path())).
+ when(actorContext).actorSelection(getRef().path());
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+ any(Timeout.class));
- //Check if it was received by the remote actor
- ActorContext
- testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
- Object messages = testContext
- .executeOperation(actorRef, "messages");
+ Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+ @Override
+ public Future<Object> answer(InvocationOnMock invocation) {
+ proxy.close();
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ }
+ };
- assertNotNull(messages);
+ doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
- assertTrue(messages instanceof List);
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
- List<Object> listMessages = (List<Object>) messages;
+ expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
- assertEquals(0, listMessages.size());
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
}
}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
}};
}
+ @Test
+ public void testChangeListenerRegistration() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore =
+ setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+ MockDataChangeListener listener = new MockDataChangeListener(3);
+
+ ListenerRegistration<MockDataChangeListener>
+ listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+ DataChangeScope.SUBTREE);
+
+ assertNotNull("registerChangeListener returned null", listenerReg);
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+
+ listenerReg.close();
+
+ testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+
+ cleanup(dataStore);
+ }};
+ }
+
class IntegrationTestKit extends ShardTestKit {
IntegrationTestKit(ActorSystem actorSystem) {
+++ /dev/null
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DistributedDataStoreTest extends AbstractActorTest{
-
- private DistributedDataStore distributedDataStore;
- private MockActorContext mockActorContext;
- private ActorRef doNothingActorRef;
-
- @Before
- public void setUp() throws Exception {
- ShardStrategyFactory.setConfiguration(new MockConfiguration());
- final Props props = Props.create(DoNothingActor.class);
-
- doNothingActorRef = getSystem().actorOf(props);
-
- mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
- distributedDataStore = new DistributedDataStore(mockActorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- // Make CreateTransactionReply as the default response. Will need to be
- // tuned if a specific test requires some other response
- mockActorContext.setExecuteShardOperationResponse(
- CreateTransactionReply.newBuilder()
- .setTransactionActorPath(doNothingActorRef.path().toString())
- .setTransactionId("txn-1 ")
- .build());
- }
-
- @After
- public void tearDown() throws Exception {
-
- }
-
- @SuppressWarnings("resource")
- @Test
- public void testConstructor(){
- ActorSystem actorSystem = mock(ActorSystem.class);
-
- new DistributedDataStore(actorSystem, "config",
- mock(ClusterWrapper.class), mock(Configuration.class),
- DatastoreContext.newBuilder().build());
-
- verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
- }
-
- @Test
- public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
- }, AsyncDataBroker.DataChangeScope.BASE);
-
- // Since we do not expect the shard to be local registration will return a NoOpRegistration
- assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
-
- assertNotNull(registration);
- }
-
- @Test
- public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
- Future future = mock(Future.class);
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorContext.getActorSystem()).thenReturn(getSystem());
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
- }
-
- @Test
- public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
- // Make Future successful
- Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
-
- // Setup the mocks
- ActorSystem actorSystem = mock(ActorSystem.class);
- ActorSelection actorSelection = mock(ActorSelection.class);
-
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorSystem.dispatcher()).thenReturn(executor);
- when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
- when(actorContext.getActorSystem()).thenReturn(actorSystem);
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
- when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
- ActorSelection listenerRegistrationActor =
- ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
- assertNotNull(listenerRegistrationActor);
-
- assertEquals(actorSelection, listenerRegistrationActor);
- }
-
- @Test
- public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
- // Make Future fail
- Future f = Futures.failed(new IllegalArgumentException());
-
- // Setup the mocks
- ActorSystem actorSystem = mock(ActorSystem.class);
- ActorSelection actorSelection = mock(ActorSelection.class);
-
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorSystem.dispatcher()).thenReturn(executor);
- when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
- when(actorContext.getActorSystem()).thenReturn(actorSystem);
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
- when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
- ActorSelection listenerRegistrationActor =
- ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
- assertNull(listenerRegistrationActor);
-
- }
-
-
- @Test
- public void testCreateTransactionChain() throws Exception {
- final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
- assertNotNull(transactionChain);
- }
-
- @Test
- public void testNewReadOnlyTransaction() throws Exception {
- final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
- assertNotNull(transaction);
- }
-
- @Test
- public void testNewWriteOnlyTransaction() throws Exception {
- final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
- assertNotNull(transaction);
- }
-
- @Test
- public void testNewReadWriteTransaction() throws Exception {
- final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
- assertNotNull(transaction);
- }
-}
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
expectMsgEquals(duration("5 seconds"),
new PrimaryNotFound("non-existent").toSerializable());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ }};
+ }
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindLocalShard("non-existent"), getRef());
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- //shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
+
@Test
public void testOnReceiveMemberUp() throws Exception {
new JavaTestKit(getSystem()) {{
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
PrimaryFound.SERIALIZABLE_CLASS));
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
}};
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
- private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
- private static String shardName() {
- return "shard" + NEXT_SHARD_NUM.getAndIncrement();
- }
+ private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
- shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
+ shardHeartbeatIntervalInMillis(100).build();
@Before
public void setUp() {
}
private Props newShardProps() {
- return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT);
}
@Test
- public void testOnReceiveRegisterListener() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
+ public void testRegisterChangeListener() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps(), "testRegisterChangeListener");
- subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
+ waitUntilLeader(shard);
- subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
- EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
- assertEquals("isEnabled", false, enable.isEnabled());
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testRegisterChangeListener-DataChangeListener");
+
+ shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+ dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
- assertTrue(reply.getListenerRegistrationPath().toString().matches(
+ String replyPath = reply.getListenerRegistrationPath().toString();
+ assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
"akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+ // This test tests the timing window in which a change listener is registered before the
+ // shard becomes the leader. We verify that the listener is registered and notified of the
+ // existing data when the shard becomes the leader.
+ new ShardTestKit(getSystem()) {{
+ // For this test, we want to send the RegisterChangeListener message after the shard
+ // has recovered from persistence and before it becomes the leader. So we subclass
+ // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+ // we know that the shard has been initialized to a follower and has started the
+ // election process. The following 2 CountDownLatches are used to coordinate the
+ // ElectionTimeout with the sending of the RegisterChangeListener message.
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ dataStoreContext, SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ // Got the first ElectionTimeout. We don't forward it to the
+ // base Shard yet until we've sent the RegisterChangeListener
+ // message. So we signal the onFirstElectionTimeout latch to tell
+ // the main thread to send the RegisterChangeListener message and
+ // start a thread to wait on the onChangeListenerRegistered latch,
+ // which the main thread signals after it has sent the message.
+ // After the onChangeListenerRegistered is triggered, we send the
+ // original ElectionTimeout message to proceed with the election.
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testRegisterChangeListenerWhenNotLeaderInitially");
+
+ // Write initial data into the in-memory store.
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ // Wait until the shard receives the first ElectionTimeout message.
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ // Now send the RegisterChangeListener and wait for the reply.
+ shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+
+ RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ // Sanity check - verify the shard is not the leader yet.
+ shard.tell(new FindLeader(), getRef());
+ FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
+ // with the election process.
+ onChangeListenerRegistered.countDown();
+
+ // Wait for the shard to become the leader and notify our listener with the existing
+ // data in the store.
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransaction(){
new ShardTestKit(getSystem()) {{
- ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+ ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
- waitUntilLeader(subject);
+ waitUntilLeader(shard);
- subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new CreateTransaction("txn-1",
+ shard.tell(new CreateTransaction("txn-1",
TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
- expectNoMsg();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransactionOnChain(){
new ShardTestKit(getSystem()) {{
- final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+ final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
- waitUntilLeader(subject);
+ waitUntilLeader(shard);
- subject.tell(new CreateTransaction("txn-1",
+ shard.tell(new CreateTransaction("txn-1",
TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
getRef());
String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
- expectNoMsg();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testPeerAddressResolved(){
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ class TestShard extends Shard {
+ TestShard() {
+ super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+ dataStoreContext, SCHEMA_CONTEXT);
+ }
- Props props = Shard.props(identifier,
- Collections.<ShardIdentifier, String>singletonMap(identifier, null),
- dataStoreContext, SCHEMA_CONTEXT);
- final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
+ Map<String, String> getPeerAddresses() {
+ return getRaftActorContext().getPeerAddresses();
+ }
- new Within(duration("3 seconds")) {
@Override
- protected void run() {
+ protected void onRecoveryComplete() {
+ try {
+ super.onRecoveryComplete();
+ } finally {
+ recoveryComplete.countDown();
+ }
+ }
+ }
- subject.tell(
- new PeerAddressResolved(identifier, "akka://foobar"),
- getRef());
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(new Creator<Shard>() {
+ @Override
+ public TestShard create() throws Exception {
+ return new TestShard();
+ }
+ })), "testPeerAddressResolved");
- expectNoMsg();
- }
- };
+ //waitUntilLeader(shard);
+ assertEquals("Recovery complete", true,
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+
+ String address = "akka://foobar";
+ shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+
+ assertEquals("getPeerAddresses", address,
+ ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testApplySnapshot() throws ExecutionException, InterruptedException {
- TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+ "testApplySnapshot");
NormalizedNodeToNodeCodec codec =
new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
- writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
- NormalizedNode<?,?> expected = readStore(ref, root);
+ NormalizedNode<?,?> expected = readStore(shard, root);
NormalizedNodeMessages.Container encode = codec.encode(expected);
encode.getNormalizedNode().toByteString().toByteArray(),
Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
- ref.underlyingActor().onReceiveCommand(applySnapshot);
+ shard.underlyingActor().onReceiveCommand(applySnapshot);
- NormalizedNode<?,?> actual = readStore(ref, root);
+ NormalizedNode<?,?> actual = readStore(shard, root);
assertEquals(expected, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testApplyState() throws Exception {
- TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
assertEquals("Applied state", node, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@SuppressWarnings("serial")
DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
- InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+ InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
root).
getNormalizedNode().toByteString().toByteArray(),
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+ InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
new WriteModification(TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
SCHEMA_CONTEXT))));
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
SCHEMA_CONTEXT);
- InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
newPayload(mod)));
}
- InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
new ApplyLogEntries(nListEntries));
// Create the actor and wait for recovery complete.
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
protected void onRecoveryComplete() {
shard.underlyingActor().getShardMBean().getCommitIndex());
assertEquals("Last applied", nListEntries,
shard.underlyingActor().getShardMBean().getLastApplied());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
private CompositeModificationPayload newPayload(Modification... mods) {
System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testConcurrentThreePhaseCommits");
waitUntilLeader(shard);
@Override
public void onComplete(Throwable error, Object resp) {
if(error != null) {
- System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
} else {
try {
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+ for(int i = 0; i < 20 * 5; i++) {
+ long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
+ if(lastLogIndex == 2) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure");
waitUntilLeader(shard);
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort1).commit();
inOrder.verify(cohort2).canCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testPreCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure");
waitUntilLeader(shard);
InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
inOrder.verify(cohort).preCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCanCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure");
waitUntilLeader(shard);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortBeforeFinishCommit");
waitUntilLeader(shard);
NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitTimeout");
waitUntilLeader(shard);
NormalizedNode<?, ?> node = readStore(shard, listNodePath);
assertNotNull(listNodePath + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitQueueCapacityExceeded");
waitUntilLeader(shard);
shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCanCommitBeforeReadyFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitBeforeReadyFailure");
shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testAbortTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortTransaction");
waitUntilLeader(shard);
InOrder inOrder = inOrder(cohort1, cohort2);
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort2).canCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
public void saveSnapshot(Object snapshot) {
shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
package org.opendaylight.controller.cluster.datastore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
}
protected void waitUntilLeader(ActorRef shard) {
+ FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
- Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(5, TimeUnit.SECONDS));
+ Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
try {
- FindLeaderReply resp = (FindLeaderReply)Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
if(resp.getLeaderActor() != null) {
return;
}
- } catch (Exception e) {
+ } catch(TimeoutException e) {
+ } catch(Exception e) {
+ System.err.println("FindLeader threw ex");
e.printStackTrace();
}
+
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * A mock DataChangeListener implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class MockDataChangeListener implements
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ changeList = Lists.newArrayList();
+ private final CountDownLatch changeLatch;
+ private final int expChangeEventCount;
+
+ public MockDataChangeListener(int expChangeEventCount) {
+ changeLatch = new CountDownLatch(expChangeEventCount);
+ this.expChangeEventCount = expChangeEventCount;
+ }
+
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ changeList.add(change);
+ changeLatch.countDown();
+ }
+
+ public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
+ assertEquals("Change notifications complete", true,
+ Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+ for(int i = 0; i < expPaths.length; i++) {
+ assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),
+ changeList.get(i).getCreatedData().containsKey(expPaths[i]));
+ }
+ }
+
+ public void expectNoMoreChanges(String assertMsg) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ assertEquals(assertMsg, expChangeEventCount, changeList.size());
+ }
+}
{
darknessFactor.set( darkness );
}
+
+ LOG.info("onDataChanged - new Toaster config: {}", toaster);
}
}