219e92c3f14053c805d1ac204132f92d62479948
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / NetconfChunkAggregator.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.util.handler;
10
11 import java.util.List;
12
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 import io.netty.buffer.ByteBuf;
17 import io.netty.buffer.CompositeByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
21
22 public class NetconfChunkAggregator extends ByteToMessageDecoder {
23     private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
24     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
25
26     private static enum State {
27         HEADER_ONE, // \n
28         HEADER_TWO, // #
29         HEADER_LENGTH_FIRST, // [1-9]
30         HEADER_LENGTH_OTHER, // [0-9]*\n
31         DATA,
32         FOOTER_ONE, // \n
33         FOOTER_TWO, // #
34         FOOTER_THREE, // #
35         FOOTER_FOUR, // \n
36     }
37
38     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
39     private State state = State.HEADER_ONE;
40     private long chunkSize;
41     private CompositeByteBuf chunk;
42
43     @Override
44     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
45         while (in.isReadable()) {
46             switch (state) {
47             case HEADER_ONE:
48             {
49                 final byte b = in.readByte();
50                 if (b != '\n') {
51                     logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
52                     throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
53                 }
54
55                 state = State.HEADER_TWO;
56
57                 initChunk();
58                 break;
59             }
60             case HEADER_TWO:
61             {
62                 final byte b = in.readByte();
63                 if (b != '#') {
64                     logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
65                     throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
66                 }
67
68                 state = State.HEADER_LENGTH_FIRST;
69                 break;
70             }
71             case HEADER_LENGTH_FIRST:
72             {
73                 final byte b = in.readByte();
74                 chunkSize = processHeaderLengthFirst(b);
75                 state = State.HEADER_LENGTH_OTHER;
76                 break;
77             }
78             case HEADER_LENGTH_OTHER:
79             {
80                 final byte b = in.readByte();
81                 if (b == '\n') {
82                     state = State.DATA;
83                     break;
84                 }
85
86                 if (b < '0' || b > '9') {
87                     logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'0', (byte)'9');
88                     throw new IllegalStateException("Invalid chunk size encountered");
89                 }
90
91                 chunkSize *= 10;
92                 chunkSize += b - '0';
93
94                 if (chunkSize > maxChunkSize) {
95                     logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
96                     throw new IllegalStateException("Maximum chunk size exceeded");
97                 }
98                 break;
99             }
100             case DATA:
101                 /*
102                  * FIXME: this gathers all data into one big chunk before passing
103                  *        it on. Make sure the pipeline can work with partial data
104                  *        and then change this piece to pass the data on as it
105                  *        comes through.
106                  */
107                 if (in.readableBytes() < chunkSize) {
108                     logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
109                     in.discardReadBytes();
110                     return;
111                 }
112
113                 aggregateChunks(in.readBytes((int) chunkSize));
114                 state = State.FOOTER_ONE;
115                 break;
116             case FOOTER_ONE:
117             {
118                 final byte b = in.readByte();
119                 if (b != '\n') {
120                     logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
121                     throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
122                 }
123
124                 state = State.FOOTER_TWO;
125                 chunkSize = 0;
126                 break;
127             }
128             case FOOTER_TWO:
129             {
130                 final byte b = in.readByte();
131
132                 if (b != '#') {
133                     logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
134                     throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
135                 }
136
137                 state = State.FOOTER_THREE;
138                 break;
139             }
140             case FOOTER_THREE:
141             {
142                 final byte b = in.readByte();
143
144                 // In this state, either header-of-new-chunk or message-end is expected
145                 // Depends on the next character
146
147                 if (isHeaderLengthFirst(b)) {
148                     // Extract header length#1 from new chunk
149                     chunkSize = processHeaderLengthFirst(b);
150                     // Proceed with next chunk processing
151                     state = State.HEADER_LENGTH_OTHER;
152                 } else if (b == '#') {
153                     state = State.FOOTER_FOUR;
154                 } else {
155                     logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
156                     throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
157                 }
158
159                 break;
160             }
161             case FOOTER_FOUR:
162             {
163                 final byte b = in.readByte();
164                 if (b != '\n') {
165                     logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
166                     throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
167                 }
168
169                 state = State.HEADER_ONE;
170                 out.add(chunk);
171                 chunk = null;
172                 break;
173             }
174             }
175         }
176
177         in.discardReadBytes();
178     }
179
180     private void initChunk() {
181         chunk = Unpooled.compositeBuffer();
182     }
183
184     private void aggregateChunks(ByteBuf newChunk) {
185         chunk.addComponent(chunk.numComponents(), newChunk);
186
187         // Update writer index, addComponent does not update it
188         chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
189     }
190
191     private static int processHeaderLengthFirst(byte b) {
192         if (isHeaderLengthFirst(b) == false) {
193             logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
194             throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
195         }
196
197         return b - '0';
198     }
199
200     private static boolean isHeaderLengthFirst(byte b) {
201         return b >= '1' && b <= '9';
202     }
203 }