X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fmessages%2FDataChanged.java;h=c753ad27646f213237e0171a38d965922b23de2e;hp=5b5f076d43713b48e6245a35a6d21684ecf909cb;hb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;hpb=7ca766e911670b348d68c191ba16a903f1bdc245 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java index 5b5f076d43..c753ad2764 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java @@ -8,255 +8,143 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; -import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class DataChanged implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = - DataChangeListenerMessages.DataChanged.class; - final private SchemaContext schemaContext; - private final AsyncDataChangeEvent> - change; +public class DataChanged implements Externalizable { + private static final long serialVersionUID = 1L; + private AsyncDataChangeEvent> change; + public DataChanged() { + } - public DataChanged(SchemaContext schemaContext, - AsyncDataChangeEvent> change) { + public DataChanged(AsyncDataChangeEvent> change) { this.change = change; - this.schemaContext = schemaContext; } - public AsyncDataChangeEvent> getChange() { return change; } + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // Read the version + in.readShort(); - private NormalizedNodeMessages.Node convertToNodeTree( - NormalizedNode normalizedNode) { + NormalizedNodeDataInput streamReader = NormalizedNodeInputOutput.newDataInputWithoutValidation(in); - return new NormalizedNodeToNodeCodec(schemaContext) - .encode(normalizedNode) - .getNormalizedNode(); + // Note: the scope passed to builder is not actually used. + Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE); - } + // Read created data - private Iterable convertToRemovePaths( - Set removedPaths) { - final Set removedPathInstanceIds = new HashSet<>(); - for (YangInstanceIdentifier id : removedPaths) { - removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id)); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addCreated(path, node); } - return new Iterable() { - @Override - public Iterator iterator() { - return removedPathInstanceIds.iterator(); - } - }; - } + // Read updated data - private NormalizedNodeMessages.NodeMap convertToNodeMap( - Map> data) { - NormalizedNodeToNodeCodec normalizedNodeToNodeCodec = - new NormalizedNodeToNodeCodec(schemaContext); - NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder = - NormalizedNodeMessages.NodeMap.newBuilder(); - NormalizedNodeMessages.NodeMapEntry.Builder builder = - NormalizedNodeMessages.NodeMapEntry.newBuilder(); - for (Map.Entry> entry : data - .entrySet()) { - - - NormalizedNodeMessages.InstanceIdentifier instanceIdentifier = - InstanceIdentifierUtils.toSerializable(entry.getKey()); - - builder.setInstanceIdentifierPath(instanceIdentifier) - .setNormalizedNode(normalizedNodeToNodeCodec - .encode(entry.getValue()) - .getNormalizedNode()); - nodeMapBuilder.addMapEntries(builder.build()); + size = in.readInt(); + for (int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode before = streamReader.readNormalizedNode(); + NormalizedNode after = streamReader.readNormalizedNode(); + builder.addUpdated(path, before, after); } - return nodeMapBuilder.build(); - } - - @Override - public Object toSerializable() { - return DataChangeListenerMessages.DataChanged.newBuilder() - .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths())) - .setCreatedData(convertToNodeMap(change.getCreatedData())) - .setOriginalData(convertToNodeMap(change.getOriginalData())) - .setUpdatedData(convertToNodeMap(change.getUpdatedData())) - .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree())) - .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree())) - .build(); - } + // Read removed data - public static DataChanged fromSerialize(SchemaContext sc, Object message, - YangInstanceIdentifier pathId) { - DataChangeListenerMessages.DataChanged dataChanged = - (DataChangeListenerMessages.DataChanged) message; - DataChangedEvent event = new DataChangedEvent(sc); - if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData() - .isInitialized()) { - event.setCreatedData(dataChanged.getCreatedData()); - } - if (dataChanged.getOriginalData() != null && dataChanged - .getOriginalData().isInitialized()) { - event.setOriginalData(dataChanged.getOriginalData()); + size = in.readInt(); + for (int i = 0; i < size; i++) { + YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier(); + NormalizedNode node = streamReader.readNormalizedNode(); + builder.addRemoved(path, node); } - if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData() - .isInitialized()) { - event.setUpdateData(dataChanged.getUpdatedData()); - } + // Read original subtree - if (dataChanged.getOriginalSubTree() != null && dataChanged - .getOriginalSubTree().isInitialized()) { - event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId); + boolean present = in.readBoolean(); + if (present) { + builder.setBefore(streamReader.readNormalizedNode()); } - if (dataChanged.getUpdatedSubTree() != null && dataChanged - .getUpdatedSubTree().isInitialized()) { - event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId); - } + // Read updated subtree - if (dataChanged.getRemovedPathsList() != null && !dataChanged - .getRemovedPathsList().isEmpty()) { - event.setRemovedPaths(dataChanged.getRemovedPathsList()); + present = in.readBoolean(); + if (present) { + builder.setAfter(streamReader.readNormalizedNode()); } - return new DataChanged(sc, event); - + change = builder.build(); } - static class DataChangedEvent implements - AsyncDataChangeEvent> { - private Map> createdData; - private final NormalizedNodeToNodeCodec nodeCodec; - private Map> updatedData; - private Map> originalData; - private NormalizedNode originalSubTree; - private NormalizedNode updatedSubTree; - private Set removedPathIds; - - DataChangedEvent(SchemaContext schemaContext) { - nodeCodec = new NormalizedNodeToNodeCodec(schemaContext); - } - - @Override - public Map> getCreatedData() { - if(createdData == null){ - return Collections.emptyMap(); - } - return createdData; - } - - DataChangedEvent setCreatedData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.createdData = convertNodeMapToMap(nodeMap); - return this; - } - - private Map> convertNodeMapToMap( - NormalizedNodeMessages.NodeMap nodeMap) { - Map> mapEntries = - new HashMap>(); - for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap - .getMapEntriesList()) { - YangInstanceIdentifier id = InstanceIdentifierUtils - .fromSerializable(nodeMapEntry.getInstanceIdentifierPath()); - mapEntries.put(id, - nodeCodec.decode(nodeMapEntry.getNormalizedNode())); - } - return mapEntries; - } + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(DataStoreVersions.CURRENT_VERSION); + NormalizedNodeDataOutput streamWriter = NormalizedNodeInputOutput.newDataOutput(out); - @Override - public Map> getUpdatedData() { - if(updatedData == null){ - return Collections.emptyMap(); - } - return updatedData; - } + // Write created data - DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) { - this.updatedData = convertNodeMapToMap(nodeMap); - return this; + Map> createdData = change.getCreatedData(); + out.writeInt(createdData.size()); + for (Map.Entry> e: createdData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + streamWriter.writeNormalizedNode(e.getValue()); } - @Override - public Set getRemovedPaths() { - if (removedPathIds == null) { - return Collections.emptySet(); - } - return removedPathIds; - } + // Write updated data - public DataChangedEvent setRemovedPaths(List removedPaths) { - Set removedIds = new HashSet<>(); - for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) { - removedIds.add(InstanceIdentifierUtils.fromSerializable(path)); - } - this.removedPathIds = removedIds; - return this; + Map> originalData = change.getOriginalData(); + Map> updatedData = change.getUpdatedData(); + out.writeInt(updatedData.size()); + for (Map.Entry> e: updatedData.entrySet()) { + streamWriter.writeYangInstanceIdentifier(e.getKey()); + streamWriter.writeNormalizedNode(originalData.get(e.getKey())); + streamWriter.writeNormalizedNode(e.getValue()); } - @Override - public Map> getOriginalData() { - if (originalData == null) { - Collections.emptyMap(); - } - return originalData; - } + // Write removed data - DataChangedEvent setOriginalData( - NormalizedNodeMessages.NodeMap nodeMap) { - this.originalData = convertNodeMapToMap(nodeMap); - return this; + Set removed = change.getRemovedPaths(); + out.writeInt(removed.size()); + for (YangInstanceIdentifier path: removed) { + streamWriter.writeYangInstanceIdentifier(path); + streamWriter.writeNormalizedNode(originalData.get(path)); } - @Override - public NormalizedNode getOriginalSubtree() { - return originalSubTree; - } + // Write original subtree - DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - originalSubTree = nodeCodec.decode(node); - return this; + NormalizedNode originalSubtree = change.getOriginalSubtree(); + out.writeBoolean(originalSubtree != null); + if (originalSubtree != null) { + streamWriter.writeNormalizedNode(originalSubtree); } - @Override - public NormalizedNode getUpdatedSubtree() { - return updatedSubTree; - } + // Write original subtree - DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node, - YangInstanceIdentifier instanceIdentifierPath) { - updatedSubTree = nodeCodec.decode(node); - return this; + NormalizedNode updatedSubtree = change.getUpdatedSubtree(); + out.writeBoolean(updatedSubtree != null); + if (updatedSubtree != null) { + streamWriter.writeNormalizedNode(updatedSubtree); } - - } - - - }