2 * Copyright (c) 2013, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.lib.jsonrpc;
11 import com.fasterxml.jackson.core.JsonEncoding;
12 import com.fasterxml.jackson.core.JsonFactory;
13 import com.fasterxml.jackson.core.JsonParser;
14 import com.fasterxml.jackson.core.io.IOContext;
15 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
16 import com.fasterxml.jackson.core.util.BufferRecycler;
17 import com.fasterxml.jackson.databind.JsonNode;
18 import com.fasterxml.jackson.databind.MappingJsonFactory;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufInputStream;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.handler.codec.ByteToMessageDecoder;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.util.List;
26 import org.opendaylight.ovsdb.lib.error.InvalidEncodingException;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
32 * The stream is framed first by inspecting the json for a valid end marker (the matching right curly)
33 * and is passed to a Json parser (jackson) for converting into an object model.
35 * <p>There are no JSON parsers that I am aware of that do non-blocking parsing.
36 * This approach avoids having to run json parser over and over again on the entire
37 * stream waiting for input. Parser is invoked only when we know of a full JSON message
40 public class JsonRpcDecoder extends ByteToMessageDecoder {
42 private static final Logger LOG = LoggerFactory.getLogger(JsonRpcDecoder.class);
43 private final int maxFrameLength;
44 //Indicates if the frame limit warning was issued
45 private boolean maxFrameLimitWasReached = false;
46 private final JsonFactory jacksonJsonFactory = new MappingJsonFactory();
48 private final IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
50 // context for the previously read incomplete records
51 private int lastRecordBytes = 0;
52 private int leftCurlies = 0;
53 private int rightCurlies = 0;
54 private boolean inS = false;
56 private int recordsRead;
58 public JsonRpcDecoder(int maxFrameLength) {
59 this.maxFrameLength = maxFrameLength;
63 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
65 LOG.trace("readable bytes {}, records read {}, incomplete record bytes {}",
66 buf.readableBytes(), recordsRead, lastRecordBytes);
68 if (lastRecordBytes == 0) {
69 if (buf.readableBytes() < 4) {
70 return; //wait for more data
75 byte[] buff = new byte[4];
76 buf.getBytes(buf.readerIndex(), buff);
77 ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
78 JsonEncoding jsonEncoding = strapper.detectEncoding();
79 if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
80 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
84 int index = lastRecordBytes + buf.readerIndex();
86 for (; index < buf.writerIndex(); index++) {
87 switch (buf.getByte(index)) {
99 if (buf.getByte(index - 1) != '\\') {
107 if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
108 ByteBuf slice = buf.readSlice(1 + index - buf.readerIndex());
109 JsonParser jp = jacksonJsonFactory.createParser((InputStream) new ByteBufInputStream(slice));
110 JsonNode root = jp.readValueAsTree();
120 * Changing this limit to being a warning, we do not wish to "break" in scale environments
121 * and currently this limits the ovs to having only around 50 ports defined...
122 * I do acknowledge the fact that this might be risky in case of a huge amount of strings
123 * in which the controller can crash with an OOM, however it seems that we need a really huge
124 * ovs to reach that limit.
127 //We do not want to issue a log message on every extent of the buffer
128 //hence logging only once
129 if (index - buf.readerIndex() >= maxFrameLength && !maxFrameLimitWasReached) {
130 maxFrameLimitWasReached = true;
131 LOG.warn("***** OVSDB Frame limit of {} bytes has been reached! *****", this.maxFrameLength);
135 // end of stream, save the incomplete record's byte count to avoid rescanning it on the next run
136 if (index >= buf.writerIndex()) {
137 lastRecordBytes = buf.readableBytes();
141 public int getRecordsRead() {
145 private static void skipSpaces(ByteBuf byteBuf) throws IOException {
146 while (byteBuf.isReadable()) {
147 int ch = byteBuf.getByte(byteBuf.readerIndex()) & 0xFF;
148 if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t')) {
151 byteBuf.readByte(); //move the read index