56dcb84198242c6029316fa63f360e307f3b5af1
[ovsdb.git] / library / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcDecoder.java
1 /*
2  * Copyright (C) 2013 EBay Software Foundation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Ashwin Raveendran, Madhu Venugopal
9  */
10 package org.opendaylight.ovsdb.lib.jsonrpc;
11
12
13 import io.netty.buffer.ByteBuf;
14 import io.netty.buffer.ByteBufInputStream;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import io.netty.handler.codec.TooLongFrameException;
18
19 import java.io.IOException;
20 import java.util.List;
21
22 import org.opendaylight.ovsdb.lib.error.InvalidEncodingException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import com.fasterxml.jackson.core.JsonEncoding;
27 import com.fasterxml.jackson.core.JsonFactory;
28 import com.fasterxml.jackson.core.JsonParser;
29 import com.fasterxml.jackson.core.io.IOContext;
30 import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper;
31 import com.fasterxml.jackson.core.util.BufferRecycler;
32 import com.fasterxml.jackson.databind.JsonNode;
33 import com.fasterxml.jackson.databind.MappingJsonFactory;
34
35 /**
36  * JSON RPC 1.0 compatible decoder capable of decoding JSON messages from a TCP stream.
37  * The stream is framed first by inspecting the json for valid end marker (left curly)
38  * and is passed to a Json parser (jackson) for converting into an object model.
39  *
40  * There are no JSON parsers that I am aware of that does non blocking parsing.
41  * This approach avoids having to run json parser over and over again on the entire
42  * stream waiting for input. Parser is invoked only when we know of a full JSON message
43  * in the stream.
44  */
45 public class JsonRpcDecoder extends ByteToMessageDecoder {
46
47     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcDecoder.class);
48     private int maxFrameLength;
49     //Indicates if the frame limit warning was issued
50     private boolean maxFrameLimitWasReached = false;
51     private JsonFactory jacksonJsonFactory = new MappingJsonFactory();
52
53     private IOContext jacksonIOContext = new IOContext(new BufferRecycler(), null, false);
54
55     // context for the previously read incomplete records
56     private int lastRecordBytes = 0;
57     private int leftCurlies = 0;
58     private int rightCurlies = 0;
59     private boolean inS = false;
60
61     private int recordsRead;
62
63     public JsonRpcDecoder(int maxFrameLength) {
64         this.maxFrameLength = maxFrameLength;
65     }
66
67     @Override
68     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
69
70         LOG.trace("readable bytes {}, records read {}, incomplete record bytes {}",
71                 buf.readableBytes(), recordsRead, lastRecordBytes);
72
73         if (lastRecordBytes == 0) {
74             if (buf.readableBytes() < 4) {
75                 return; //wait for more data
76             }
77
78             skipSpaces(buf);
79
80             byte[] buff = new byte[4];
81             buf.getBytes(buf.readerIndex(), buff);
82             ByteSourceJsonBootstrapper strapper = new ByteSourceJsonBootstrapper(jacksonIOContext, buff, 0, 4);
83             JsonEncoding jsonEncoding = strapper.detectEncoding();
84             if (!JsonEncoding.UTF8.equals(jsonEncoding)) {
85                 throw new InvalidEncodingException(jsonEncoding.getJavaName(), "currently only UTF-8 is supported");
86             }
87         }
88
89         int index = lastRecordBytes + buf.readerIndex();
90
91         for (; index < buf.writerIndex(); index++) {
92             switch (buf.getByte(index)) {
93                 case '{':
94                     if (!inS) {
95                         leftCurlies++;
96                     }
97                     break;
98                 case '}':
99                     if (!inS) {
100                         rightCurlies++;
101                     }
102                     break;
103                 case '"':
104                     if (buf.getByte(index - 1) != '\\') {
105                         inS = !inS;
106                     }
107                     break;
108                 default:
109                     break;
110             }
111
112             if (leftCurlies != 0 && leftCurlies == rightCurlies && !inS) {
113                 ByteBuf slice = buf.readSlice(1 + index - buf.readerIndex());
114                 JsonParser jp = jacksonJsonFactory.createParser(new ByteBufInputStream(slice));
115                 JsonNode root = jp.readValueAsTree();
116                 out.add(root);
117                 leftCurlies = rightCurlies = lastRecordBytes = 0;
118                 recordsRead++;
119                 break;
120             }
121
122             if (index - buf.readerIndex() >= maxFrameLength) {
123                 /*
124                  * Changing this limit to being a warning, we do not wish to "break" in scale environment
125                  * and currently this limits the ovs of having only around 50 ports defined...
126                  * I do acknowledge the fast that this might be risky in case of huge amount of strings
127                  * in which the controller can crash with an OOM, however seems that we need a really huge
128                  * ovs to reach that limit.
129                  */
130
131                 //We do not want to issue a log message on every extent of the buffer
132                 //hence logging only once
133                 if (!maxFrameLimitWasReached) {
134                     maxFrameLimitWasReached = true;
135                     LOG.warn("***** OVSDB Frame limit of {} bytes has been reached! *****", this.maxFrameLength);
136                 }
137             }
138         }
139
140         // end of stream, save the incomplete record index to avoid reexamining the whole on next run
141         if (index >= buf.writerIndex()) {
142             lastRecordBytes = buf.readableBytes();
143             return;
144         }
145     }
146
147     public int getRecordsRead() {
148         return recordsRead;
149     }
150
151     private static void skipSpaces(ByteBuf byteBuf) throws IOException {
152         while (byteBuf.isReadable()) {
153             int ch = byteBuf.getByte(byteBuf.readerIndex()) & 0xFF;
154             if (!(ch == ' ' || ch == '\r' || ch == '\n' || ch == '\t')) {
155                 return;
156             } else {
157                 byteBuf.readByte(); //move the read index
158             }
159         }
160     }
161
162
163     private void print(ByteBuf buf, String message) {
164         print(buf, buf.readerIndex(), buf.readableBytes(), message == null ? "buff" : message);
165     }
166
167     private void print(ByteBuf buf, int startPos, int chars, String message) {
168         if (null == message) {
169             message = "";
170         }
171         if (startPos > buf.writerIndex()) {
172             LOG.trace("startPos out of bounds");
173         }
174         byte[] bytes = new byte[startPos + chars <= buf.writerIndex() ? chars : buf.writerIndex() - startPos];
175         buf.getBytes(startPos, bytes);
176         LOG.trace("{} ={}", message, new String(bytes));
177     }
178
179     // copied from Netty decoder
180     private void fail(ChannelHandlerContext ctx, long frameLength) {
181         if (frameLength > 0) {
182             ctx.fireExceptionCaught(
183                     new TooLongFrameException(
184                             "frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"));
185         } else {
186             ctx.fireExceptionCaught(
187                     new TooLongFrameException(
188                             "frame length exceeds " + maxFrameLength + " - discarding"));
189         }
190     }
191 }