cab1d741b15b7ab6d6971924691b598664d75a51
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / NetconfChunkAggregator.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.CompositeByteBuf;
14 import io.netty.buffer.Unpooled;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.ByteToMessageDecoder;
17 import java.util.List;
18 import org.checkerframework.checker.index.qual.NonNegative;
19 import org.opendaylight.netconf.nettyutil.AbstractNetconfSessionNegotiator;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 public class NetconfChunkAggregator extends ByteToMessageDecoder {
24     private static final Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
25     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
26     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
27     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM =
28         "Got byte {} while waiting for {}-{}-{}";
29
30     public static final @NonNegative int DEFAULT_MAXIMUM_CHUNK_SIZE =
31         AbstractNetconfSessionNegotiator.DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
32
33     private enum State {
34         HEADER_ONE, // \n
35         HEADER_TWO, // #
36         HEADER_LENGTH_FIRST, // [1-9]
37         HEADER_LENGTH_OTHER, // [0-9]*\n
38         DATA,
39         FOOTER_ONE, // \n
40         FOOTER_TWO, // #
41         FOOTER_THREE, // #
42         FOOTER_FOUR, // \n
43     }
44
45     private final int maxChunkSize;
46     private State state = State.HEADER_ONE;
47     private long chunkSize;
48     private CompositeByteBuf chunk;
49
50     /**
51      * Construct an instance with maximum chunk size set to {@link #DEFAULT_MAXIMUM_CHUNK_SIZE}.
52      */
53     public NetconfChunkAggregator() {
54         this(DEFAULT_MAXIMUM_CHUNK_SIZE);
55     }
56
57     /**
58      * Construct an instance with specified maximum chunk size.
59      *
60      * @param maxChunkSize maximum chunk size
61      * @throws IllegalArgumentException if {@code maxChunkSize} is negative
62      */
63     public NetconfChunkAggregator(final @NonNegative int maxChunkSize) {
64         this.maxChunkSize = maxChunkSize;
65         checkArgument(maxChunkSize > 0, "Negative maximum chunk size %s", maxChunkSize);
66     }
67
68     private static void checkNewLine(final byte byteToCheck, final String errorMessage) {
69         if (byteToCheck != '\n') {
70             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'\n');
71             throw new IllegalStateException(errorMessage);
72         }
73     }
74
75     private static void checkHash(final byte byteToCheck, final String errorMessage) {
76         if (byteToCheck != '#') {
77             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'#');
78             throw new IllegalStateException(errorMessage);
79         }
80     }
81
82     private void checkChunkSize() {
83         if (chunkSize > maxChunkSize) {
84             LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
85             throw new IllegalStateException("Chunk size " + chunkSize + " exceeds maximum " + maxChunkSize);
86         }
87     }
88
89     @Override
90     protected void decode(final ChannelHandlerContext ctx,
91                           final ByteBuf in, final List<Object> out) throws IllegalStateException {
92         while (in.isReadable()) {
93             switch (state) {
94                 case HEADER_ONE: {
95                     final byte b = in.readByte();
96                     checkNewLine(b, "Malformed chunk header encountered (byte 0)");
97                     state = State.HEADER_TWO;
98                     initChunk();
99                     break;
100                 }
101                 case HEADER_TWO: {
102                     final byte b = in.readByte();
103                     checkHash(b, "Malformed chunk header encountered (byte 1)");
104                     state = State.HEADER_LENGTH_FIRST;
105                     break;
106                 }
107                 case HEADER_LENGTH_FIRST: {
108                     final byte b = in.readByte();
109                     chunkSize = processHeaderLengthFirst(b);
110                     state = State.HEADER_LENGTH_OTHER;
111                     break;
112                 }
113                 case HEADER_LENGTH_OTHER: {
114                     final byte b = in.readByte();
115                     if (b == '\n') {
116                         state = State.DATA;
117                         break;
118                     }
119                     if (b < '0' || b > '9') {
120                         LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
121                         throw new IllegalStateException("Invalid chunk size encountered");
122                     }
123                     chunkSize *= 10;
124                     chunkSize += b - '0';
125                     checkChunkSize();
126                     break;
127                 }
128                 case DATA:
129                     /*
130                      * FIXME: this gathers all data into one big chunk before passing
131                      *        it on. Make sure the pipeline can work with partial data
132                      *        and then change this piece to pass the data on as it
133                      *        comes through.
134                      */
135                     if (in.readableBytes() < chunkSize) {
136                         LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
137                         in.discardReadBytes();
138                         return;
139                     }
140                     aggregateChunks(in.readBytes((int) chunkSize));
141                     state = State.FOOTER_ONE;
142                     break;
143                 case FOOTER_ONE: {
144                     final byte b = in.readByte();
145                     checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
146                     state = State.FOOTER_TWO;
147                     chunkSize = 0;
148                     break;
149                 }
150                 case FOOTER_TWO: {
151                     final byte b = in.readByte();
152                     checkHash(b,"Malformed chunk footer encountered (byte 1)");
153                     state = State.FOOTER_THREE;
154                     break;
155                 }
156                 case FOOTER_THREE: {
157                     final byte b = in.readByte();
158                     // In this state, either header-of-new-chunk or message-end is expected
159                     // Depends on the next character
160                     extractNewChunkOrMessageEnd(b);
161                     break;
162                 }
163                 case FOOTER_FOUR: {
164                     final byte b = in.readByte();
165                     checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
166                     state = State.HEADER_ONE;
167                     out.add(chunk);
168                     chunk = null;
169                     break;
170                 }
171                 default:
172                     LOG.info("Unknown state.");
173             }
174         }
175
176         in.discardReadBytes();
177     }
178
179     private void extractNewChunkOrMessageEnd(final byte byteToCheck) {
180         if (isHeaderLengthFirst(byteToCheck)) {
181             // Extract header length#1 from new chunk
182             chunkSize = processHeaderLengthFirst(byteToCheck);
183             // Proceed with next chunk processing
184             state = State.HEADER_LENGTH_OTHER;
185         } else if (byteToCheck == '#') {
186             state = State.FOOTER_FOUR;
187         } else {
188             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM, byteToCheck, (byte) '#', (byte) '1', (byte) '9');
189             throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
190         }
191     }
192
193     private void initChunk() {
194         chunk = Unpooled.compositeBuffer();
195     }
196
197     private void aggregateChunks(final ByteBuf newChunk) {
198         chunk.addComponent(chunk.numComponents(), newChunk);
199
200         // Update writer index, addComponent does not update it
201         chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
202     }
203
204     private static int processHeaderLengthFirst(final byte byteToCheck) {
205         if (!isHeaderLengthFirst(byteToCheck)) {
206             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, byteToCheck, (byte)'1', (byte)'9');
207             throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
208         }
209
210         return byteToCheck - '0';
211     }
212
213     private static boolean isHeaderLengthFirst(final byte byteToCheck) {
214         return byteToCheck >= '1' && byteToCheck <= '9';
215     }
216 }