2 * Copyright (C) 2013 EBay Software Foundation
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Ashwin Raveendran, Madhu Venugopal
10 package org.opendaylight.ovsdb.lib.jsonrpc;
13 import io.netty.buffer.ByteBuf;
14 import io.netty.buffer.ByteBufInputStream;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.TooLongFrameException;
19 import java.io.IOException;
20 import java.util.List;
22 import org.opendaylight.ovsdb.lib.error.InvalidEncodingException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import com.fasterxml.jackson.core.JsonEncoding;
27 import com.fasterxml.jackson.core.JsonFactory;
28 import com.fasterxml.jackson.core.JsonParser;
29 import com.fasterxml.jackson.core.io.IOContext;
30 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
31 import com.fasterxml.jackson.core.util.BufferRecycler;
32 import com.fasterxml.jackson.databind.JsonNode;
33 import com.fasterxml.jackson.databind.MappingJsonFactory;
36 * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
37 * The stream is framed first by inspecting the JSON for a valid end marker (right curly)
38 * and is passed to a Json parser (jackson) for converting into an object model.
40 * There are no JSON parsers that I am aware of that do non-blocking parsing.
41 * This approach avoids having to run json parser over and over again on the entire
42 * stream waiting for input. Parser is invoked only when we know of a full JSON message
45 public class JsonRpcDecoder extends ByteToMessageDecoder {
47 private static final Logger LOG = LoggerFactory.getLogger(JsonRpcDecoder.class);
// Soft limit (in bytes) for a single JSON frame; exceeding it only logs a
// warning in decode(), it does not abort the connection.
48 private int maxFrameLength;
49 //Indicates if the frame limit warning was issued
50 private boolean maxFrameLimitWasReached = false;
// Jackson factory used to parse each framed JSON record into a JsonNode tree.
51 private JsonFactory jacksonJsonFactory = new MappingJsonFactory();
// IOContext handed to ByteSourceJsonBootstrapper for encoding detection in decode().
53 private IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
55 // context for the previously read incomplete records
// Number of bytes of a partial record already scanned, so the next decode()
// call resumes scanning where the previous one stopped.
56 private int lastRecordBytes = 0;
// Running counts of '{' and '}' seen outside string literals; a record is
// complete when both are non-zero and equal (see decode()).
57 private int leftCurlies = 0;
58 private int rightCurlies = 0;
// True while the scanner is inside a JSON string literal.
59 private boolean inS = false;
// Total number of complete JSON records decoded so far.
61 private int recordsRead;
// Creates a decoder whose frame-size warning threshold is maxFrameLength bytes.
63 public JsonRpcDecoder(int maxFrameLength) {
64 this.maxFrameLength = maxFrameLength;
// Frames and decodes JSON-RPC records from the inbound TCP stream.
// Scans the buffer byte-by-byte, tracking curly-brace balance outside string
// literals; once a complete top-level JSON object is detected, the matching
// slice is handed to Jackson and parsed into a JsonNode tree.
68 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
70 LOG.trace("readable bytes {}, records read {}, incomplete record bytes {}",
71 buf.readableBytes(), recordsRead, lastRecordBytes);
// Only at the start of a fresh record: sniff the encoding from the first
// four bytes (peeked, not consumed) and reject anything but UTF-8.
73 if (lastRecordBytes == 0) {
74 if (buf.readableBytes() < 4) {
75 return; //wait for more data
80 byte[] buff = new byte[4];
81 buf.getBytes(buf.readerIndex(), buff);
82 ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
83 JsonEncoding jsonEncoding = strapper.detectEncoding();
84 if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
85 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
// Resume scanning after the bytes already examined on a previous call.
89 int index = lastRecordBytes + buf.readerIndex();
91 for (; index < buf.writerIndex(); index++) {
92 switch (buf.getByte(index)) {
// A quote toggles the in-string flag unless escaped by a backslash.
// NOTE(review): reads index - 1; assumes a record never begins with '"',
// which holds for JSON-RPC records that open with '{' — confirm.
104 if (buf.getByte(index - 1) != '\\') {
// Balanced, non-zero curly counts outside a string mark a complete record:
// slice it off the buffer (advancing the reader index) and parse it.
112 if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
113 ByteBuf slice = buf.readSlice(1 + index - buf.readerIndex());
114 JsonParser jp = jacksonJsonFactory.createParser(new ByteBufInputStream(slice));
115 JsonNode root = jp.readValueAsTree();
// Reset the scanner state for the next record.
117 leftCurlies = rightCurlies = lastRecordBytes = 0;
// Frame-length check is advisory only: warn once instead of failing.
122 if (index - buf.readerIndex() >= maxFrameLength) {
124 * Changing this limit to being a warning, we do not wish to "break" in scale environment
125 * and currently this limits the ovs of having only around 50 ports defined...
126 * I do acknowledge the fact that this might be risky in case of huge amount of strings
127 * in which the controller can crash with an OOM, however seems that we need a really huge
128 * ovs to reach that limit.
131 //We do not want to issue a log message on every extent of the buffer
132 //hence logging only once
133 if (!maxFrameLimitWasReached) {
134 maxFrameLimitWasReached = true;
135 LOG.warn("***** OVSDB Frame limit of {} bytes has been reached! *****", this.maxFrameLength);
140 // end of stream, save the incomplete record index to avoid reexamining the whole on next run
141 if (index >= buf.writerIndex()) {
142 lastRecordBytes = buf.readableBytes();
// Returns the number of complete JSON records decoded so far.
147 public int getRecordsRead() {
// Advances the buffer's reader index past any leading JSON whitespace
// (space, CR, LF, tab), stopping at the first non-whitespace byte.
151 private static void skipSpaces(ByteBuf byteBuf) throws IOException {
152 while (byteBuf.isReadable()) {
// Peek at the next byte without moving the reader index.
153 int ch = byteBuf.getByte(byteBuf.readerIndex()) & 0xFF;
154 if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t')) {
157 byteBuf.readByte(); //move the read index
// Trace-logs the entire readable region of the buffer, labelled "buff" when
// no message is supplied. Delegates to the four-argument overload.
163 private void print(ByteBuf buf, String message) {
164 print(buf, buf.readerIndex(), buf.readableBytes(), message == null ? "buff" : message);
// Trace-logs up to 'chars' bytes of the buffer starting at startPos,
// clamping the copied length so it never reads past the writer index.
167 private void print(ByteBuf buf, int startPos, int chars, String message) {
168 if (null == message) {
171 if (startPos > buf.writerIndex()) {
172 LOG.trace("startPos out of bounds");
// Copy only as many bytes as actually exist between startPos and writerIndex.
174 byte[] bytes = new byte[startPos + chars <= buf.writerIndex() ? chars : buf.writerIndex() - startPos];
175 buf.getBytes(startPos, bytes);
// NOTE(review): new String(bytes) uses the platform default charset; the
// stream is validated as UTF-8 in decode(), so StandardCharsets.UTF_8 would
// be the safer explicit choice here.
176 LOG.trace("{} ={}", message, new String(bytes));
179 // copied from Netty decoder
// Fires a TooLongFrameException down the pipeline; the message distinguishes
// a frame whose (over-long) length is known (frameLength > 0) from one whose
// length has not been determined yet.
180 private void fail(ChannelHandlerContext ctx, long frameLength) {
181 if (frameLength > 0) {
182 ctx.fireExceptionCaught(
183 new TooLongFrameException(
184 "frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"));
186 ctx.fireExceptionCaught(
187 new TooLongFrameException(
188 "frame length exceeds " + maxFrameLength + " - discarding"));