Merge "Remove duplicate dependency declaration"
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / NetconfChunkAggregator.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.CompositeByteBuf;
13 import io.netty.buffer.Unpooled;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.handler.codec.ByteToMessageDecoder;
16 import java.util.List;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 public class NetconfChunkAggregator extends ByteToMessageDecoder {
21     private final static Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
22     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
23     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
24     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
25
26     private static enum State {
27         HEADER_ONE, // \n
28         HEADER_TWO, // #
29         HEADER_LENGTH_FIRST, // [1-9]
30         HEADER_LENGTH_OTHER, // [0-9]*\n
31         DATA,
32         FOOTER_ONE, // \n
33         FOOTER_TWO, // #
34         FOOTER_THREE, // #
35         FOOTER_FOUR, // \n
36     }
37
38     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
39     private State state = State.HEADER_ONE;
40     private long chunkSize;
41     private CompositeByteBuf chunk;
42
43     private void checkNewLine(byte b,String errorMessage){
44         if (b != '\n') {
45             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n');
46             throw new IllegalStateException(errorMessage);
47         }
48     }
49
50     private void checkHash(byte b,String errorMessage){
51         if (b != '#') {
52             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#');
53             throw new IllegalStateException(errorMessage);
54         }
55     }
56
57     private void checkChunkSize(){
58         if (chunkSize > maxChunkSize) {
59             LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
60             throw new IllegalStateException("Maximum chunk size exceeded");
61         }
62
63     }
64     @Override
65     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IllegalStateException {
66         while (in.isReadable()) {
67             switch (state) {
68             case HEADER_ONE:
69             {
70                 final byte b = in.readByte();
71                 checkNewLine(b, "Malformed chunk header encountered (byte 0)");
72
73                 state = State.HEADER_TWO;
74
75                 initChunk();
76                 break;
77             }
78             case HEADER_TWO:
79             {
80                 final byte b = in.readByte();
81                 checkHash(b, "Malformed chunk header encountered (byte 1)");
82
83                 state = State.HEADER_LENGTH_FIRST;
84                 break;
85             }
86             case HEADER_LENGTH_FIRST:
87             {
88                 final byte b = in.readByte();
89                 chunkSize = processHeaderLengthFirst(b);
90                 state = State.HEADER_LENGTH_OTHER;
91                 break;
92             }
93             case HEADER_LENGTH_OTHER:
94             {
95                 final byte b = in.readByte();
96                 if (b == '\n') {
97                     state = State.DATA;
98                     break;
99                 }
100
101                 if (b < '0' || b > '9') {
102                     LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
103                     throw new IllegalStateException("Invalid chunk size encountered");
104                 }
105
106                 chunkSize *= 10;
107                 chunkSize += b - '0';
108                 checkChunkSize();
109                 break;
110             }
111             case DATA:
112                 /*
113                  * FIXME: this gathers all data into one big chunk before passing
114                  *        it on. Make sure the pipeline can work with partial data
115                  *        and then change this piece to pass the data on as it
116                  *        comes through.
117                  */
118                 if (in.readableBytes() < chunkSize) {
119                     LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
120                     in.discardReadBytes();
121                     return;
122                 }
123                 aggregateChunks(in.readBytes((int) chunkSize));
124                 state = State.FOOTER_ONE;
125                 break;
126             case FOOTER_ONE:
127             {
128                 final byte b = in.readByte();
129                 checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
130                 state = State.FOOTER_TWO;
131                 chunkSize = 0;
132                 break;
133             }
134             case FOOTER_TWO:
135             {
136                 final byte b = in.readByte();
137                 checkHash(b,"Malformed chunk footer encountered (byte 1)");
138                 state = State.FOOTER_THREE;
139                 break;
140             }
141             case FOOTER_THREE:
142             {
143                 final byte b = in.readByte();
144
145                 // In this state, either header-of-new-chunk or message-end is expected
146                 // Depends on the next character
147
148                 extractNewChunkOrMessageEnd(b);
149
150                 break;
151             }
152             case FOOTER_FOUR:
153             {
154                 final byte b = in.readByte();
155                 checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
156                 state = State.HEADER_ONE;
157                 out.add(chunk);
158                 chunk = null;
159                 break;
160             }
161             }
162         }
163
164         in.discardReadBytes();
165     }
166
167     private void extractNewChunkOrMessageEnd(byte b) {
168         if (isHeaderLengthFirst(b)) {
169             // Extract header length#1 from new chunk
170             chunkSize = processHeaderLengthFirst(b);
171             // Proceed with next chunk processing
172             state = State.HEADER_LENGTH_OTHER;
173         } else if (b == '#') {
174             state = State.FOOTER_FOUR;
175         } else {
176             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9');
177             throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
178         }
179     }
180
181     private void initChunk() {
182         chunk = Unpooled.compositeBuffer();
183     }
184
185     private void aggregateChunks(ByteBuf newChunk) {
186         chunk.addComponent(chunk.numComponents(), newChunk);
187
188         // Update writer index, addComponent does not update it
189         chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
190     }
191
192     private static int processHeaderLengthFirst(byte b) {
193         if (!isHeaderLengthFirst(b)) {
194             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9');
195             throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
196         }
197
198         return b - '0';
199     }
200
201     private static boolean isHeaderLengthFirst(byte b) {
202         return b >= '1' && b <= '9';
203     }
204 }