74bf492292b4c16d0e29d8aa14637d687f82511a
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / NetconfChunkAggregator.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.CompositeByteBuf;
13 import io.netty.buffer.Unpooled;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.handler.codec.ByteToMessageDecoder;
16 import java.util.List;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 public class NetconfChunkAggregator extends ByteToMessageDecoder {
21     private static final Logger LOG = LoggerFactory.getLogger(NetconfChunkAggregator.class);
22     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
23     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
24     private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM =
25         "Got byte {} while waiting for {}-{}-{}";
26     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
27
28     private enum State {
29         HEADER_ONE, // \n
30         HEADER_TWO, // #
31         HEADER_LENGTH_FIRST, // [1-9]
32         HEADER_LENGTH_OTHER, // [0-9]*\n
33         DATA,
34         FOOTER_ONE, // \n
35         FOOTER_TWO, // #
36         FOOTER_THREE, // #
37         FOOTER_FOUR, // \n
38     }
39
40     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
41     private State state = State.HEADER_ONE;
42     private long chunkSize;
43     private CompositeByteBuf chunk;
44
45     private static void checkNewLine(final byte byteToCheck, final String errorMessage) {
46         if (byteToCheck != '\n') {
47             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'\n');
48             throw new IllegalStateException(errorMessage);
49         }
50     }
51
52     private static void checkHash(final byte byteToCheck, final String errorMessage) {
53         if (byteToCheck != '#') {
54             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, byteToCheck, (byte)'#');
55             throw new IllegalStateException(errorMessage);
56         }
57     }
58
59     private void checkChunkSize() {
60         if (chunkSize > maxChunkSize) {
61             LOG.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
62             throw new IllegalStateException("Maximum chunk size exceeded");
63         }
64     }
65
66     @Override
67     protected void decode(final ChannelHandlerContext ctx,
68                           final ByteBuf in, final List<Object> out) throws IllegalStateException {
69         while (in.isReadable()) {
70             switch (state) {
71                 case HEADER_ONE: {
72                     final byte b = in.readByte();
73                     checkNewLine(b, "Malformed chunk header encountered (byte 0)");
74                     state = State.HEADER_TWO;
75                     initChunk();
76                     break;
77                 }
78                 case HEADER_TWO: {
79                     final byte b = in.readByte();
80                     checkHash(b, "Malformed chunk header encountered (byte 1)");
81                     state = State.HEADER_LENGTH_FIRST;
82                     break;
83                 }
84                 case HEADER_LENGTH_FIRST: {
85                     final byte b = in.readByte();
86                     chunkSize = processHeaderLengthFirst(b);
87                     state = State.HEADER_LENGTH_OTHER;
88                     break;
89                 }
90                 case HEADER_LENGTH_OTHER: {
91                     final byte b = in.readByte();
92                     if (b == '\n') {
93                         state = State.DATA;
94                         break;
95                     }
96                     if (b < '0' || b > '9') {
97                         LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
98                         throw new IllegalStateException("Invalid chunk size encountered");
99                     }
100                     chunkSize *= 10;
101                     chunkSize += b - '0';
102                     checkChunkSize();
103                     break;
104                 }
105                 case DATA:
106                     /*
107                      * FIXME: this gathers all data into one big chunk before passing
108                      *        it on. Make sure the pipeline can work with partial data
109                      *        and then change this piece to pass the data on as it
110                      *        comes through.
111                      */
112                     if (in.readableBytes() < chunkSize) {
113                         LOG.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
114                         in.discardReadBytes();
115                         return;
116                     }
117                     aggregateChunks(in.readBytes((int) chunkSize));
118                     state = State.FOOTER_ONE;
119                     break;
120                 case FOOTER_ONE: {
121                     final byte b = in.readByte();
122                     checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
123                     state = State.FOOTER_TWO;
124                     chunkSize = 0;
125                     break;
126                 }
127                 case FOOTER_TWO: {
128                     final byte b = in.readByte();
129                     checkHash(b,"Malformed chunk footer encountered (byte 1)");
130                     state = State.FOOTER_THREE;
131                     break;
132                 }
133                 case FOOTER_THREE: {
134                     final byte b = in.readByte();
135                     // In this state, either header-of-new-chunk or message-end is expected
136                     // Depends on the next character
137                     extractNewChunkOrMessageEnd(b);
138                     break;
139                 }
140                 case FOOTER_FOUR: {
141                     final byte b = in.readByte();
142                     checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
143                     state = State.HEADER_ONE;
144                     out.add(chunk);
145                     chunk = null;
146                     break;
147                 }
148                 default:
149                     LOG.info("Unknown state.");
150             }
151         }
152
153         in.discardReadBytes();
154     }
155
156     private void extractNewChunkOrMessageEnd(final byte byteToCheck) {
157         if (isHeaderLengthFirst(byteToCheck)) {
158             // Extract header length#1 from new chunk
159             chunkSize = processHeaderLengthFirst(byteToCheck);
160             // Proceed with next chunk processing
161             state = State.HEADER_LENGTH_OTHER;
162         } else if (byteToCheck == '#') {
163             state = State.FOOTER_FOUR;
164         } else {
165             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM_PARAM, byteToCheck, (byte) '#', (byte) '1', (byte) '9');
166             throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
167         }
168     }
169
170     private void initChunk() {
171         chunk = Unpooled.compositeBuffer();
172     }
173
174     private void aggregateChunks(final ByteBuf newChunk) {
175         chunk.addComponent(chunk.numComponents(), newChunk);
176
177         // Update writer index, addComponent does not update it
178         chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
179     }
180
181     private static int processHeaderLengthFirst(final byte byteToCheck) {
182         if (!isHeaderLengthFirst(byteToCheck)) {
183             LOG.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, byteToCheck, (byte)'1', (byte)'9');
184             throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
185         }
186
187         return byteToCheck - '0';
188     }
189
190     private static boolean isHeaderLengthFirst(final byte byteToCheck) {
191         return byteToCheck >= '1' && byteToCheck <= '9';
192     }
193 }