package org.opendaylight.controller.cluster.datastore.messages;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DataChanged implements SerializableMessage {
- public static final Class<DataChangeListenerMessages.DataChanged> SERIALIZABLE_CLASS =
- DataChangeListenerMessages.DataChanged.class;
- final private SchemaContext schemaContext;
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
- change;
+public class DataChanged implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+ public DataChanged() {
+ }
- public DataChanged(SchemaContext schemaContext,
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
this.change = change;
- this.schemaContext = schemaContext;
}
-
public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
return change;
}
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ in.readShort(); // Read the version
- private NormalizedNodeMessages.Node convertToNodeTree(
- NormalizedNode<?, ?> normalizedNode) {
+ NormalizedNodeDataInput streamReader = new NormalizedNodeInputStreamReader(in);
- return new NormalizedNodeToNodeCodec(schemaContext)
- .encode(normalizedNode)
- .getNormalizedNode();
+ // Note: the scope passed to builder is not actually used.
+ Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
- }
+ // Read created data
- private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
- Set<YangInstanceIdentifier> removedPaths) {
- final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
- for (YangInstanceIdentifier id : removedPaths) {
- removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+ int size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addCreated(path, node);
}
- return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
- @Override
- public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
- return removedPathInstanceIds.iterator();
- }
- };
- }
+ // Read updated data
- private NormalizedNodeMessages.NodeMap convertToNodeMap(
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
- NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
- new NormalizedNodeToNodeCodec(schemaContext);
- NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
- NormalizedNodeMessages.NodeMap.newBuilder();
- NormalizedNodeMessages.NodeMapEntry.Builder builder =
- NormalizedNodeMessages.NodeMapEntry.newBuilder();
- for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
- .entrySet()) {
-
-
- NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
- InstanceIdentifierUtils.toSerializable(entry.getKey());
-
- builder.setInstanceIdentifierPath(instanceIdentifier)
- .setNormalizedNode(normalizedNodeToNodeCodec
- .encode(entry.getValue())
- .getNormalizedNode());
- nodeMapBuilder.addMapEntries(builder.build());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
+ NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
+ builder.addUpdated(path, before, after);
}
- return nodeMapBuilder.build();
- }
-
- @Override
- public Object toSerializable() {
- return DataChangeListenerMessages.DataChanged.newBuilder()
- .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
- .setCreatedData(convertToNodeMap(change.getCreatedData()))
- .setOriginalData(convertToNodeMap(change.getOriginalData()))
- .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
- .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
- .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
- .build();
- }
+ // Read removed data
- public static DataChanged fromSerialize(SchemaContext sc, Object message,
- YangInstanceIdentifier pathId) {
- DataChangeListenerMessages.DataChanged dataChanged =
- (DataChangeListenerMessages.DataChanged) message;
- DataChangedEvent event = new DataChangedEvent(sc);
- if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
- .isInitialized()) {
- event.setCreatedData(dataChanged.getCreatedData());
- }
- if (dataChanged.getOriginalData() != null && dataChanged
- .getOriginalData().isInitialized()) {
- event.setOriginalData(dataChanged.getOriginalData());
+ size = in.readInt();
+ for(int i = 0; i < size; i++) {
+ YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
+ NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
+ builder.addRemoved(path, node);
}
- if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
- .isInitialized()) {
- event.setUpdateData(dataChanged.getUpdatedData());
- }
+ // Read original subtree
- if (dataChanged.getOriginalSubTree() != null && dataChanged
- .getOriginalSubTree().isInitialized()) {
- event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+ boolean present = in.readBoolean();
+ if(present) {
+ builder.setBefore(streamReader.readNormalizedNode());
}
- if (dataChanged.getUpdatedSubTree() != null && dataChanged
- .getUpdatedSubTree().isInitialized()) {
- event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
- }
+ // Read updated subtree
- if (dataChanged.getRemovedPathsList() != null && !dataChanged
- .getRemovedPathsList().isEmpty()) {
- event.setRemovedPaths(dataChanged.getRemovedPathsList());
+ present = in.readBoolean();
+ if(present) {
+ builder.setAfter(streamReader.readNormalizedNode());
}
- return new DataChanged(sc, event);
-
+ change = builder.build();
}
- static class DataChangedEvent implements
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
- private final NormalizedNodeToNodeCodec nodeCodec;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
- private NormalizedNode<?, ?> originalSubTree;
- private NormalizedNode<?, ?> updatedSubTree;
- private Set<YangInstanceIdentifier> removedPathIds;
-
- DataChangedEvent(SchemaContext schemaContext) {
- nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- if(createdData == null){
- return Collections.emptyMap();
- }
- return createdData;
- }
-
- DataChangedEvent setCreatedData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.createdData = convertNodeMapToMap(nodeMap);
- return this;
- }
-
- private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
- NormalizedNodeMessages.NodeMap nodeMap) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
- new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
- for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
- .getMapEntriesList()) {
- YangInstanceIdentifier id = InstanceIdentifierUtils
- .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
- mapEntries.put(id,
- nodeCodec.decode(nodeMapEntry.getNormalizedNode()));
- }
- return mapEntries;
- }
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(DataStoreVersions.CURRENT_VERSION);
+ NormalizedNodeDataOutput streamWriter = NormalizedNodeInputOutput.newDataOutput(out);
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- if(updatedData == null){
- return Collections.emptyMap();
- }
- return updatedData;
- }
+ // Write created data
- DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
- this.updatedData = convertNodeMapToMap(nodeMap);
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
+ out.writeInt(createdData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ streamWriter.writeNormalizedNode(e.getValue());
}
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- if (removedPathIds == null) {
- return Collections.emptySet();
- }
- return removedPathIds;
- }
+ // Write updated data
- public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
- Set<YangInstanceIdentifier> removedIds = new HashSet<>();
- for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
- removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
- }
- this.removedPathIds = removedIds;
- return this;
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
+ Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
+ out.writeInt(updatedData.size());
+ for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+ streamWriter.writeYangInstanceIdentifier(e.getKey());
+ streamWriter.writeNormalizedNode(originalData.get(e.getKey()));
+ streamWriter.writeNormalizedNode(e.getValue());
}
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- if (originalData == null) {
- Collections.emptyMap();
- }
- return originalData;
- }
+ // Write removed data
- DataChangedEvent setOriginalData(
- NormalizedNodeMessages.NodeMap nodeMap) {
- this.originalData = convertNodeMapToMap(nodeMap);
- return this;
+ Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
+ out.writeInt(removed.size());
+ for(YangInstanceIdentifier path: removed) {
+ streamWriter.writeYangInstanceIdentifier(path);
+ streamWriter.writeNormalizedNode(originalData.get(path));
}
- @Override
- public NormalizedNode<?, ?> getOriginalSubtree() {
- return originalSubTree;
- }
+ // Write original subtree
- DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- originalSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
+ out.writeBoolean(originalSubtree != null);
+ if(originalSubtree != null) {
+ streamWriter.writeNormalizedNode(originalSubtree);
}
- @Override
- public NormalizedNode<?, ?> getUpdatedSubtree() {
- return updatedSubTree;
- }
+ // Write original subtree
- DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
- YangInstanceIdentifier instanceIdentifierPath) {
- updatedSubTree = nodeCodec.decode(node);
- return this;
+ NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
+ out.writeBoolean(updatedSubtree != null);
+ if(updatedSubtree != null) {
+ streamWriter.writeNormalizedNode(updatedSubtree);
}
-
-
}
-
-
-
}