Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / JsonExportActor.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.actors;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.Props;
14 import com.google.gson.stream.JsonWriter;
15 import java.io.IOException;
16 import java.nio.file.Files;
17 import java.nio.file.Path;
18 import java.nio.file.Paths;
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
25 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
26 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
30 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
31 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
32 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
33 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidateNode;
35 import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
36 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
37 import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack;
38
39 public final class JsonExportActor extends AbstractUntypedActor {
40     // Internal messages
41     public static final class ExportSnapshot {
42         private final String id;
43
44         private final DataTreeCandidate dataTreeCandidate;
45
46         public ExportSnapshot(final DataTreeCandidate candidate, final String id) {
47             dataTreeCandidate = requireNonNull(candidate);
48             this.id = requireNonNull(id);
49         }
50     }
51
52     public static final class ExportJournal {
53         private final ReplicatedLogEntry replicatedLogEntry;
54
55         public ExportJournal(final ReplicatedLogEntry replicatedLogEntry) {
56             this.replicatedLogEntry = requireNonNull(replicatedLogEntry);
57         }
58     }
59
60     public static final class FinishExport {
61         private final String id;
62
63         public FinishExport(final String id) {
64             this.id = requireNonNull(id);
65         }
66     }
67
68     private final List<ReplicatedLogEntry> entries = new ArrayList<>();
69     private final @NonNull EffectiveModelContext schemaContext;
70     private final @NonNull Path baseDirPath;
71
72     private JsonExportActor(final EffectiveModelContext schemaContext, final Path dirPath) {
73         this.schemaContext = requireNonNull(schemaContext);
74         baseDirPath = requireNonNull(dirPath);
75     }
76
77     public static Props props(final EffectiveModelContext schemaContext, final String dirPath) {
78         return Props.create(JsonExportActor.class, schemaContext, Paths.get(dirPath));
79     }
80
81     @Override
82     protected void handleReceive(final Object message) {
83         if (message instanceof ExportSnapshot) {
84             onExportSnapshot((ExportSnapshot) message);
85         } else if (message instanceof ExportJournal) {
86             onExportJournal((ExportJournal) message);
87         } else if (message instanceof FinishExport) {
88             onFinishExport((FinishExport)message);
89         } else {
90             unknownMessage(message);
91         }
92     }
93
94     private void onExportSnapshot(final ExportSnapshot exportSnapshot) {
95         final Path snapshotDir = baseDirPath.resolve("snapshots");
96         createDir(snapshotDir);
97
98         final Path filePath = snapshotDir.resolve(exportSnapshot.id + "-snapshot.json");
99         LOG.debug("Creating JSON file : {}", filePath);
100
101         final NormalizedNode root = exportSnapshot.dataTreeCandidate.getRootNode().getDataAfter();
102         checkState(root instanceof NormalizedNodeContainer, "Unexpected root %s", root);
103
104         writeSnapshot(filePath, (NormalizedNodeContainer<?>) root);
105         LOG.debug("Created JSON file: {}", filePath);
106     }
107
108     private void onExportJournal(final ExportJournal exportJournal) {
109         entries.add(exportJournal.replicatedLogEntry);
110     }
111
112     private void onFinishExport(final FinishExport finishExport) {
113         final Path journalDir = baseDirPath.resolve("journals");
114         createDir(journalDir);
115
116         final Path filePath = journalDir.resolve(finishExport.id + "-journal.json");
117         LOG.debug("Creating JSON file : {}", filePath);
118         writeJournal(filePath);
119         LOG.debug("Created JSON file: {}", filePath);
120     }
121
122     private void writeSnapshot(final Path path, final NormalizedNodeContainer<?> root) {
123         try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
124             jsonWriter.beginObject();
125
126             try (var nnWriter = NormalizedNodeWriter.forStreamWriter(JSONNormalizedNodeStreamWriter.createNestedWriter(
127                     JSONCodecFactorySupplier.RFC7951.getShared(schemaContext),
128                     SchemaInferenceStack.of(schemaContext).toInference(), null, jsonWriter),
129                 true)) {
130                 for (NormalizedNode node : root.body()) {
131                     nnWriter.write(node);
132                 }
133             }
134
135             jsonWriter.endObject();
136         } catch (IOException e) {
137             LOG.error("Failed to export stapshot to {}", path, e);
138         }
139     }
140
141     private void writeJournal(final Path path) {
142         try (JsonWriter jsonWriter = new JsonWriter(Files.newBufferedWriter(path))) {
143             jsonWriter.beginObject().name("Entries");
144             jsonWriter.beginArray();
145             for (var entry : entries) {
146                 final var data = entry.getData();
147                 if (data instanceof CommitTransactionPayload payload) {
148                     final var candidate = payload.getCandidate().candidate();
149                     writeNode(jsonWriter, candidate);
150                 } else {
151                     jsonWriter.beginObject().name("Payload").value(data.toString()).endObject();
152                 }
153             }
154             jsonWriter.endArray();
155             jsonWriter.endObject();
156         } catch (IOException e) {
157             LOG.error("Failed to export journal to {}", path, e);
158         }
159     }
160
161     private static void writeNode(final JsonWriter writer, final DataTreeCandidate candidate) throws IOException {
162         writer.beginObject().name("Entry").beginArray();
163         doWriteNode(writer, candidate.getRootPath(), candidate.getRootNode());
164         writer.endArray().endObject();
165     }
166
167     private static void doWriteNode(final JsonWriter writer, final YangInstanceIdentifier path,
168             final DataTreeCandidateNode node) throws IOException {
169         switch (node.modificationType()) {
170             case APPEARED:
171             case DISAPPEARED:
172             case SUBTREE_MODIFIED:
173                 NodeIterator iterator = new NodeIterator(null, path, node.childNodes().iterator());
174                 do {
175                     iterator = iterator.next(writer);
176                 } while (iterator != null);
177                 break;
178             case DELETE:
179             case UNMODIFIED:
180             case WRITE:
181                 outputNodeInfo(writer, path, node);
182                 break;
183             default:
184                 outputDefault(writer, path, node);
185         }
186     }
187
188     private static void outputNodeInfo(final JsonWriter writer, final YangInstanceIdentifier path,
189                                        final DataTreeCandidateNode node) throws IOException {
190         final ModificationType modificationType = node.modificationType();
191
192         writer.beginObject().name("Node");
193         writer.beginArray();
194         writer.beginObject().name("Path").value(path.toString()).endObject();
195         writer.beginObject().name("ModificationType").value(modificationType.toString()).endObject();
196         if (modificationType == ModificationType.WRITE) {
197             writer.beginObject().name("Data").value(node.getDataAfter().body().toString()).endObject();
198         }
199         writer.endArray();
200         writer.endObject();
201     }
202
203     private static void outputDefault(final JsonWriter writer, final YangInstanceIdentifier path,
204                                       final DataTreeCandidateNode node) throws IOException {
205         writer.beginObject().name("Node");
206         writer.beginArray();
207         writer.beginObject().name("Path").value(path.toString()).endObject();
208         writer.beginObject().name("ModificationType")
209                 .value("UNSUPPORTED MODIFICATION: " + node.modificationType()).endObject();
210         writer.endArray();
211         writer.endObject();
212     }
213
214     private void createDir(final Path path) {
215         try {
216             Files.createDirectories(path);
217         } catch (IOException e) {
218             LOG.warn("Directory {} cannot be created", path, e);
219         }
220     }
221
222     private static final class NodeIterator {
223         private final Iterator<DataTreeCandidateNode> iterator;
224         private final YangInstanceIdentifier path;
225         private final NodeIterator parent;
226
227         NodeIterator(final @Nullable NodeIterator parent, final YangInstanceIdentifier path,
228                      final Iterator<DataTreeCandidateNode> iterator) {
229             this.iterator = requireNonNull(iterator);
230             this.path = requireNonNull(path);
231             this.parent = parent;
232         }
233
234         NodeIterator next(final JsonWriter writer) throws IOException {
235             while (iterator.hasNext()) {
236                 final var node = iterator.next();
237                 final var child = path.node(node.name());
238
239                 switch (node.modificationType()) {
240                     case APPEARED:
241                     case DISAPPEARED:
242                     case SUBTREE_MODIFIED:
243                         return new NodeIterator(this, child, node.childNodes().iterator());
244                     case DELETE:
245                     case UNMODIFIED:
246                     case WRITE:
247                         outputNodeInfo(writer, path, node);
248                         break;
249                     default:
250                         outputDefault(writer, child, node);
251                 }
252             }
253
254             return parent;
255         }
256     }
257 }