X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fmessages%2FDataChanged.java;h=97231dc366f38b24efe7b7424f8cec1da3f9e312;hb=8ec73bf853a9b6708b455c0321a585992e02b125;hp=a8827bebf4386b1a63918a30cb1b31faa81335a8;hpb=0eb621d29daaf08979c356e2148e99c48458e169;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java index a8827bebf4..97231dc366 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java @@ -8,256 +8,143 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -public class DataChanged implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - DataChangeListenerMessages.DataChanged.class; - final private SchemaContext schemaContext; - private final AsyncDataChangeEvent> - change; +public class DataChanged implements Externalizable { + private static final long serialVersionUID = 1L; + private AsyncDataChangeEvent> change; + public DataChanged() { + } - public DataChanged(SchemaContext schemaContext, - AsyncDataChangeEvent> change) { + public DataChanged(AsyncDataChangeEvent> change) { this.change = change; - this.schemaContext = schemaContext; } - public AsyncDataChangeEvent> getChange() { return change; } + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + in.readShort(); // Read the version - private NormalizedNodeMessages.Node convertToNodeTree( - NormalizedNode normalizedNode) { + NormalizedNodeDataInput streamReader = new NormalizedNodeInputStreamReader(in); - return new NormalizedNodeToNodeCodec(schemaContext) - .encode(YangInstanceIdentifier.builder().build(), normalizedNode) - .getNormalizedNode(); + // Note: the scope passed to builder is not actually used. + Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE); - } + // Read created data - private Iterable convertToRemovePaths( - Set removedPaths) { - final Set removedPathInstanceIds = new HashSet<>(); - for (YangInstanceIdentifier id : removedPaths) { - removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id)); + int size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addCreated(path, node); } - return new Iterable() { - public Iterator iterator() { - return removedPathInstanceIds.iterator(); - } - }; - } + // Read updated data - private NormalizedNodeMessages.NodeMap convertToNodeMap( - Map> data) { - NormalizedNodeToNodeCodec normalizedNodeToNodeCodec = - new NormalizedNodeToNodeCodec(schemaContext); - NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder = - NormalizedNodeMessages.NodeMap.newBuilder(); - NormalizedNodeMessages.NodeMapEntry.Builder builder = - NormalizedNodeMessages.NodeMapEntry.newBuilder(); - for (Map.Entry> entry : data - .entrySet()) { - - - NormalizedNodeMessages.InstanceIdentifier instanceIdentifier = - InstanceIdentifierUtils.toSerializable(entry.getKey()); - - builder.setInstanceIdentifierPath(instanceIdentifier) - .setNormalizedNode(normalizedNodeToNodeCodec - .encode(entry.getKey(), entry.getValue()) - .getNormalizedNode()); - nodeMapBuilder.addMapEntries(builder.build()); + size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode before = streamReader.readNormalizedNode(); + NormalizedNode after = streamReader.readNormalizedNode(); + builder.addUpdated(path, before, after); } - return nodeMapBuilder.build(); - } - - @Override - public Object toSerializable() { - return DataChangeListenerMessages.DataChanged.newBuilder() - .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths())) - .setCreatedData(convertToNodeMap(change.getCreatedData())) - .setOriginalData(convertToNodeMap(change.getOriginalData())) - .setUpdatedData(convertToNodeMap(change.getUpdatedData())) - .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree())) - .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree())) - .build(); - } + // Read removed data - public static DataChanged fromSerialize(SchemaContext sc, Object message, - YangInstanceIdentifier pathId) { - DataChangeListenerMessages.DataChanged dataChanged = - (DataChangeListenerMessages.DataChanged) message; - DataChangedEvent event = new DataChangedEvent(sc); - if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData() - .isInitialized()) { - event.setCreatedData(dataChanged.getCreatedData()); - } - if (dataChanged.getOriginalData() != null && dataChanged - .getOriginalData().isInitialized()) { - event.setOriginalData(dataChanged.getOriginalData()); + size = in.readInt(); + for(int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addRemoved(path, node); } - if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData() - .isInitialized()) { - event.setUpdateData(dataChanged.getUpdatedData()); - } + // Read original subtree - if (dataChanged.getOriginalSubTree() != null && dataChanged - .getOriginalSubTree().isInitialized()) { - event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId); + boolean present = in.readBoolean(); + if(present) { + builder.setBefore(streamReader.readNormalizedNode()); } - if (dataChanged.getUpdatedSubTree() != null && dataChanged - .getUpdatedSubTree().isInitialized()) { - event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId); - } + // Read updated subtree - if (dataChanged.getRemovedPathsList() != null && !dataChanged - .getRemovedPathsList().isEmpty()) { - event.setRemovedPaths(dataChanged.getRemovedPathsList()); + present = in.readBoolean(); + if(present) { + builder.setAfter(streamReader.readNormalizedNode()); } - return new DataChanged(sc, event); - + change = builder.build(); } - static class DataChangedEvent implements - AsyncDataChangeEvent> { - private final SchemaContext schemaContext; - private Map> createdData; - private final NormalizedNodeToNodeCodec nodeCodec; - private Map> updatedData; - private Map> originalData; - private NormalizedNode originalSubTree; - private NormalizedNode updatedSubTree; - private Set removedPathIds; - - DataChangedEvent(SchemaContext schemaContext) { - this.schemaContext = schemaContext; - nodeCodec = new NormalizedNodeToNodeCodec(schemaContext); - } - - @Override - public Map> getCreatedData() { - if(createdData == null){ - return Collections.emptyMap(); - } - return createdData; - } - - DataChangedEvent setCreatedData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.createdData = convertNodeMapToMap(nodeMap); - return this; - } - - private Map> convertNodeMapToMap( - NormalizedNodeMessages.NodeMap nodeMap) { - Map> mapEntries = - new HashMap>(); - for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap - .getMapEntriesList()) { - YangInstanceIdentifier id = InstanceIdentifierUtils - .fromSerializable(nodeMapEntry.getInstanceIdentifierPath()); - mapEntries.put(id, - nodeCodec.decode(id, nodeMapEntry.getNormalizedNode())); - } - return mapEntries; - } + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(DataStoreVersions.CURRENT_VERSION); + NormalizedNodeDataOutput streamWriter = NormalizedNodeInputOutput.newDataOutput(out); - @Override - public Map> getUpdatedData() { - if(updatedData == null){ - return Collections.emptyMap(); - } - return updatedData; - } + // Write created data - DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) { - this.updatedData = convertNodeMapToMap(nodeMap); - return this; + Map> createdData = change.getCreatedData(); + out.writeInt(createdData.size()); + for(Map.Entry> e: createdData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + streamWriter.writeNormalizedNode(e.getValue()); } - @Override - public Set getRemovedPaths() { - if (removedPathIds == null) { - return Collections.emptySet(); - } - return removedPathIds; - } + // Write updated data - public DataChangedEvent setRemovedPaths(List removedPaths) { - Set removedIds = new HashSet<>(); - for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) { - removedIds.add(InstanceIdentifierUtils.fromSerializable(path)); - } - this.removedPathIds = removedIds; - return this; + Map> originalData = change.getOriginalData(); + Map> updatedData = change.getUpdatedData(); + out.writeInt(updatedData.size()); + for(Map.Entry> e: updatedData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + streamWriter.writeNormalizedNode(originalData.get(e.getKey())); + streamWriter.writeNormalizedNode(e.getValue()); } - @Override - public Map> getOriginalData() { - if (originalData == null) { - Collections.emptyMap(); - } - return originalData; - } + // Write removed data - DataChangedEvent setOriginalData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.originalData = convertNodeMapToMap(nodeMap); - return this; + Set removed = change.getRemovedPaths(); + out.writeInt(removed.size()); + for(YangInstanceIdentifier path: removed) { + streamWriter.writeYangInstanceIdentifier(path); + streamWriter.writeNormalizedNode(originalData.get(path)); } - @Override - public NormalizedNode getOriginalSubtree() { - return originalSubTree; - } + // Write original subtree - DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - originalSubTree = nodeCodec.decode(instanceIdentifierPath, node); - return this; + NormalizedNode originalSubtree = change.getOriginalSubtree(); + out.writeBoolean(originalSubtree != null); + if(originalSubtree != null) { + streamWriter.writeNormalizedNode(originalSubtree); } - @Override - public NormalizedNode getUpdatedSubtree() { - return updatedSubTree; - } + // Write original subtree - DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - updatedSubTree = nodeCodec.decode(instanceIdentifierPath, node); - return this; + NormalizedNode updatedSubtree = change.getUpdatedSubtree(); + out.writeBoolean(updatedSubtree != null); + if(updatedSubtree != null) { + streamWriter.writeNormalizedNode(updatedSubtree); } - - } - - - }