Merge "Split message aggregators"
controller.git: opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java
/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.netconf.util.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

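/**
 * A Netty {@link ByteToMessageDecoder} that reassembles the NETCONF 1.1 chunked
 * framing defined in RFC 6242: each chunk is introduced by {@code \n#<chunk-size>\n}
 * and the message is terminated by the end-of-chunks marker {@code \n##\n}. The
 * decoder walks the framing one byte at a time with a small state machine and
 * emits the body of each completed chunk as a {@link ByteBuf}. As written, it
 * expects the end-of-chunks marker immediately after the chunk body, i.e. it
 * handles messages encoded as a single chunk.
 *
 * <p>Minimal usage sketch, assuming Netty's {@code EmbeddedChannel} in a test
 * (the snippet is illustrative and not part of this class):
 *
 * <pre>{@code
 * EmbeddedChannel channel = new EmbeddedChannel(new NetconfChunkAggregator());
 * channel.writeInbound(Unpooled.copiedBuffer("\n#4\ndata\n##\n", CharsetUtil.US_ASCII));
 * ByteBuf decoded = channel.readInbound(); // holds the four bytes "data"
 * }</pre>
 */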
public class NetconfChunkAggregator extends ByteToMessageDecoder {
    private static final Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
    public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;

    private enum State {
        HEADER_ONE, // \n
        HEADER_TWO, // #
        HEADER_LENGTH_FIRST, // [1-9]
        HEADER_LENGTH_OTHER, // [0-9]*\n
        DATA,
        FOOTER_ONE, // \n
        FOOTER_TWO, // #
        FOOTER_THREE, // #
        FOOTER_FOUR, // \n
    }

    private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
    private State state = State.HEADER_ONE;
    // Declared size of the chunk currently being read
    private long chunkSize;
    // Body of the chunk currently being read
    private ByteBuf chunk;

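    /**
     * Consumes the inbound buffer one byte at a time, advancing the framing state
     * machine. Once the current chunk's declared size worth of data is available,
     * its body is added to {@code out}; if the buffer runs out mid-chunk, the
     * bytes read so far are compacted away via {@code discardReadBytes()} and
     * decoding resumes when more data arrives.
     */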
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.isReadable()) {
            switch (state) {
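            // HEADER_* states parse the chunk start marker: LF, '#', then the
            // decimal chunk size terminated by LF.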
            case HEADER_ONE:
            {
                final byte b = in.readByte();
                if (b != '\n') {
                    throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
                }

                state = State.HEADER_TWO;
                break;
            }
            case HEADER_TWO:
            {
                final byte b = in.readByte();
                if (b != '#') {
                    throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
                }

                state = State.HEADER_LENGTH_FIRST;
                break;
            }
            case HEADER_LENGTH_FIRST:
            {
                final byte b = in.readByte();
                if (b < '1' || b > '9') {
                    throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
                }

                chunkSize = b - '0';
                state = State.HEADER_LENGTH_OTHER;
                break;
            }
            case HEADER_LENGTH_OTHER:
            {
                final byte b = in.readByte();
                if (b == '\n') {
                    state = State.DATA;
                    break;
                }

                if (b < '0' || b > '9') {
                    throw new IllegalStateException("Invalid chunk size encountered");
                }

                chunkSize *= 10;
                chunkSize += b - '0';

                if (chunkSize > maxChunkSize) {
                    throw new IllegalStateException("Maximum chunk size exceeded");
                }
                break;
            }
            case DATA:
                /*
                 * FIXME: this gathers all data into one big chunk before passing
                 *        it on. Make sure the pipeline can work with partial data
                 *        and then change this piece to pass the data on as it
                 *        comes through.
                 */
                if (in.readableBytes() < chunkSize) {
                    logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
                    in.discardReadBytes();
                    return;
                }

                chunk = in.readBytes((int)chunkSize);
                state = State.FOOTER_ONE;
                break;
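            // FOOTER_* states match the end-of-chunks marker (LF '#' '#' LF)
            // which terminates the message.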
            case FOOTER_ONE:
            {
                final byte b = in.readByte();
                if (b != '\n') {
                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
                    throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
                }

                state = State.FOOTER_TWO;
                break;
            }
            case FOOTER_TWO:
            {
                final byte b = in.readByte();
                if (b != '#') {
                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
                    throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
                }

                state = State.FOOTER_THREE;
                break;
            }
            case FOOTER_THREE:
            {
                final byte b = in.readByte();
                if (b != '#') {
                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
                    throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
                }

                state = State.FOOTER_FOUR;
                break;
            }
            case FOOTER_FOUR:
            {
                final byte b = in.readByte();
                if (b != '\n') {
                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
                    throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
                }

                state = State.HEADER_ONE;
                out.add(chunk);
                chunkSize = 0;
                chunk = null;
                break;
            }
            }
        }

        in.discardReadBytes();
    }
}