2 * Copyright (C) 2013 EBay Software Foundation
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Ashwin Raveendran, Madhu Venugopal, Thomas Bachman
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc;
13 import io.netty.buffer.ByteBuf;
14 import io.netty.buffer.ByteBufInputStream;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.TooLongFrameException;
19 import java.util.List;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.fasterxml.jackson.core.JsonEncoding;
25 import com.fasterxml.jackson.core.JsonFactory;
26 import com.fasterxml.jackson.core.JsonParser;
27 import com.fasterxml.jackson.core.io.IOContext;
28 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
29 import com.fasterxml.jackson.core.util.BufferRecycler;
30 import com.fasterxml.jackson.databind.JsonNode;
31 import com.fasterxml.jackson.databind.MappingJsonFactory;
34 * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
35 * The stream is framed first by inspecting the json for valid end marker (left curly)
36 * and is passed to a Json parser (jackson) for converting into an object model.
38 * There are no JSON parsers that I am aware of that does non blocking parsing.
39 * This approach avoids having to run json parser over and over again on the entire
40 * stream waiting for input. Parser is invoked only when we know of a full JSON message
43 public class JsonRpcDecoder extends ByteToMessageDecoder {
45 protected static final Logger logger = LoggerFactory.getLogger(JsonRpcDecoder.class);
47 private final int maxFrameLength;
49 private final JsonFactory jacksonJsonFactory = new MappingJsonFactory();
51 private final IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
53 // context for the previously read incomplete records
54 private int lastRecordBytes = 0;
55 private int leftCurlies = 0;
56 private int rightCurlies = 0;
57 private boolean inS = false;
59 private int recordsRead;
61 public JsonRpcDecoder(int maxFrameLength) {
62 this.maxFrameLength = maxFrameLength;
66 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
68 logger.trace("readable bytes {}, records read {}, incomplete record bytes {}",
69 buf.readableBytes(), recordsRead, lastRecordBytes);
71 if (lastRecordBytes == 0) {
72 if (buf.readableBytes() < 4) {
73 return; //wait for more data
78 byte[] buff = new byte[4];
79 buf.getBytes(buf.readerIndex(), buff);
80 ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
81 JsonEncoding jsonEncoding = strapper.detectEncoding();
82 if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
83 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
87 int i = lastRecordBytes + buf.readerIndex();
89 for (; i < buf.writerIndex(); i++) {
90 switch (buf.getByte(i)) {
92 if (!inS) leftCurlies++;
95 if (!inS) rightCurlies++;
98 if (buf.getByte(i - 1) != '\\') inS = !inS;
105 if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
106 ByteBuf slice = buf.readSlice(1 + i - buf.readerIndex());
107 JsonParser jp = jacksonJsonFactory.createParser(new ByteBufInputStream(slice));
108 JsonNode root = jp.readValueAsTree();
110 leftCurlies = rightCurlies = lastRecordBytes = 0;
115 if (isEom(buf.getByte(i) & 0xFF)) {
116 // Dump up to this point in the buffer
117 buf.readerIndex(1 + i);
118 leftCurlies = rightCurlies = lastRecordBytes = 0;
119 ctx.fireChannelReadComplete();
121 else if (i - buf.readerIndex() >= maxFrameLength) {
122 fail(ctx, i - buf.readerIndex());
126 // end of stream, save the incomplete record index to avoid reexamining the whole on next run
127 if (i >= buf.writerIndex()) {
128 lastRecordBytes = buf.readableBytes();
133 public int getRecordsRead() {
137 private static boolean isEom(int ch) {
144 private static void skipSpaces(ByteBuf b) {
145 while (b.isReadable()) {
146 int ch = b.getByte(b.readerIndex()) & 0xFF;
147 if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t' || ch == '\0')) {
150 b.readByte(); //move the read index
154 // copied from Netty decoder
155 private void fail(ChannelHandlerContext ctx, long frameLength) {
156 if (frameLength > 0) {
157 ctx.fireExceptionCaught(
158 new TooLongFrameException(
159 "frame length exceeds " + maxFrameLength +
160 ": " + frameLength + " - discarded"));
162 ctx.fireExceptionCaught(
163 new TooLongFrameException(
164 "frame length exceeds " + maxFrameLength +