Adjust for odlparent 3 / yangtools 2
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcDecoder.java
1 /*
2  * Copyright (c) 2013, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.jsonrpc;
10
11 import com.fasterxml.jackson.core.JsonEncoding;
12 import com.fasterxml.jackson.core.JsonFactory;
13 import com.fasterxml.jackson.core.JsonParser;
14 import com.fasterxml.jackson.core.io.IOContext;
15 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
16 import com.fasterxml.jackson.core.util.BufferRecycler;
17 import com.fasterxml.jackson.databind.JsonNode;
18 import com.fasterxml.jackson.databind.MappingJsonFactory;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufInputStream;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.handler.codec.ByteToMessageDecoder;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.util.List;
26 import org.opendaylight.ovsdb.lib.error.InvalidEncodingException;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
32  * The stream is framed first by inspecting the json for valid end marker (left curly)
33  * and is passed to a Json parser (jackson) for converting into an object model.
34  *
35  * <p>There are no JSON parsers that I am aware of that does non blocking parsing.
36  * This approach avoids having to run json parser over and over again on the entire
37  * stream waiting for input. Parser is invoked only when we know of a full JSON message
38  * in the stream.
39  */
40 public class JsonRpcDecoder extends ByteToMessageDecoder {
41
42     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcDecoder.class);
43     private final int maxFrameLength;
44     //Indicates if the frame limit warning was issued
45     private boolean maxFrameLimitWasReached = false;
46     private final JsonFactory jacksonJsonFactory = new MappingJsonFactory();
47
48     private final IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
49
50     // context for the previously read incomplete records
51     private int lastRecordBytes = 0;
52     private int leftCurlies = 0;
53     private int rightCurlies = 0;
54     private boolean inS = false;
55
56     private int recordsRead;
57
58     public JsonRpcDecoder(int maxFrameLength) {
59         this.maxFrameLength = maxFrameLength;
60     }
61
62     @Override
63     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
64
65         LOG.trace("readable bytes {}, records read {}, incomplete record bytes {}",
66                 buf.readableBytes(), recordsRead, lastRecordBytes);
67
68         if (lastRecordBytes == 0) {
69             if (buf.readableBytes() < 4) {
70                 return; //wait for more data
71             }
72
73             skipSpaces(buf);
74
75             byte[] buff = new byte[4];
76             buf.getBytes(buf.readerIndex(), buff);
77             ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
78             JsonEncoding jsonEncoding = strapper.detectEncoding();
79             if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
80                 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
81             }
82         }
83
84         int index = lastRecordBytes + buf.readerIndex();
85
86         for (; index < buf.writerIndex(); index++) {
87             switch (buf.getByte(index)) {
88                 case '{':
89                     if (!inS) {
90                         leftCurlies++;
91                     }
92                     break;
93                 case '}':
94                     if (!inS) {
95                         rightCurlies++;
96                     }
97                     break;
98                 case '"':
99                     if (buf.getByte(index - 1) != '\\') {
100                         inS = !inS;
101                     }
102                     break;
103                 default:
104                     break;
105             }
106
107             if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
108                 ByteBuf slice = buf.readSlice(1 + index - buf.readerIndex());
109                 JsonParser jp = jacksonJsonFactory.createParser((InputStream) new ByteBufInputStream(slice));
110                 JsonNode root = jp.readValueAsTree();
111                 out.add(root);
112                 leftCurlies = 0;
113                 rightCurlies = 0;
114                 lastRecordBytes = 0;
115                 recordsRead++;
116                 break;
117             }
118
119             /*
120              * Changing this limit to being a warning, we do not wish to "break" in scale environment
121              * and currently this limits the ovs of having only around 50 ports defined...
122              * I do acknowledge the fast that this might be risky in case of huge amount of strings
123              * in which the controller can crash with an OOM, however seems that we need a really huge
124              * ovs to reach that limit.
125              */
126
127             //We do not want to issue a log message on every extent of the buffer
128             //hence logging only once
129             if (index - buf.readerIndex() >= maxFrameLength && !maxFrameLimitWasReached) {
130                 maxFrameLimitWasReached = true;
131                 LOG.warn("***** OVSDB Frame limit of {} bytes has been reached! *****", this.maxFrameLength);
132             }
133         }
134
135         // end of stream, save the incomplete record index to avoid reexamining the whole on next run
136         if (index >= buf.writerIndex()) {
137             lastRecordBytes = buf.readableBytes();
138         }
139     }
140
141     public int getRecordsRead() {
142         return recordsRead;
143     }
144
145     private static void skipSpaces(ByteBuf byteBuf) throws IOException {
146         while (byteBuf.isReadable()) {
147             int ch = byteBuf.getByte(byteBuf.readerIndex()) & 0xFF;
148             if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t')) {
149                 return;
150             } else {
151                 byteBuf.readByte(); //move the read index
152             }
153         }
154     }
155 }