-->
<!-- test to validate features.xml -->
<!--FIXME BUG-2195 When running single feature tests for netconf connector, features including ssh proxy server always fail (this behavior does not appear when running karaf distro directly)-->
- <!--<dependency>-->
- <!--<groupId>org.opendaylight.yangtools</groupId>-->
- <!--<artifactId>features-test</artifactId>-->
- <!--<version>${yangtools.version}</version>-->
- <!--<scope>test</scope>-->
- <!--</dependency>-->
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>features-test</artifactId>
+ <version>${yangtools.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- dependency for opendaylight-karaf-empty for use by testing -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
*/
package org.opendaylight.controller.sal.compatibility;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
private static final short OPENFLOWV10_TABLE_ID = 0;
+ private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+ private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
public void startAdapter() {
inventoryNotificationProvider.setDataProviderService(getDataProviderService());
inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+ txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
// inventoryNotificationProvider.start();
}
@Override
public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
- final ArrayList<FlowOnNode> output = new ArrayList<>();
- final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
- if (table != null) {
- final List<Flow> flows = table.getFlow();
- LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+ final ArrayList<FlowOnNode> ret= new ArrayList<>();
+ if (cached) {
+ final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+ if (table != null) {
+ final List<Flow> flows = table.getFlow();
+ LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+ for (final Flow flow : flows) {
+ final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+ if (statsFromDataStore != null) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+ ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ }
+ }
+ }
+ } else {
+ LOG.debug("readAllFlow cached:{}", cached);
+ GetAllFlowStatisticsFromFlowTableInput input =
+ new GetAllFlowStatisticsFromFlowTableInputBuilder()
+ .setNode(NodeMapping.toNodeRef(node))
+ .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+ .build();
+
+ Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+ getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
- for (final Flow flow : flows) {
- final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
- if (statsFromDataStore != null) {
- final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
- output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+ try {
+ // having a blocking call is fine here, as we need to join
+ // the notifications and return the result
+ result = future.get();
+ } catch (Exception e) {
+ LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+ return ret;
+ }
+
+ GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+ if (output == null) {
+ return ret;
+ }
+
+ TransactionId transactionId = output.getTransactionId();
+ String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+ LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
+
+ // insert an entry in tempcache, will get updated when notification is received
+ txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+ transactionId, node.getNodeIDString()));
+
+ TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+ (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+ // this loop would not be infinite as the cache will remove an entry
+ // after defined time if not written to
+ while (txnList != null && !txnList.areAllNotificationsGathered()) {
+ LOG.debug("readAllFlow waiting for notification...");
+ waitForNotification();
+ txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+ }
+
+ if (txnList == null) {
+ return ret;
+ }
+
+ List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+ for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+ List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+ if (flowAndStatisticsMapList != null) {
+ for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+ ret.add(addFlowStats(it, flowAndStatistics));
+ }
}
}
}
+ return ret;
+ }
+
+ private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+ return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+ }
- return output;
+ private void waitForNotification() {
+ try {
+ // going for a simple sleep approach,as wait-notify on a monitor would require
+ // us to maintain monitors per txn-node combo
+ Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+ LOG.trace("statCollector is waking up from a wait stat Response sleep");
+ } catch (final InterruptedException e) {
+ LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+ }
}
@Override
for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
}
+
+ updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
}
/**
private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
}
+
+ private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+ String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+ TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+ final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+ if (optional.isPresent()) {
+ LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+ TransactionNotificationList<T> txn = optional.get();
+ txn.addNotification(notification);
+ txn.setAllNotificationsGathered(lastNotification);
+ }
+ }
+
+ private class TransactionNotificationList<T extends TransactionAware> {
+ private TransactionId id;
+ private String nId;
+ private List<T> notifications;
+ private boolean allNotificationsGathered;
+
+ public TransactionNotificationList(TransactionId id, String nId) {
+ this.nId = nId;
+ this.id = id;
+ notifications = new ArrayList<T>();
+ }
+
+ public void addNotification(T notification) {
+ notifications.add(notification);
+ }
+
+ public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+ this.allNotificationsGathered = allNotificationsGathered;
+ }
+
+ public boolean areAllNotificationsGathered() {
+ return allNotificationsGathered;
+ }
+
+ public List<T> getNotifications() {
+ return notifications;
+ }
+
+ }
+
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
-import java.math.BigInteger;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
import org.opendaylight.controller.sal.common.util.Arguments;
import org.opendaylight.controller.sal.core.AdvertisedBandwidth;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
public final class NodeMapping {
private static final Logger LOG = LoggerFactory
* @param aDNode
* @return
*/
- private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
+ public static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
String targetPrefix = null;
if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
targetPrefix = OPENFLOW_ID_PREFIX;
*/
public abstract class AbstractUntypedActorWithMetering extends AbstractUntypedActor {
+ //this is used in the metric name. Some transient actors do not have defined names
+ private String actorNameOverride;
+
public AbstractUntypedActorWithMetering() {
if (isMetricsCaptureEnabled())
getContext().become(new MeteringBehavior(this));
}
+ public AbstractUntypedActorWithMetering(String actorNameOverride){
+ this.actorNameOverride = actorNameOverride;
+ if (isMetricsCaptureEnabled())
+ getContext().become(new MeteringBehavior(this));
+ }
+
private boolean isMetricsCaptureEnabled(){
CommonConfig config = new CommonConfig(getContext().system().settings().config());
return config.isMetricCaptureEnabled();
}
+
+ public String getActorNameOverride() {
+ return actorNameOverride;
+ }
}
private final MetricRegistry METRICREGISTRY = MetricsReporter.getInstance().getMetricsRegistry();
private final String MSG_PROCESSING_RATE = "msg-rate";
- private String actorName;
+ private String actorQualifiedName;
private Timer msgProcessingTimer;
/**
*
* @param actor whose behaviour needs to be metered
*/
- public MeteringBehavior(UntypedActor actor){
+ public MeteringBehavior(AbstractUntypedActorWithMetering actor){
Preconditions.checkArgument(actor != null, "actor must not be null");
+ this.meteredActor = actor;
+ String actorName = actor.getActorNameOverride() != null ? actor.getActorNameOverride()
+ : actor.getSelf().path().name();
+ init(actorName);
+ }
+
+ public MeteringBehavior(UntypedActor actor){
+ Preconditions.checkArgument(actor != null, "actor must not be null");
this.meteredActor = actor;
- actorName = meteredActor.getSelf().path().toStringWithoutAddress();
- final String msgProcessingTime = MetricRegistry.name(actorName, MSG_PROCESSING_RATE);
+
+ String actorName = actor.getSelf().path().name();
+ init(actorName);
+ }
+
+ private void init(String actorName){
+ actorQualifiedName = new StringBuilder(meteredActor.getSelf().path().parent().toStringWithoutAddress()).
+ append("/").append(actorName).toString();
+
+ final String msgProcessingTime = MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE);
msgProcessingTimer = METRICREGISTRY.timer(msgProcessingTime);
}
final String messageType = message.getClass().getSimpleName();
final String msgProcessingTimeByMsgType =
- MetricRegistry.name(actorName, MSG_PROCESSING_RATE, messageType);
+ MetricRegistry.name(actorQualifiedName, MSG_PROCESSING_RATE, messageType);
final Timer msgProcessingTimerByMsgType = METRICREGISTRY.timer(msgProcessingTimeByMsgType);
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+public class NodeTypes {
+
+ public static final byte LEAF_NODE = 1;
+ public static final byte LEAF_SET = 2;
+ public static final byte LEAF_SET_ENTRY_NODE = 3;
+ public static final byte CONTAINER_NODE = 4;
+ public static final byte UNKEYED_LIST = 5;
+ public static final byte UNKEYED_LIST_ITEM = 6;
+ public static final byte MAP_NODE = 7;
+ public static final byte MAP_ENTRY_NODE = 8;
+ public static final byte ORDERED_MAP_NODE = 9;
+ public static final byte CHOICE_NODE = 10;
+ public static final byte AUGMENTATION_NODE = 11;
+ public static final byte ANY_XML_NODE = 12;
+ public static final byte END_NODE = 13;
+
+}
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.node.utils.QNameFactory;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NormalizedNodeInputStreamReader reads the byte stream and constructs the normalized node including its children nodes.
+ * This process goes in recursive manner, where each NodeTypes object signifies the start of the object, except END_NODE.
+ * If a node can have children, then that node's end is calculated based on appearance of END_NODE.
+ *
+ */
+
+public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamReader {
+
+ private DataInputStream reader;
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeInputStreamReader.class);
+
+ private Map<Integer, String> codedStringMap = new HashMap<>();
+ private static final String REVISION_ARG = "?revision=";
+
+ public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
+ Preconditions.checkNotNull(stream);
+ reader = new DataInputStream(stream);
+ }
+
+
+ public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+ NormalizedNode<?, ?> node = null;
+
+ // each node should start with a byte
+ byte nodeType = reader.readByte();
+
+ if(nodeType == NodeTypes.END_NODE) {
+ LOG.debug("End node reached. return");
+ return null;
+ }
+ else if(nodeType == NodeTypes.AUGMENTATION_NODE) {
+ LOG.debug("Reading augmentation node. will create augmentation identifier");
+
+ YangInstanceIdentifier.AugmentationIdentifier identifier =
+ new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
+ DataContainerNodeBuilder<YangInstanceIdentifier.AugmentationIdentifier, AugmentationNode> augmentationBuilder =
+ Builders.augmentationBuilder().withNodeIdentifier(identifier);
+ augmentationBuilder = addDataContainerChildren(augmentationBuilder);
+ node = augmentationBuilder.build();
+
+ } else {
+ QName qName = readQName();
+
+ if(nodeType == NodeTypes.LEAF_SET_ENTRY_NODE) {
+ LOG.debug("Reading leaf set entry node. Will create NodeWithValue instance identifier");
+
+ // Read the object value
+ Object value = readObject();
+
+ YangInstanceIdentifier.NodeWithValue nodeWithValue = new YangInstanceIdentifier.NodeWithValue(qName, value);
+ node = Builders.leafSetEntryBuilder().withNodeIdentifier(nodeWithValue).withValue(value).build();
+
+ } else if(nodeType == NodeTypes.MAP_ENTRY_NODE) {
+ LOG.debug("Reading map entry node. Will create node identifier with predicates.");
+
+ YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifier =
+ new YangInstanceIdentifier.NodeIdentifierWithPredicates(qName, readKeyValueMap());
+ DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> mapEntryBuilder
+ = Builders.mapEntryBuilder().withNodeIdentifier(nodeIdentifier);
+
+ mapEntryBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates,
+ MapEntryNode>)addDataContainerChildren(mapEntryBuilder);
+ node = mapEntryBuilder.build();
+
+ } else {
+ LOG.debug("Creating standard node identifier. ");
+ YangInstanceIdentifier.NodeIdentifier identifier = new YangInstanceIdentifier.NodeIdentifier(qName);
+ node = readNodeIdentifierDependentNode(nodeType, identifier);
+
+ }
+ }
+ return node;
+ }
+
+ private NormalizedNode<?, ?> readNodeIdentifierDependentNode(byte nodeType, YangInstanceIdentifier.NodeIdentifier identifier)
+ throws IOException {
+
+ switch(nodeType) {
+ case NodeTypes.LEAF_NODE :
+ LOG.debug("Read leaf node");
+ // Read the object value
+ NormalizedNodeAttrBuilder leafBuilder = Builders.leafBuilder();
+ return leafBuilder.withNodeIdentifier(identifier).withValue(readObject()).build();
+
+ case NodeTypes.ANY_XML_NODE :
+ LOG.debug("Read xml node");
+ Node value = (Node) readObject();
+ return Builders.anyXmlBuilder().withValue(value).build();
+
+ case NodeTypes.MAP_NODE :
+ LOG.debug("Read map node");
+ CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = Builders.mapBuilder().withNodeIdentifier(identifier);
+ mapBuilder = addMapNodeChildren(mapBuilder);
+ return mapBuilder.build();
+
+ case NodeTypes.CHOICE_NODE :
+ LOG.debug("Read choice node");
+ DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifier, ChoiceNode> choiceBuilder =
+ Builders.choiceBuilder().withNodeIdentifier(identifier);
+ choiceBuilder = addDataContainerChildren(choiceBuilder);
+ return choiceBuilder.build();
+
+ case NodeTypes.ORDERED_MAP_NODE :
+ LOG.debug("Reading ordered map node");
+ CollectionNodeBuilder<MapEntryNode, OrderedMapNode> orderedMapBuilder =
+ Builders.orderedMapBuilder().withNodeIdentifier(identifier);
+ orderedMapBuilder = addMapNodeChildren(orderedMapBuilder);
+ return orderedMapBuilder.build();
+
+ case NodeTypes.UNKEYED_LIST :
+ LOG.debug("Read unkeyed list node");
+ CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> unkeyedListBuilder =
+ Builders.unkeyedListBuilder().withNodeIdentifier(identifier);
+ unkeyedListBuilder = addUnkeyedListChildren(unkeyedListBuilder);
+ return unkeyedListBuilder.build();
+
+ case NodeTypes.UNKEYED_LIST_ITEM :
+ LOG.debug("Read unkeyed list item node");
+ DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, UnkeyedListEntryNode> unkeyedListEntryBuilder
+ = Builders.unkeyedListEntryBuilder().withNodeIdentifier(identifier);
+
+ unkeyedListEntryBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, UnkeyedListEntryNode>)
+ addDataContainerChildren(unkeyedListEntryBuilder);
+ return unkeyedListEntryBuilder.build();
+
+ case NodeTypes.CONTAINER_NODE :
+ LOG.debug("Read container node");
+ DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> containerBuilder =
+ Builders.containerBuilder().withNodeIdentifier(identifier);
+
+ containerBuilder = (DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode>)
+ addDataContainerChildren(containerBuilder);
+ return containerBuilder.build();
+
+ case NodeTypes.LEAF_SET :
+ LOG.debug("Read leaf set node");
+ ListNodeBuilder<Object, LeafSetEntryNode<Object>> leafSetBuilder =
+ Builders.leafSetBuilder().withNodeIdentifier(identifier);
+ leafSetBuilder = addLeafSetChildren(leafSetBuilder);
+ return leafSetBuilder.build();
+
+ default :
+ return null;
+ }
+ }
+
+ private QName readQName() throws IOException {
+ // Read in the same sequence of writing
+ String localName = readCodedString();
+ String namespace = readCodedString();
+ String revision = readCodedString();
+ String qName;
+ // Not using stringbuilder as compiler optimizes string concatenation of +
+ if(revision != null){
+ qName = "(" + namespace+ REVISION_ARG + revision + ")" +localName;
+ } else {
+ qName = "(" + namespace + ")" +localName;
+ }
+
+ return QNameFactory.create(qName);
+ }
+
+
+ private String readCodedString() throws IOException {
+ boolean readFromMap = reader.readBoolean();
+ if(readFromMap) {
+ return codedStringMap.get(reader.readInt());
+ } else {
+ String value = reader.readUTF();
+ if(value != null) {
+ codedStringMap.put(Integer.valueOf(codedStringMap.size()), value);
+ }
+ return value;
+ }
+ }
+
+ private Set<QName> readQNameSet() throws IOException{
+ // Read the children count
+ int count = reader.readInt();
+ Set<QName> children = new HashSet<>(count);
+ for(int i = 0; i<count; i++) {
+ children.add(readQName());
+ }
+ return children;
+ }
+
+ private Map<QName, Object> readKeyValueMap() throws IOException {
+ int count = reader.readInt();
+ Map<QName, Object> keyValueMap = new HashMap<>(count);
+
+ for(int i = 0; i<count; i++) {
+ keyValueMap.put(readQName(), readObject());
+ }
+
+ return keyValueMap;
+ }
+
+ private Object readObject() throws IOException {
+ byte objectType = reader.readByte();
+ switch(objectType) {
+ case ValueTypes.BITS_TYPE:
+ return readObjSet();
+
+ case ValueTypes.BOOL_TYPE :
+ return reader.readBoolean();
+
+ case ValueTypes.BYTE_TYPE :
+ return reader.readByte();
+
+ case ValueTypes.INT_TYPE :
+ return reader.readInt();
+
+ case ValueTypes.LONG_TYPE :
+ return reader.readLong();
+
+ case ValueTypes.QNAME_TYPE :
+ return readQName();
+
+ case ValueTypes.SHORT_TYPE :
+ return reader.readShort();
+
+ case ValueTypes.STRING_TYPE :
+ return reader.readUTF();
+
+ case ValueTypes.BIG_DECIMAL_TYPE :
+ return new BigDecimal(reader.readUTF());
+
+ case ValueTypes.BIG_INTEGER_TYPE :
+ return new BigInteger(reader.readUTF());
+
+ case ValueTypes.YANG_IDENTIFIER_TYPE :
+ int size = reader.readInt();
+
+ List<YangInstanceIdentifier.PathArgument> pathArguments = new ArrayList<>(size);
+
+ for(int i=0; i<size; i++) {
+ pathArguments.add(readPathArgument());
+ }
+ return YangInstanceIdentifier.create(pathArguments);
+
+ default :
+ return null;
+ }
+ }
+
+ private Set<String> readObjSet() throws IOException {
+ int count = reader.readInt();
+ Set<String> children = new HashSet<>(count);
+ for(int i = 0; i<count; i++) {
+ children.add(readCodedString());
+ }
+ return children;
+ }
+
+ private YangInstanceIdentifier.PathArgument readPathArgument() throws IOException {
+ // read Type
+ int type = reader.readByte();
+
+ switch(type) {
+
+ case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
+ return new YangInstanceIdentifier.AugmentationIdentifier(readQNameSet());
+
+ case PathArgumentTypes.NODE_IDENTIFIER :
+ return new YangInstanceIdentifier.NodeIdentifier(readQName());
+
+ case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES :
+ return new YangInstanceIdentifier.NodeIdentifierWithPredicates(readQName(), readKeyValueMap());
+
+ case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
+ return new YangInstanceIdentifier.NodeWithValue(readQName(), readObject());
+
+ default :
+ return null;
+ }
+ }
+
+ private ListNodeBuilder<Object, LeafSetEntryNode<Object>> addLeafSetChildren(ListNodeBuilder<Object,
+ LeafSetEntryNode<Object>> builder)
+ throws IOException {
+
+ LOG.debug("Reading children of leaf set");
+ LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+
+ while(child != null) {
+ builder.withChild(child);
+ child = (LeafSetEntryNode<Object>)readNormalizedNode();
+ }
+ return builder;
+ }
+
+ private CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> addUnkeyedListChildren(
+ CollectionNodeBuilder<UnkeyedListEntryNode, UnkeyedListNode> builder)
+ throws IOException{
+
+ LOG.debug("Reading children of unkeyed list");
+ UnkeyedListEntryNode child = (UnkeyedListEntryNode)readNormalizedNode();
+
+ while(child != null) {
+ builder.withChild(child);
+ child = (UnkeyedListEntryNode)readNormalizedNode();
+ }
+ return builder;
+ }
+
+ private DataContainerNodeBuilder addDataContainerChildren(DataContainerNodeBuilder builder)
+ throws IOException {
+ LOG.debug("Reading data container (leaf nodes) nodes");
+
+ DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?> child =
+ (DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>) readNormalizedNode();
+
+ while(child != null) {
+ builder.withChild(child);
+ child =
+ (DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?>) readNormalizedNode();
+ }
+ return builder;
+ }
+
+
+ private CollectionNodeBuilder addMapNodeChildren(CollectionNodeBuilder builder)
+ throws IOException {
+ LOG.debug("Reading map node children");
+ MapEntryNode child = (MapEntryNode)readNormalizedNode();
+
+ while(child != null){
+ builder.withChild(child);
+ child = (MapEntryNode)readNormalizedNode();
+ }
+
+ return builder;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+}
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}, then will call
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)} twice and then, {@link #endNode()} to end
+ * container node.
+ *
+ * Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ *
+ *
+ */
+
+public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWriter{
+
+ private DataOutputStream writer;
+
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
+
+ private Map<String, Integer> stringCodeMap = new HashMap<>();
+
+ public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
+ Preconditions.checkNotNull(stream);
+ writer = new DataOutputStream(stream);
+ }
+
+ @Override
+ public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Writing a new leaf node");
+ startNode(name.getNodeType(), NodeTypes.LEAF_NODE);
+
+ writeObject(value);
+ }
+
+ @Override
+ public void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new leaf set");
+
+ startNode(name.getNodeType(), NodeTypes.LEAF_SET);
+ }
+
+ @Override
+ public void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+
+ LOG.debug("Writing a new leaf set entry node");
+ startNode(name.getNodeType(), NodeTypes.LEAF_SET_ENTRY_NODE);
+
+ writeObject(value);
+ }
+
+ @Override
+ public void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+
+ LOG.debug("Starting a new container node");
+
+ startNode(name.getNodeType(), NodeTypes.CONTAINER_NODE);
+ }
+
+ @Override
+ public void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new unkeyed list");
+
+ startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST);
+ }
+
+ @Override
+ public void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalStateException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new unkeyed list item");
+
+ startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST_ITEM);
+ }
+
+ @Override
+ public void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new map node");
+
+ startNode(name.getNodeType(), NodeTypes.MAP_NODE);
+ }
+
+ @Override
+ public void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(identifier, "Node identifier should not be null");
+ LOG.debug("Starting a new map entry node");
+ startNode(identifier.getNodeType(), NodeTypes.MAP_ENTRY_NODE);
+
+ writeKeyValueMap(identifier.getKeyValues());
+
+ }
+
+ @Override
+ public void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new ordered map node");
+
+ startNode(name.getNodeType(), NodeTypes.ORDERED_MAP_NODE);
+ }
+
+ @Override
+ public void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Starting a new choice node");
+
+ startNode(name.getNodeType(), NodeTypes.CHOICE_NODE);
+ }
+
+ @Override
+ public void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(identifier, "Node identifier should not be null");
+ LOG.debug("Starting a new augmentation node");
+
+ writer.writeByte(NodeTypes.AUGMENTATION_NODE);
+ writeQNameSet(identifier.getPossibleChildNames());
+ }
+
+ @Override
+ public void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
+ Preconditions.checkNotNull(name, "Node identifier should not be null");
+ LOG.debug("Writing a new xml node");
+
+ startNode(name.getNodeType(), NodeTypes.ANY_XML_NODE);
+
+ writeObject(value);
+ }
+
+ @Override
+ public void endNode() throws IOException, IllegalStateException {
+ LOG.debug("Ending the node");
+
+ writer.writeByte(NodeTypes.END_NODE);
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.flush();
+ }
+
+ private void startNode(final QName qName, byte nodeType) throws IOException {
+
+ Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+ // First write the type of node
+ writer.writeByte(nodeType);
+ // Write Start Tag
+ writeQName(qName);
+ }
+
+ private void writeQName(QName qName) throws IOException {
+
+ writeCodedString(qName.getLocalName());
+ writeCodedString(qName.getNamespace().toString());
+ writeCodedString(qName.getFormattedRevision());
+ }
+
+ private void writeCodedString(String key) throws IOException {
+ Integer value = stringCodeMap.get(key);
+
+ if(value != null) {
+ writer.writeBoolean(true);
+ writer.writeInt(value);
+ } else {
+ if(key != null) {
+ stringCodeMap.put(key, Integer.valueOf(stringCodeMap.size()));
+ }
+ writer.writeBoolean(false);
+ writer.writeUTF(key);
+ }
+ }
+
+ private void writeObjSet(Set set) throws IOException {
+ if(!set.isEmpty()){
+ writer.writeInt(set.size());
+ for(Object o : set){
+ if(o instanceof String){
+ writeCodedString(o.toString());
+ } else {
+ throw new IllegalArgumentException("Expected value type to be String but was : " +
+ o.toString());
+ }
+ }
+ } else {
+ writer.writeInt(0);
+ }
+ }
+
+ private void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+ Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
+ int size = Iterables.size(pathArguments);
+ writer.writeInt(size);
+
+ for(YangInstanceIdentifier.PathArgument pathArgument : pathArguments) {
+ writePathArgument(pathArgument);
+ }
+ }
+
+ private void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
+
+ byte type = PathArgumentTypes.getSerializablePathArgumentType(pathArgument);
+
+ writer.writeByte(type);
+
+ switch(type) {
+ case PathArgumentTypes.NODE_IDENTIFIER :
+
+ YangInstanceIdentifier.NodeIdentifier nodeIdentifier =
+ (YangInstanceIdentifier.NodeIdentifier) pathArgument;
+
+ writeQName(nodeIdentifier.getNodeType());
+ break;
+
+ case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES:
+
+ YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifierWithPredicates =
+ (YangInstanceIdentifier.NodeIdentifierWithPredicates) pathArgument;
+ writeQName(nodeIdentifierWithPredicates.getNodeType());
+
+ writeKeyValueMap(nodeIdentifierWithPredicates.getKeyValues());
+ break;
+
+ case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
+
+ YangInstanceIdentifier.NodeWithValue nodeWithValue =
+ (YangInstanceIdentifier.NodeWithValue) pathArgument;
+
+ writeQName(nodeWithValue.getNodeType());
+ writeObject(nodeWithValue.getValue());
+ break;
+
+ case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
+
+ YangInstanceIdentifier.AugmentationIdentifier augmentationIdentifier =
+ (YangInstanceIdentifier.AugmentationIdentifier) pathArgument;
+
+ // No Qname in augmentation identifier
+ writeQNameSet(augmentationIdentifier.getPossibleChildNames());
+ break;
+ default :
+ throw new IllegalStateException("Unknown node identifier type is found : " + pathArgument.getClass().toString() );
+ }
+ }
+
+ private void writeKeyValueMap(Map<QName, Object> keyValueMap) throws IOException {
+ if(keyValueMap != null && !keyValueMap.isEmpty()) {
+ writer.writeInt(keyValueMap.size());
+ Set<QName> qNameSet = keyValueMap.keySet();
+
+ for(QName qName : qNameSet) {
+ writeQName(qName);
+ writeObject(keyValueMap.get(qName));
+ }
+ } else {
+ writer.writeInt(0);
+ }
+ }
+
+ private void writeQNameSet(Set<QName> children) throws IOException {
+ // Write each child's qname separately, if list is empty send count as 0
+ if(children != null && !children.isEmpty()) {
+ writer.writeInt(children.size());
+ for(QName qName : children) {
+ writeQName(qName);
+ }
+ } else {
+ LOG.debug("augmentation node does not have any child");
+ writer.writeInt(0);
+ }
+ }
+
+ private void writeObject(Object value) throws IOException {
+
+ byte type = ValueTypes.getSerializableType(value);
+ // Write object type first
+ writer.writeByte(type);
+
+ switch(type) {
+ case ValueTypes.BOOL_TYPE:
+ writer.writeBoolean((Boolean) value);
+ break;
+ case ValueTypes.QNAME_TYPE:
+ writeQName((QName) value);
+ break;
+ case ValueTypes.INT_TYPE:
+ writer.writeInt((Integer) value);
+ break;
+ case ValueTypes.BYTE_TYPE:
+ writer.writeByte((Byte) value);
+ break;
+ case ValueTypes.LONG_TYPE:
+ writer.writeLong((Long) value);
+ break;
+ case ValueTypes.SHORT_TYPE:
+ writer.writeShort((Short) value);
+ break;
+ case ValueTypes.BITS_TYPE:
+ writeObjSet((Set) value);
+ break;
+ case ValueTypes.YANG_IDENTIFIER_TYPE:
+ writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+ break;
+ default:
+ writer.writeUTF(value.toString());
+ break;
+ }
+ }
+}
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.IOException;
+
+
+public interface NormalizedNodeStreamReader extends AutoCloseable {
+
+ NormalizedNode<?, ?> readNormalizedNode() throws IOException;
+}
--- /dev/null
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Event Stream Writer based on Normalized Node tree representation
+ *
+ * <h3>Writing Event Stream</h3>
+ *
+ * <ul>
+ * <li><code>container</code> - Container node representation, start event is
+ * emitted using {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and node end event is
+ * emitted using {@link #endNode()}. Container node is implementing
+ * {@link org.opendaylight.yangtools.yang.binding.DataObject} interface.
+ *
+ * <li><code>list</code> - YANG list statement has two representation in event
+ * stream - unkeyed list and map. Unkeyed list is YANG list which did not
+ * specify key.</li>
+ *
+ * <ul>
+ * <li><code>Map</code> - Map start event is emitted using
+ * {@link #startMapNode(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and is ended using {@link #endNode()}. Each map entry start is emitted using
+ * {@link #startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates, int)}
+ * with Map of keys
+ * and finished using {@link #endNode()}.</li>
+ *
+ * <li><code>UnkeyedList</code> - Unkeyed list represent list without keys,
+ * unkeyed list start is emitted using
+ * {@link #startUnkeyedList(YangInstanceIdentifier.NodeIdentifier, int)} list
+ * end is emitted using {@link #endNode()}. Each list item is emitted using
+ * {@link #startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier, int)}
+ * and ended using {@link #endNode()}.</li>
+ * </ul>
+ *
+ * <li><code>leaf</code> - Leaf node event is emitted using
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}.
+ * {@link #endNode()} MUST NOT BE emitted for
+ * leaf node.</li>
+ *
+ * <li><code>leaf-list</code> - Leaf list start is emitted using
+ * {@link #startLeafSet(YangInstanceIdentifier.NodeIdentifier, int)}.
+ * Leaf list end is emitted using
+ * {@link #endNode()}. Leaf list entries are emitted using
+ * {@link #leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object).
+ *
+ * <li><code>anyxml - Anyxml node event is emitted using
+ * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)}. {@link #endNode()} MUST NOT BE emitted
+ * for anyxml node.</code></li>
+ *
+ *
+ * <li><code>choice</code> Choice node event is emmited by
+ * {@link #startChoiceNode(YangInstanceIdentifier.NodeIdentifier, int)} event and
+ * finished by invoking {@link #endNode()}
+ * <li>
+ * <code>augment</code> - Represents augmentation, augmentation node is started
+ * by invoking {@link #startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier)} and
+ * finished by invoking {@link #endNode()}.</li>
+ *
+ * </ul>
+ *
+ * <h3>Implementation notes</h3>
+ *
+ * <p>
+ * Implementations of this interface must not hold user suppled objects
+ * and resources needlessly.
+ *
+ */
+
+public interface NormalizedNodeStreamWriter extends Closeable, Flushable {
+
+ public final int UNKNOWN_SIZE = -1;
+
+ /**
+ * Write the leaf node identifier and value to the stream.
+ * @param name
+ * @param value
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing leaf Set node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Write the leaf Set Entry Node object to the stream with identifier and value.
+ * @param name
+ * @param value
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void leafSetEntryNode(YangInstanceIdentifier.NodeWithValue name, Object value)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing container node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing unkeyed list node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing unkeyed list item. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalStateException
+ */
+ void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalStateException;
+
+ /**
+ * Start writing map node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing map entry node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param identifier
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing ordered map node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing choice node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param name
+ * @param childSizeHint is the estimated children count. Usage is optional in implementation.
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Start writing augmentation node. You must call {@link #endNode()} once you are done writing all of its children.
+ * @param identifier
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * Write any xml node identifier and value to the stream
+ * @param name
+ * @param value
+ * @throws IOException
+ * @throws IllegalArgumentException
+ */
+ void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value)
+ throws IOException, IllegalArgumentException;
+
+ /**
+ * This method should be used to add end symbol/identifier of node in the stream.
+ * @throws IOException
+ * @throws IllegalStateException
+ */
+ void endNode() throws IOException, IllegalStateException;
+
+ @Override
+ void close() throws IOException;
+
+ @Override
+ void flush() throws IOException;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.collect.ImmutableMap;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.util.Map;
+
+public class PathArgumentTypes {
+ public static final byte AUGMENTATION_IDENTIFIER = 1;
+ public static final byte NODE_IDENTIFIER = 2;
+ public static final byte NODE_IDENTIFIER_WITH_VALUE = 3;
+ public static final byte NODE_IDENTIFIER_WITH_PREDICATES = 4;
+
+ private static Map<Class<?>, Byte> CLASS_TO_ENUM_MAP =
+ ImmutableMap.<Class<?>, Byte>builder().
+ put(YangInstanceIdentifier.AugmentationIdentifier.class, AUGMENTATION_IDENTIFIER).
+ put(YangInstanceIdentifier.NodeIdentifier.class, NODE_IDENTIFIER).
+ put(YangInstanceIdentifier.NodeIdentifierWithPredicates.class, NODE_IDENTIFIER_WITH_PREDICATES).
+ put(YangInstanceIdentifier.NodeWithValue.class, NODE_IDENTIFIER_WITH_VALUE).build();
+
+ public static byte getSerializablePathArgumentType(YangInstanceIdentifier.PathArgument pathArgument){
+
+ Byte type = CLASS_TO_ENUM_MAP.get(pathArgument.getClass());
+ if(type == null) {
+ throw new IllegalArgumentException("Unknown type of PathArgument = " + pathArgument);
+ }
+
+ return type;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ValueTypes {
+ public static final byte SHORT_TYPE = 1;
+ public static final byte BYTE_TYPE = 2;
+ public static final byte INT_TYPE = 3;
+ public static final byte LONG_TYPE = 4;
+ public static final byte BOOL_TYPE = 5;
+ public static final byte QNAME_TYPE = 6;
+ public static final byte BITS_TYPE = 7;
+ public static final byte YANG_IDENTIFIER_TYPE = 8;
+ public static final byte STRING_TYPE = 9;
+ public static final byte BIG_INTEGER_TYPE = 10;
+ public static final byte BIG_DECIMAL_TYPE = 11;
+
+ private static Map<Class, Byte> types = new HashMap<>();
+
+ static {
+ types.put(String.class, Byte.valueOf(STRING_TYPE));
+ types.put(Byte.class, Byte.valueOf(BYTE_TYPE));
+ types.put(Integer.class, Byte.valueOf(INT_TYPE));
+ types.put(Long.class, Byte.valueOf(LONG_TYPE));
+ types.put(Boolean.class, Byte.valueOf(BOOL_TYPE));
+ types.put(QName.class, Byte.valueOf(QNAME_TYPE));
+ types.put(Set.class, Byte.valueOf(BITS_TYPE));
+ types.put(YangInstanceIdentifier.class, Byte.valueOf(YANG_IDENTIFIER_TYPE));
+ types.put(Short.class, Byte.valueOf(SHORT_TYPE));
+ types.put(BigInteger.class, Byte.valueOf(BIG_INTEGER_TYPE));
+ types.put(BigDecimal.class, Byte.valueOf(BIG_DECIMAL_TYPE));
+ }
+
+ public static final byte getSerializableType(Object node){
+ Preconditions.checkNotNull(node, "node should not be null");
+
+ Byte type = types.get(node.getClass());
+ if(type != null) {
+ return type;
+ } else if(node instanceof Set){
+ return BITS_TYPE;
+ }
+
+ throw new IllegalArgumentException("Unknown value type " + node.getClass().getSimpleName());
+ }
+}
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class NormalizedNodeStreamReaderWriterTest {
+
+ final NormalizedNode<?, ?> input = TestModel.createTestContainer();
+
+ @Test
+ public void testNormalizedNodeStreamReaderWriter() {
+
+ byte[] byteData = null;
+
+ try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+
+ NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
+ normalizedNodeWriter.write(input);
+ byteData = byteArrayOutputStream.toByteArray();
+
+ } catch (IOException e) {
+ fail("Writing to OutputStream failed :" + e.toString());
+ }
+
+ try(NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(new ByteArrayInputStream(byteData))) {
+
+ NormalizedNode<?,?> node = reader.readNormalizedNode();
+ Assert.assertEquals(input, node);
+
+ } catch (IOException e) {
+ fail("Reading from InputStream failed :" + e.toString());
+ }
+ }
+
+ @Test
+ public void testWithSerializable() {
+ SampleNormalizedNodeSerializable serializable = new SampleNormalizedNodeSerializable(input);
+ SampleNormalizedNodeSerializable clone = (SampleNormalizedNodeSerializable)SerializationUtils.clone(serializable);
+
+ Assert.assertEquals(input, clone.getInput());
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.OrderedMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListNode;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamWriter.UNKNOWN_SIZE;
+
+
+/**
+ * This class is used only for testing purpose for now, we may use similar logic while integrating
+ * with cluster
+ */
+
+public class NormalizedNodeWriter implements Closeable, Flushable {
+ private final NormalizedNodeStreamWriter writer;
+
+ private NormalizedNodeWriter(final NormalizedNodeStreamWriter writer) {
+ this.writer = Preconditions.checkNotNull(writer);
+ }
+
+ protected final NormalizedNodeStreamWriter getWriter() {
+ return writer;
+ }
+
+ /**
+ * Create a new writer backed by a {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
+ *
+ * @param writer Back-end writer
+ * @return A new instance.
+ */
+ public static NormalizedNodeWriter forStreamWriter(final NormalizedNodeStreamWriter writer) {
+ return new NormalizedNodeWriter(writer);
+ }
+
+
+ /**
+ * Iterate over the provided {@link org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode} and emit write
+ * events to the encapsulated {@link org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter}.
+ *
+ * @param node Node
+ * @return
+ * @throws java.io.IOException when thrown from the backing writer.
+ */
+ public final NormalizedNodeWriter write(final NormalizedNode<?, ?> node) throws IOException {
+ if (wasProcessedAsComplexNode(node)) {
+ return this;
+ }
+
+ if (wasProcessAsSimpleNode(node)) {
+ return this;
+ }
+
+ throw new IllegalStateException("It wasn't possible to serialize node " + node);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.flush();
+ writer.close();
+ }
+
+ /**
+ * Emit a best guess of a hint for a particular set of children. It evaluates the
+ * iterable to see if the size can be easily gotten to. If it is, we hint at the
+ * real number of child nodes. Otherwise we emit UNKNOWN_SIZE.
+ *
+ * @param children Child nodes
+ * @return Best estimate of the collection size required to hold all the children.
+ */
+ static final int childSizeHint(final Iterable<?> children) {
+ return (children instanceof Collection) ? ((Collection<?>) children).size() : UNKNOWN_SIZE;
+ }
+
+ private boolean wasProcessAsSimpleNode(final NormalizedNode<?, ?> node) throws IOException {
+ if (node instanceof LeafSetEntryNode) {
+ final LeafSetEntryNode<?> nodeAsLeafList = (LeafSetEntryNode<?>)node;
+ writer.leafSetEntryNode(nodeAsLeafList.getIdentifier(), nodeAsLeafList.getValue());
+ return true;
+ } else if (node instanceof LeafNode) {
+ final LeafNode<?> nodeAsLeaf = (LeafNode<?>)node;
+ writer.leafNode(nodeAsLeaf.getIdentifier(), nodeAsLeaf.getValue());
+ return true;
+ } else if (node instanceof AnyXmlNode) {
+ final AnyXmlNode anyXmlNode = (AnyXmlNode)node;
+ writer.anyxmlNode(anyXmlNode.getIdentifier(), anyXmlNode.getValue());
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Emit events for all children and then emit an endNode() event.
+ *
+ * @param children Child iterable
+ * @return True
+ * @throws java.io.IOException when the writer reports it
+ */
+ protected final boolean writeChildren(final Iterable<? extends NormalizedNode<?, ?>> children) throws IOException {
+ for (NormalizedNode<?, ?> child : children) {
+ write(child);
+ }
+
+ writer.endNode();
+ return true;
+ }
+
+ protected boolean writeMapEntryNode(final MapEntryNode node) throws IOException {
+ writer.startMapEntryNode(node.getIdentifier(), childSizeHint(node.getValue()));
+ return writeChildren(node.getValue());
+ }
+
+ private boolean wasProcessedAsComplexNode(final NormalizedNode<?, ?> node) throws IOException {
+ if (node instanceof ContainerNode) {
+ final ContainerNode n = (ContainerNode) node;
+ writer.startContainerNode(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof MapEntryNode) {
+ return writeMapEntryNode((MapEntryNode) node);
+ }
+ if (node instanceof UnkeyedListEntryNode) {
+ final UnkeyedListEntryNode n = (UnkeyedListEntryNode) node;
+ writer.startUnkeyedListItem(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof ChoiceNode) {
+ final ChoiceNode n = (ChoiceNode) node;
+ writer.startChoiceNode(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof AugmentationNode) {
+ final AugmentationNode n = (AugmentationNode) node;
+ writer.startAugmentationNode(n.getIdentifier());
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof UnkeyedListNode) {
+ final UnkeyedListNode n = (UnkeyedListNode) node;
+ writer.startUnkeyedList(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof OrderedMapNode) {
+ final OrderedMapNode n = (OrderedMapNode) node;
+ writer.startOrderedMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof MapNode) {
+ final MapNode n = (MapNode) node;
+ writer.startMapNode(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+ if (node instanceof LeafSetNode) {
+ //covers also OrderedLeafSetNode for which doesn't exist start* method
+ final LeafSetNode<?> n = (LeafSetNode<?>) node;
+ writer.startLeafSet(n.getIdentifier(), childSizeHint(n.getValue()));
+ return writeChildren(n.getValue());
+ }
+
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+
+public class SampleNormalizedNodeSerializable implements Serializable {
+
+ private NormalizedNode<?, ?> input;
+
+ public SampleNormalizedNodeSerializable(NormalizedNode<?, ?> input) {
+ this.input = input;
+ }
+
+ public NormalizedNode<?, ?> getInput() {
+ return input;
+ }
+
+ private void readObject(final ObjectInputStream stream) throws IOException, ClassNotFoundException, URISyntaxException {
+ NormalizedNodeStreamReader reader = new NormalizedNodeInputStreamReader(stream);
+ this.input = reader.readNormalizedNode();
+ }
+
+ private void writeObject(final ObjectOutputStream stream) throws IOException {
+ NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(stream);
+ NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
+
+ normalizedNodeWriter.write(this.input);
+ }
+
+}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DataChangeListener extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
+
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private boolean notificationsEnabled = false;
this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
}
- @Override public void handleReceive(Object message) throws Exception {
+ @Override
+ public void handleReceive(Object message) throws Exception {
if(message instanceof DataChanged){
dataChanged(message);
} else if(message instanceof EnableNotification){
private void enableNotification(EnableNotification message) {
notificationsEnabled = message.isEnabled();
+ LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ listener);
}
private void dataChanged(Object message) {
// Do nothing if notifications are not enabled
- if(!notificationsEnabled){
+ if(!notificationsEnabled) {
+ LOG.debug("Notifications not enabled for listener {} - dropping change notification",
+ listener);
return;
}
DataChanged reply = (DataChanged) message;
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change = reply.getChange();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = reply.getChange();
+
+ LOG.debug("Sending change notification {} to listener {}", change, listener);
+
this.listener.onDataChanged(change);
// It seems the sender is never null but it doesn't hurt to check. If the caller passes in
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import scala.concurrent.Future;
/**
* ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
* The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
* </p>
*/
+@SuppressWarnings("rawtypes")
public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+ public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+
private volatile ActorSelection listenerRegistrationActor;
- private final AsyncDataChangeListener listener;
- private final ActorRef dataChangeListenerActor;
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+ private ActorRef dataChangeListenerActor;
+ private final String shardName;
+ private final ActorContext actorContext;
private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy(
- ActorSelection listenerRegistrationActor,
- L listener, ActorRef dataChangeListenerActor) {
- this.listenerRegistrationActor = listenerRegistrationActor;
+ DataChangeListenerRegistrationProxy (
+ String shardName, ActorContext actorContext, L listener) {
+ this.shardName = shardName;
+ this.actorContext = actorContext;
this.listener = listener;
- this.dataChangeListenerActor = dataChangeListenerActor;
}
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy(
- L listener, ActorRef dataChangeListenerActor) {
- this(null, listener, dataChangeListenerActor);
+ @VisibleForTesting
+ ActorSelection getListenerRegistrationActor() {
+ return listenerRegistrationActor;
+ }
+
+ @VisibleForTesting
+ ActorRef getDataChangeListenerActor() {
+ return dataChangeListenerActor;
}
@Override
return listener;
}
- public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ if(listenerRegistrationActor == null) {
+ return;
+ }
+
boolean sendCloseMessage = false;
synchronized(this) {
if(closed) {
this.listenerRegistrationActor = listenerRegistrationActor;
}
}
+
if(sendCloseMessage) {
listenerRegistrationActor.tell(new
CloseDataChangeListenerRegistration().toSerializable(), null);
}
+ }
- this.listenerRegistrationActor = listenerRegistrationActor;
+ public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+ dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props(listener));
+
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable failure, ActorRef shard) {
+ if(failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+ "cannot be registered", shardName, listener, path);
+ } else if(failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+ "cannot be registered: {}", shardName, listener, path, failure);
+ } else {
+ doRegistration(shard, path, scope);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
}
- public ActorSelection getListenerRegistrationActor() {
- return listenerRegistrationActor;
+ private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+ DataChangeScope scope) {
+
+ Future<Object> future = actorContext.executeOperationAsync(shard,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ REGISTER_TIMEOUT);
+
+ future.onComplete(new OnComplete<Object>(){
+ @Override
+ public void onComplete(Throwable failure, Object result) {
+ if(failure != null) {
+ LOG.error("Failed to register DataChangeListener {} at path {}",
+ listener, path.toString(), failure);
+ } else {
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ setListenerRegistrationActor(actorContext.actorSelection(
+ reply.getListenerRegistrationPath()));
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
}
@Override
sendCloseMessage = !closed && listenerRegistrationActor != null;
closed = true;
}
+
if(sendCloseMessage) {
- listenerRegistrationActor.tell(new
- CloseDataChangeListenerRegistration().toSerializable(), null);
+ listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+ ActorRef.noSender());
+ listenerRegistrationActor = null;
}
- dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+ if(dataChangeListenerActor != null) {
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ dataChangeListenerActor = null;
+ }
}
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
/**
*
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
- Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
-
- //if shard is NOT local
- if (!shard.isPresent()) {
- LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
- return new NoOpDataChangeListenerRegistration(listener);
- }
- //if shard is local
- ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
- Future future = actorContext.executeOperationAsync(shard.get(),
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
-
final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
- new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
-
- future.onComplete(new OnComplete() {
-
- @Override
- public void onComplete(Throwable failure, Object result)
- throws Throwable {
- if (failure != null) {
- LOG.error("Failed to register listener at path " + path.toString(), failure);
- return;
- }
- RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- listenerRegistrationProxy.setListenerRegistrationActor(actorContext
- .actorSelection(reply.getListenerRegistrationPath()));
- }
- }, actorContext.getActorSystem().dispatcher());
+ new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+ listenerRegistrationProxy.init(path, scope);
return listenerRegistrationProxy;
-
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.UntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-
-public class NoOpCohort extends UntypedActor {
-
- @Override public void onReceive(Object message) throws Exception {
- if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new CanCommitTransactionReply(false).toSerializable(), getSelf());
- } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(
- new PreCommitTransactionReply().toSerializable(),
- getSelf());
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new CommitTransactionReply().toSerializable(), getSelf());
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- getSender().tell(new AbortTransactionReply().toSerializable(), getSelf());
- } else {
- throw new Exception ("Not recognized message received,message="+message);
- }
-
- }
-}
-
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * When a consumer registers a data change listener and no local shard is
- * available to register that listener with then we return an instance of
- * NoOpDataChangeListenerRegistration
- *
- * <p>
- *
- * The NoOpDataChangeListenerRegistration as it's name suggests does
- * nothing when an operation is invoked on it
- */
-public class NoOpDataChangeListenerRegistration
- implements ListenerRegistration {
-
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener;
-
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-
- this.listener = listener;
- }
-
- @Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return listener;
- }
-
- @Override public void close() {
-
- }
-}
import scala.concurrent.duration.FiniteDuration;
import javax.annotation.Nonnull;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
private final ShardStats shardMBean;
- private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
+ private final List<ActorSelection> dataChangeListeners = Lists.newArrayList();
+
+ private final List<DelayedListenerRegistration> delayedListenerRegistrations =
+ Lists.newArrayList();
private final DatastoreContext datastoreContext;
if (message instanceof RecoveryFailure){
LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+ // Even though recovery failed, we still need to finish our recovery, eg send the
+ // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+ onRecoveryComplete();
} else {
super.onReceiveRecover(message);
}
store.onGlobalContextUpdated(message.getSchemaContext());
}
- @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
+ @VisibleForTesting
+ void updateSchemaContext(SchemaContext schemaContext) {
store.onGlobalContextUpdated(schemaContext);
}
- private void registerChangeListener(
- RegisterChangeListener registerChangeListener) {
+ private void registerChangeListener(RegisterChangeListener registerChangeListener) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("registerDataChangeListener for {}", registerChangeListener
- .getPath());
+ LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
+
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration;
+ if(isLeader()) {
+ registration = doChangeListenerRegistration(registerChangeListener);
+ } else {
+ LOG.debug("Shard is not the leader - delaying registration");
+
+ DelayedListenerRegistration delayedReg =
+ new DelayedListenerRegistration(registerChangeListener);
+ delayedListenerRegistrations.add(delayedReg);
+ registration = delayedReg;
}
+ ActorRef listenerRegistration = getContext().actorOf(
+ DataChangeListenerRegistration.props(registration));
- ActorSelection dataChangeListenerPath = getContext()
- .system().actorSelection(
- registerChangeListener.getDataChangeListenerPath());
+ LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ listenerRegistration.path());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+ }
+
+ private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> doChangeListenerRegistration(
+ RegisterChangeListener registerChangeListener) {
+
+ ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+ registerChangeListener.getDataChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
// it will not
- dataChangeListenerPath
- .tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
dataChangeListeners.add(dataChangeListenerPath);
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
-
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- registration = store.registerChangeListener(registerChangeListener.getPath(),
- listener, registerChangeListener.getScope());
- ActorRef listenerRegistration =
- getContext().actorOf(
- DataChangeListenerRegistration.props(registration));
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+ new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
- }
+ LOG.debug("Registering for path {}", registerChangeListener.getPath());
- getSender()
- .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
- getSelf());
+ return store.registerChangeListener(registerChangeListener.getPath(), listener,
+ registerChangeListener.getScope());
}
private boolean isMetricsCaptureEnabled(){
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
- // Schedule a message to be periodically sent to check if the current in-progress
- // transaction should be expired and aborted.
- FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
- txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
- period, period, getSelf(),
- TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+ // Being paranoid here - this method should only be called once but just in case...
+ if(txCommitTimeoutCheckSchedule == null) {
+ // Schedule a message to be periodically sent to check if the current in-progress
+ // transaction should be expired and aborted.
+ FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+ txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+ period, period, getSelf(),
+ TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+ }
}
@Override
}
}
- @Override protected void onStateChanged() {
+ @Override
+ protected void onStateChanged() {
+ boolean isLeader = isLeader();
for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener
- .tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
+ }
+
+ if(isLeader) {
+ for(DelayedListenerRegistration reg: delayedListenerRegistrations) {
+ if(!reg.isClosed()) {
+ reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener()));
+ }
+ }
+
+ delayedListenerRegistrations.clear();
}
shardMBean.setRaftState(getRaftState().name());
shardMBean.setCurrentTerm(getCurrentTerm());
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader()){
+ if(!isLeader){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
ShardStats getShardMBean() {
return shardMBean;
}
+
+ private static class DelayedListenerRegistration implements
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+
+ private volatile boolean closed;
+
+ private final RegisterChangeListener registerChangeListener;
+
+ private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> delegate;
+
+ DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+ this.registerChangeListener = registerChangeListener;
+ }
+
+ void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration) {
+ this.delegate = registration;
+ }
+
+ boolean isClosed() {
+ return closed;
+ }
+
+ RegisterChangeListener getRegisterChangeListener() {
+ return registerChangeListener;
+ }
+
+ @Override
+ public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
+ return delegate != null ? delegate.getInstance() : null;
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ if(delegate != null) {
+ delegate.close();
+ }
+ }
+ }
}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
LOG.debug("Initializing shard [{}]", shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
- shardInformation.setShardInitialized(true);
+ shardInformation.setActorInitialized();
}
}
return;
}
- sendResponse(shardInformation, new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, Supplier<Object> messageSupplier) {
- if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+ final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized()) {
+ if(waitUntilInitialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+ shardInformation.addRunnableOnInitialized(new Runnable() {
+ @Override
+ public void run() {
+ sender.tell(messageSupplier.get(), self);
+ }
+ });
+ } else {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ }
+
return;
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new PrimaryFound(info.getActorPath().toString()).toSerializable();
private ActorRef actor;
private ActorPath actorPath;
private final Map<ShardIdentifier, String> peerAddresses;
- private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+ // flag that determines if the actor is ready for business
+ private boolean actorInitialized = false;
+
+ private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<ShardIdentifier, String> peerAddresses) {
}
boolean isShardInitialized() {
- return shardInitialized;
+ return getActor() != null && actorInitialized;
+ }
+
+ void setActorInitialized() {
+ this.actorInitialized = true;
+
+ for(Runnable runnable: runnablesOnInitialized) {
+ runnable.run();
+ }
+
+ runnablesOnInitialized.clear();
}
- void setShardInitialized(boolean shardInitialized) {
- this.shardInitialized = shardInitialized;
+ void addRunnableOnInitialized(Runnable runnable) {
+ runnablesOnInitialized.add(runnable);
}
}
}
static class SchemaContextModules implements Serializable {
- private static final long serialVersionUID = 1L;
-
private final Set<String> modules;
SchemaContextModules(Set<String> modules){
import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
* <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
* </p>
*/
-public abstract class ShardTransaction extends AbstractUntypedActor {
+public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
private final ActorRef shardActor;
private final SchemaContext schemaContext;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
ShardStats shardStats, String transactionID) {
+ super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception thrown when attempting to find a local shard but it doesn't exist.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalShardNotFoundException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LocalShardNotFoundException(String message){
+ super(message);
+ }
+}
*/
public class FindLocalShard {
private final String shardName;
+ private final boolean waitUntilInitialized;
- public FindLocalShard(String shardName) {
+ public FindLocalShard(String shardName, boolean waitUntilInitialized) {
this.shardName = shardName;
+ this.waitUntilInitialized = waitUntilInitialized;
}
public String getShardName() {
return shardName;
}
+
+ public boolean isWaitUntilInitialized() {
+ return waitUntilInitialized;
+ }
}
*
*/
public class FindPrimary implements SerializableMessage{
- public static final Class SERIALIZABLE_CLASS = FindPrimary.class;
+ public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+
private final String shardName;
+ private final boolean waitUntilInitialized;
- public FindPrimary(String shardName){
+ public FindPrimary(String shardName, boolean waitUntilInitialized){
Preconditions.checkNotNull(shardName, "shardName should not be null");
this.shardName = shardName;
+ this.waitUntilInitialized = waitUntilInitialized;
}
public String getShardName() {
return shardName;
}
- @Override
- public Object toSerializable() {
- return this;
- }
+ public boolean isWaitUntilInitialized() {
+ return waitUntilInitialized;
+ }
- public static FindPrimary fromSerializable(Object message){
- return (FindPrimary) message;
- }
+ @Override
+ public Object toSerializable() {
+ return this;
+ }
+
+ public static FindPrimary fromSerializable(Object message){
+ return (FindPrimary) message;
+ }
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
}
/**
- * Finds a local shard given it's shard name and return it's ActorRef
+ * Finds a local shard given its shard name and return it's ActorRef
*
* @param shardName the name of the local shard that needs to be found
* @return a reference to a local shard actor which represents the shard
* specified by the shardName
*/
public Optional<ActorRef> findLocalShard(String shardName) {
- Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+ Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
return Optional.absent();
}
+ /**
+ * Finds a local shard async given its shard name and return a Future from which to obtain the
+ * ActorRef.
+ *
+ * @param shardName the name of the local shard that needs to be found
+ */
+ public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+ Future<Object> future = executeOperationAsync(shardManager,
+ new FindLocalShard(shardName, true), timeout);
+
+ return future.map(new Mapper<Object, ActorRef>() {
+ @Override
+ public ActorRef checkedApply(Object response) throws Throwable {
+ if(response instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound)response;
+ LOG.debug("Local shard found {}", found.getPath());
+ return found.getPath();
+ } else if(response instanceof ActorNotInitialized) {
+ throw new NotInitializedException(
+ String.format("Found local shard for %s but it's not initialized yet.",
+ shardName));
+ } else if(response instanceof LocalShardNotFound) {
+ throw new LocalShardNotFoundException(
+ String.format("Local shard for %s does not exist.", shardName));
+ }
+
+ throw new UnknownMessageException(String.format(
+ "FindLocalShard returned unkown response: %s", response));
+ }
+ }, getActorSystem().dispatcher());
+ }
private String findPrimaryPathOrNull(String shardName) {
- Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+ Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.cluster.datastore;
+import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
import akka.actor.Props;
-import junit.framework.Assert;
+import akka.actor.Terminated;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
-import java.util.List;
+/**
+ * Unit tests for DataChangeListenerRegistrationProxy.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
+ @SuppressWarnings("unchecked")
+ private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
+ Mockito.mock(AsyncDataChangeListener.class);
-public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+ @Test
+ public void testGetInstance() throws Exception {
+ DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard", Mockito.mock(ActorContext.class), mockListener);
+
+ Assert.assertEquals(mockListener, proxy.getInstance());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testSuccessfulRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("getScope", scope, registerMsg.getScope());
+
+ reply(new RegisterChangeListenerReply(getRef().path()));
+
+ for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+ proxy.getListenerRegistrationActor());
+
+ watch(proxy.getDataChangeListenerActor());
- private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+ proxy.close();
- private static class MockDataChangeListener implements
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+ // The listener registration actor should get a Close message
+ expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
- @Override public void onDataChanged(
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
+ // The DataChangeListener actor should be terminated
+ expectMsgClass(timeout, Terminated.class);
+
+ proxy.close();
+
+ expectNoMsg();
+ }};
}
- @Test
- public void testGetInstance() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ @Test(timeout=10000)
+ public void testLocalShardNotFound() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
- MockDataChangeListener listener =
- new MockDataChangeListener();
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- getSystem().actorSelection(actorRef.path()),
- listener, dataChangeListenerActor);
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
- Assert.assertEquals(listener, proxy.getInstance());
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardNotFound("shard-1"));
+
+ expectNoMsg(duration("1 seconds"));
+ }};
}
- @Test
- public void testClose() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ @Test(timeout=10000)
+ public void testLocalShardNotInitialized() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- getSystem().actorSelection(actorRef.path()),
- new MockDataChangeListener(), dataChangeListenerActor);
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ "shard-1", actorContext, mockListener);
- proxy.close();
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init(path, scope);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new ActorNotInitialized());
+
+ new Within(duration("1 seconds")) {
+ @Override
+ protected void run() {
+ expectNoMsg();
+ }
+ };
+ }};
+ }
+
+ @Test
+ public void testFailedRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorSystem mockActorSystem = mock(ActorSystem.class);
- //Check if it was received by the remote actor
- ActorContext
- testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
- Object messages = testContext
- .executeOperation(actorRef, "messages");
+ ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+ "testFailedRegistration");
+ doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+ ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+ MoreExecutors.sameThreadExecutor());
+ doReturn(executor).when(mockActorSystem).dispatcher();
- assertNotNull(messages);
+ ActorContext actorContext = mock(ActorContext.class);
- assertTrue(messages instanceof List);
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
- List<Object> listMessages = (List<Object>) messages;
+ doReturn(mockActorSystem).when(actorContext).getActorSystem();
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+ any(Timeout.class));
+ doReturn(Futures.failed(new RuntimeException("mock"))).
+ when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
- assertEquals(1, listMessages.size());
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
- assertTrue(listMessages.get(0).getClass()
- .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
}
+ @SuppressWarnings("unchecked")
@Test
- public void testCloseWhenRegistrationIsNull() throws Exception {
- final Props props = Props.create(MessageCollectorActor.class);
- final ActorRef actorRef = getSystem().actorOf(props);
+ public void testCloseBeforeRegistration() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = mock(ActorContext.class);
- DataChangeListenerRegistrationProxy proxy =
- new DataChangeListenerRegistrationProxy(
- new MockDataChangeListener(), dataChangeListenerActor);
+ String shardName = "shard-1";
+ final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+ shardName, actorContext, mockListener);
- proxy.close();
+ doReturn(getSystem()).when(actorContext).getActorSystem();
+ doReturn(getSystem().actorSelection(getRef().path())).
+ when(actorContext).actorSelection(getRef().path());
+ doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+ doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+ any(Timeout.class));
- //Check if it was received by the remote actor
- ActorContext
- testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
- Object messages = testContext
- .executeOperation(actorRef, "messages");
+ Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+ @Override
+ public Future<Object> answer(InvocationOnMock invocation) {
+ proxy.close();
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ }
+ };
- assertNotNull(messages);
+ doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+ any(Object.class), any(Timeout.class));
- assertTrue(messages instanceof List);
+ proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+ AsyncDataBroker.DataChangeScope.ONE);
- List<Object> listMessages = (List<Object>) messages;
+ expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
- assertEquals(0, listMessages.size());
+ Assert.assertEquals("getListenerRegistrationActor", null,
+ proxy.getListenerRegistrationActor());
+ }};
}
}
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
}};
}
+ @Test
+ public void testChangeListenerRegistration() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore =
+ setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+ MockDataChangeListener listener = new MockDataChangeListener(3);
+
+ ListenerRegistration<MockDataChangeListener>
+ listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+ DataChangeScope.SUBTREE);
+
+ assertNotNull("registerChangeListener returned null", listenerReg);
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+
+ listenerReg.close();
+
+ testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+
+ cleanup(dataStore);
+ }};
+ }
+
class IntegrationTestKit extends ShardTestKit {
IntegrationTestKit(ActorSystem actorSystem) {
+++ /dev/null
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DistributedDataStoreTest extends AbstractActorTest{
-
- private DistributedDataStore distributedDataStore;
- private MockActorContext mockActorContext;
- private ActorRef doNothingActorRef;
-
- @Before
- public void setUp() throws Exception {
- ShardStrategyFactory.setConfiguration(new MockConfiguration());
- final Props props = Props.create(DoNothingActor.class);
-
- doNothingActorRef = getSystem().actorOf(props);
-
- mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
- distributedDataStore = new DistributedDataStore(mockActorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- // Make CreateTransactionReply as the default response. Will need to be
- // tuned if a specific test requires some other response
- mockActorContext.setExecuteShardOperationResponse(
- CreateTransactionReply.newBuilder()
- .setTransactionActorPath(doNothingActorRef.path().toString())
- .setTransactionId("txn-1 ")
- .build());
- }
-
- @After
- public void tearDown() throws Exception {
-
- }
-
- @SuppressWarnings("resource")
- @Test
- public void testConstructor(){
- ActorSystem actorSystem = mock(ActorSystem.class);
-
- new DistributedDataStore(actorSystem, "config",
- mock(ClusterWrapper.class), mock(Configuration.class),
- DatastoreContext.newBuilder().build());
-
- verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
- }
-
- @Test
- public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
- }, AsyncDataBroker.DataChangeScope.BASE);
-
- // Since we do not expect the shard to be local registration will return a NoOpRegistration
- assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
-
- assertNotNull(registration);
- }
-
- @Test
- public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
- Future future = mock(Future.class);
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorContext.getActorSystem()).thenReturn(getSystem());
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
- }
-
- @Test
- public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
- // Make Future successful
- Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
-
- // Setup the mocks
- ActorSystem actorSystem = mock(ActorSystem.class);
- ActorSelection actorSelection = mock(ActorSelection.class);
-
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorSystem.dispatcher()).thenReturn(executor);
- when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
- when(actorContext.getActorSystem()).thenReturn(actorSystem);
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
- when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
- ActorSelection listenerRegistrationActor =
- ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
- assertNotNull(listenerRegistrationActor);
-
- assertEquals(actorSelection, listenerRegistrationActor);
- }
-
- @Test
- public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
- ActorContext actorContext = mock(ActorContext.class);
-
- distributedDataStore = new DistributedDataStore(actorContext);
- distributedDataStore.onGlobalContextUpdated(
- TestModel.createTestContext());
-
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
- // Make Future fail
- Future f = Futures.failed(new IllegalArgumentException());
-
- // Setup the mocks
- ActorSystem actorSystem = mock(ActorSystem.class);
- ActorSelection actorSelection = mock(ActorSelection.class);
-
- when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
- when(actorSystem.dispatcher()).thenReturn(executor);
- when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
- when(actorContext.getActorSystem()).thenReturn(actorSystem);
- when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
- when(actorContext
- .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
- when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
- ListenerRegistration registration =
- distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
- mock(AsyncDataChangeListener.class),
- AsyncDataBroker.DataChangeScope.BASE);
-
- assertNotNull(registration);
-
- assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
- ActorSelection listenerRegistrationActor =
- ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
- assertNull(listenerRegistrationActor);
-
- }
-
-
- @Test
- public void testCreateTransactionChain() throws Exception {
- final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
- assertNotNull(transactionChain);
- }
-
- @Test
- public void testNewReadOnlyTransaction() throws Exception {
- final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
- assertNotNull(transaction);
- }
-
- @Test
- public void testNewWriteOnlyTransaction() throws Exception {
- final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
- assertNotNull(transaction);
- }
-
- @Test
- public void testNewReadWriteTransaction() throws Exception {
- final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
- assertNotNull(transaction);
- }
-}
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import akka.japi.Creator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
expectMsgEquals(duration("5 seconds"),
new PrimaryNotFound("non-existent").toSerializable());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
}};
}
@Test
- public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+ public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
new JavaTestKit(getSystem()) {{
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+ shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+ }};
+ }
+
@Test
public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
new JavaTestKit(getSystem()) {{
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- shardManager.tell(new FindLocalShard("non-existent"), getRef());
+ shardManager.tell(new FindLocalShard("non-existent", false), getRef());
LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- //shardManager.tell(new ActorInitialized(), mockShardActor);
- shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+ shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
}};
}
+ @Test
+ public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+ // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+ // delayed until we send ActorInitialized.
+ Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+ new Timeout(5, TimeUnit.SECONDS));
+
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ Object resp = Await.result(future, duration("5 seconds"));
+ assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+ }};
+ }
+
@Test
public void testOnReceiveMemberUp() throws Exception {
new JavaTestKit(getSystem()) {{
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
PrimaryFound.SERIALIZABLE_CLASS));
MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
- shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+ shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
}};
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
+import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
- private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
- private static String shardName() {
- return "shard" + NEXT_SHARD_NUM.getAndIncrement();
- }
+ private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
- shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
+ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
+ shardHeartbeatIntervalInMillis(100).build();
@Before
public void setUp() {
}
private Props newShardProps() {
- return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT);
}
@Test
- public void testOnReceiveRegisterListener() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
+ public void testRegisterChangeListener() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps(), "testRegisterChangeListener");
- subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
+ waitUntilLeader(shard);
- subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
- EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
- assertEquals("isEnabled", false, enable.isEnabled());
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testRegisterChangeListener-DataChangeListener");
+
+ shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
+ dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
- assertTrue(reply.getListenerRegistrationPath().toString().matches(
+ String replyPath = reply.getListenerRegistrationPath().toString();
+ assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
"akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
+ // This test tests the timing window in which a change listener is registered before the
+ // shard becomes the leader. We verify that the listener is registered and notified of the
+ // existing data when the shard becomes the leader.
+ new ShardTestKit(getSystem()) {{
+ // For this test, we want to send the RegisterChangeListener message after the shard
+ // has recovered from persistence and before it becomes the leader. So we subclass
+ // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
+ // we know that the shard has been initialized to a follower and has started the
+ // election process. The following 2 CountDownLatches are used to coordinate the
+ // ElectionTimeout with the sending of the RegisterChangeListener message.
+ final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
+ final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
+ Creator<Shard> creator = new Creator<Shard>() {
+ boolean firstElectionTimeout = true;
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+ dataStoreContext, SCHEMA_CONTEXT) {
+ @Override
+ public void onReceiveCommand(final Object message) {
+ if(message instanceof ElectionTimeout && firstElectionTimeout) {
+ // Got the first ElectionTimeout. We don't forward it to the
+ // base Shard yet until we've sent the RegisterChangeListener
+ // message. So we signal the onFirstElectionTimeout latch to tell
+ // the main thread to send the RegisterChangeListener message and
+ // start a thread to wait on the onChangeListenerRegistered latch,
+ // which the main thread signals after it has sent the message.
+ // After the onChangeListenerRegistered is triggered, we send the
+ // original ElectionTimeout message to proceed with the election.
+ firstElectionTimeout = false;
+ final ActorRef self = getSelf();
+ new Thread() {
+ @Override
+ public void run() {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }
+ }.start();
+
+ onFirstElectionTimeout.countDown();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+ };
+ }
+ };
+
+ MockDataChangeListener listener = new MockDataChangeListener(1);
+ ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
+ "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
+
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(creator)),
+ "testRegisterChangeListenerWhenNotLeaderInitially");
+
+ // Write initial data into the in-memory store.
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ // Wait until the shard receives the first ElectionTimeout message.
+ assertEquals("Got first ElectionTimeout", true,
+ onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
+
+ // Now send the RegisterChangeListener and wait for the reply.
+ shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
+
+ RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ // Sanity check - verify the shard is not the leader yet.
+ shard.tell(new FindLeader(), getRef());
+ FindLeaderReply findLeadeReply =
+ expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
+ assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+
+ // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
+ // with the election process.
+ onChangeListenerRegistered.countDown();
+
+ // Wait for the shard to become the leader and notify our listener with the existing
+ // data in the store.
+ listener.waitForChangeEvents(path);
+
+ dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransaction(){
new ShardTestKit(getSystem()) {{
- ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
+ ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
- waitUntilLeader(subject);
+ waitUntilLeader(shard);
- subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new CreateTransaction("txn-1",
+ shard.tell(new CreateTransaction("txn-1",
TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
- expectNoMsg();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testCreateTransactionOnChain(){
new ShardTestKit(getSystem()) {{
- final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
+ final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
- waitUntilLeader(subject);
+ waitUntilLeader(shard);
- subject.tell(new CreateTransaction("txn-1",
+ shard.tell(new CreateTransaction("txn-1",
TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
getRef());
String path = reply.getTransactionActorPath().toString();
assertTrue("Unexpected transaction path " + path,
path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
- expectNoMsg();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testPeerAddressResolved(){
- new JavaTestKit(getSystem()) {{
- final ShardIdentifier identifier =
- ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ new ShardTestKit(getSystem()) {{
+ final CountDownLatch recoveryComplete = new CountDownLatch(1);
+ class TestShard extends Shard {
+ TestShard() {
+ super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+ dataStoreContext, SCHEMA_CONTEXT);
+ }
- Props props = Shard.props(identifier,
- Collections.<ShardIdentifier, String>singletonMap(identifier, null),
- dataStoreContext, SCHEMA_CONTEXT);
- final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
+ Map<String, String> getPeerAddresses() {
+ return getRaftActorContext().getPeerAddresses();
+ }
- new Within(duration("3 seconds")) {
@Override
- protected void run() {
+ protected void onRecoveryComplete() {
+ try {
+ super.onRecoveryComplete();
+ } finally {
+ recoveryComplete.countDown();
+ }
+ }
+ }
- subject.tell(
- new PeerAddressResolved(identifier, "akka://foobar"),
- getRef());
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ Props.create(new DelegatingShardCreator(new Creator<Shard>() {
+ @Override
+ public TestShard create() throws Exception {
+ return new TestShard();
+ }
+ })), "testPeerAddressResolved");
- expectNoMsg();
- }
- };
+ //waitUntilLeader(shard);
+ assertEquals("Recovery complete", true,
+ Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
+
+ String address = "akka://foobar";
+ shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+
+ assertEquals("getPeerAddresses", address,
+ ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
public void testApplySnapshot() throws ExecutionException, InterruptedException {
- TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+ "testApplySnapshot");
NormalizedNodeToNodeCodec codec =
new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
- writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
- NormalizedNode<?,?> expected = readStore(ref, root);
+ NormalizedNode<?,?> expected = readStore(shard, root);
NormalizedNodeMessages.Container encode = codec.encode(expected);
encode.getNormalizedNode().toByteString().toByteArray(),
Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
- ref.underlyingActor().onReceiveCommand(applySnapshot);
+ shard.underlyingActor().onReceiveCommand(applySnapshot);
- NormalizedNode<?,?> actual = readStore(ref, root);
+ NormalizedNode<?,?> actual = readStore(shard, root);
assertEquals(expected, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@Test
public void testApplyState() throws Exception {
- TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
assertEquals("Applied state", node, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
@SuppressWarnings("serial")
DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
- InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
+ InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
root).
getNormalizedNode().toByteString().toByteArray(),
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+ InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
new WriteModification(TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
SCHEMA_CONTEXT))));
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
SCHEMA_CONTEXT);
- InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
newPayload(mod)));
}
- InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
new ApplyLogEntries(nListEntries));
// Create the actor and wait for recovery complete.
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
protected void onRecoveryComplete() {
shard.underlyingActor().getShardMBean().getCommitIndex());
assertEquals("Last applied", nListEntries,
shard.underlyingActor().getShardMBean().getLastApplied());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
private CompositeModificationPayload newPayload(Modification... mods) {
System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testConcurrentThreePhaseCommits");
waitUntilLeader(shard);
@Override
public void onComplete(Throwable error, Object resp) {
if(error != null) {
- System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
} else {
try {
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+ for(int i = 0; i < 20 * 5; i++) {
+ long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
+ if(lastLogIndex == 2) {
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitPhaseFailure");
waitUntilLeader(shard);
inOrder.verify(cohort1).preCommit();
inOrder.verify(cohort1).commit();
inOrder.verify(cohort2).canCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testPreCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testPreCommitPhaseFailure");
waitUntilLeader(shard);
InOrder inOrder = inOrder(cohort);
inOrder.verify(cohort).canCommit();
inOrder.verify(cohort).preCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCanCommitPhaseFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitPhaseFailure");
waitUntilLeader(shard);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
System.setProperty("shard.persistent", "true");
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortBeforeFinishCommit");
waitUntilLeader(shard);
NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitTimeout");
waitUntilLeader(shard);
NormalizedNode<?, ?> node = readStore(shard, listNodePath);
assertNotNull(listNodePath + " not found", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionCommitQueueCapacityExceeded");
waitUntilLeader(shard);
shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testCanCommitBeforeReadyFailure() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCanCommitBeforeReadyFailure");
shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
public void testAbortTransaction() throws Throwable {
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testAbortTransaction");
waitUntilLeader(shard);
InOrder inOrder = inOrder(cohort1, cohort2);
inOrder.verify(cohort1).canCommit();
inOrder.verify(cohort2).canCommit();
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
Creator<Shard> creator = new Creator<Shard>() {
@Override
public Shard create() throws Exception {
- return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
+ return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
dataStoreContext, SCHEMA_CONTEXT) {
@Override
public void saveSnapshot(Object snapshot) {
shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
package org.opendaylight.controller.cluster.datastore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
}
protected void waitUntilLeader(ActorRef shard) {
+ FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
- Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(5, TimeUnit.SECONDS));
+ Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
try {
- FindLeaderReply resp = (FindLeaderReply)Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
if(resp.getLeaderActor() != null) {
return;
}
- } catch (Exception e) {
+ } catch(TimeoutException e) {
+ } catch(Exception e) {
+ System.err.println("FindLeader threw ex");
e.printStackTrace();
}
+
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * A mock DataChangeListener implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class MockDataChangeListener implements
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ changeList = Lists.newArrayList();
+ private final CountDownLatch changeLatch;
+ private final int expChangeEventCount;
+
+ public MockDataChangeListener(int expChangeEventCount) {
+ changeLatch = new CountDownLatch(expChangeEventCount);
+ this.expChangeEventCount = expChangeEventCount;
+ }
+
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ changeList.add(change);
+ changeLatch.countDown();
+ }
+
+ public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
+ assertEquals("Change notifications complete", true,
+ Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+ for(int i = 0; i < expPaths.length; i++) {
+ assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),
+ changeList.get(i).getCreatedData().containsKey(expPaths[i]));
+ }
+ }
+
+ public void expectNoMoreChanges(String assertMsg) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ assertEquals(assertMsg, expChangeEventCount, changeList.size());
+ }
+}
{
darknessFactor.set( darkness );
}
+
+ LOG.info("onDataChanged - new Toaster config: {}", toaster);
}
}