Merge "Added trunk model."
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcDecoder.java
1 /*
2  * Copyright (C) 2013 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran, Madhu Venugopal
9  */
10 package org.opendaylight.ovsdb.lib.jsonrpc;
11
12
13 import io.netty.buffer.ByteBuf;
14 import io.netty.buffer.ByteBufInputStream;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.TooLongFrameException;
18
19 import java.io.IOException;
20 import java.util.List;
21
22 import org.opendaylight.ovsdb.lib.error.InvalidEncodingException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import com.fasterxml.jackson.core.JsonEncoding;
27 import com.fasterxml.jackson.core.JsonFactory;
28 import com.fasterxml.jackson.core.JsonParser;
29 import com.fasterxml.jackson.core.io.IOContext;
30 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
31 import com.fasterxml.jackson.core.util.BufferRecycler;
32 import com.fasterxml.jackson.databind.JsonNode;
33 import com.fasterxml.jackson.databind.MappingJsonFactory;
34
35 /**
36  * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
37  * The stream is framed first by inspecting the json for valid end marker (left curly)
38  * and is passed to a Json parser (jackson) for converting into an object model.
39  *
40  * There are no JSON parsers that I am aware of that does non blocking parsing.
41  * This approach avoids having to run json parser over and over again on the entire
42  * stream waiting for input. Parser is invoked only when we know of a full JSON message
43  * in the stream.
44  */
45 public class JsonRpcDecoder extends ByteToMessageDecoder {
46
47     protected static final Logger logger = LoggerFactory.getLogger(JsonRpcDecoder.class);
48
49     private int maxFrameLength;
50
51     private JsonFactory jacksonJsonFactory = new MappingJsonFactory();
52
53     private IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
54
55     // context for the previously read incomplete records
56     private int lastRecordBytes = 0;
57     private int leftCurlies = 0;
58     private int rightCurlies = 0;
59     private boolean inS = false;
60
61     private int recordsRead;
62
63     public JsonRpcDecoder(int maxFrameLength) {
64         this.maxFrameLength = maxFrameLength;
65     }
66
67     @Override
68     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
69
70         logger.trace("readable bytes {}, records read {}, incomplete record bytes {}",
71                 buf.readableBytes(), recordsRead, lastRecordBytes);
72
73         if (lastRecordBytes == 0) {
74             if (buf.readableBytes() < 4) {
75                 return; //wait for more data
76             }
77
78             skipSpaces(buf);
79
80             byte[] buff = new byte[4];
81             buf.getBytes(buf.readerIndex(), buff);
82             ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
83             JsonEncoding jsonEncoding = strapper.detectEncoding();
84             if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
85                 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
86             }
87         }
88
89         int index = lastRecordBytes + buf.readerIndex();
90
91         for (; index < buf.writerIndex(); index++) {
92             switch (buf.getByte(index)) {
93                 case '{':
94                     if (!inS) {
95                         leftCurlies++;
96                     }
97                     break;
98                 case '}':
99                     if (!inS) {
100                         rightCurlies++;
101                     }
102                     break;
103                 case '"':
104                     if (buf.getByte(index - 1) != '\\') {
105                         inS = !inS;
106                     }
107                     break;
108                 default:
109                     break;
110             }
111
112             if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
113                 ByteBuf slice = buf.readSlice(1 + index - buf.readerIndex());
114                 JsonParser jp = jacksonJsonFactory.createParser(new ByteBufInputStream(slice));
115                 JsonNode root = jp.readValueAsTree();
116                 out.add(root);
117                 leftCurlies = rightCurlies = lastRecordBytes = 0;
118                 recordsRead++;
119                 break;
120             }
121
122             if (index - buf.readerIndex() >= maxFrameLength) {
123                 fail(ctx, index - buf.readerIndex());
124             }
125         }
126
127         // end of stream, save the incomplete record index to avoid reexamining the whole on next run
128         if (index >= buf.writerIndex()) {
129             lastRecordBytes = buf.readableBytes();
130             return;
131         }
132     }
133
134     public int getRecordsRead() {
135         return recordsRead;
136     }
137
138     private static void skipSpaces(ByteBuf byteBuf) throws IOException {
139         while (byteBuf.isReadable()) {
140             int ch = byteBuf.getByte(byteBuf.readerIndex()) & 0xFF;
141             if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t')) {
142                 return;
143             } else {
144                 byteBuf.readByte(); //move the read index
145             }
146         }
147     }
148
149
150     private void print(ByteBuf buf, String message) {
151         print(buf, buf.readerIndex(), buf.readableBytes(), message == null ? "buff" : message);
152     }
153
154     private void print(ByteBuf buf, int startPos, int chars, String message) {
155         if (null == message) {
156             message = "";
157         }
158         if (startPos > buf.writerIndex()) {
159             logger.trace("startPos out of bounds");
160         }
161         byte[] bytes = new byte[startPos + chars <= buf.writerIndex() ? chars : buf.writerIndex() - startPos];
162         buf.getBytes(startPos, bytes);
163         logger.trace("{} ={}", message, new String(bytes));
164     }
165
166     // copied from Netty decoder
167     private void fail(ChannelHandlerContext ctx, long frameLength) {
168         if (frameLength > 0) {
169             ctx.fireExceptionCaught(
170                     new TooLongFrameException(
171                             "frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"));
172         } else {
173             ctx.fireExceptionCaught(
174                     new TooLongFrameException(
175                             "frame length exceeds " + maxFrameLength + " - discarding"));
176         }
177     }
178 }