Deploy fix of ByteBufferInput and use it 07/111007/12
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 25 Mar 2024 08:24:52 +0000 (09:24 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 25 Mar 2024 13:14:43 +0000 (14:14 +0100)
readAscii() just wants to fix up the last byte it passes to the String
constructor. Do that after reading.

Inherit everything alse that is not required from the regular
ByteBufferInput to minimize code duplication.

JIRA: CONTROLLER-2109
Change-Id: Ic12b8e06317a8376b34d514daaf1ba7d46d16178
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/utils/serializer/ByteBufferInput.java
atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java

index 077c0b6c283e4b2359c82612591898c528d238df..515572ca819606c38ec97c84dc66f7a9a8e0a453 100644 (file)
 
 package io.atomix.utils.serializer;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.CharBuffer;
-import java.nio.DoubleBuffer;
-import java.nio.FloatBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
-import java.nio.ShortBuffer;
-
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.util.UnsafeUtil;
 
 /**
- * An InputStream that reads data from a byte array and optionally fills the byte array from another InputStream as needed.
- * Utility methods are provided for efficiently reading primitive types and strings.
- * 
+ * A Kryo-4.0.3 ByteBufferInput adapted to deal with
+ * <a href="https://github.com/EsotericSoftware/kryo/issues/505">issue 505</a>.
+ *
  * @author Roman Levenstein &lt;romixlev@gmail.com&gt;
+ * @author Robert Varga
  */
-public class ByteBufferInput extends Input {
-       protected ByteBuffer niobuffer;
-
-       protected boolean varIntsEnabled = true;
-
-       /* Default byte order is BIG_ENDIAN to be compatible to the base class */
-       ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
-       protected final static ByteOrder nativeOrder = ByteOrder.nativeOrder();
-
-    //See: https://stackoverflow.com/questions/3038392/do-java-arrays-have-a-maximum-size
-    static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;
-
-       /** Creates an uninitialized Input. A buffer must be set before the Input is used.
-        * @see #setBuffer(ByteBuffer) */
-       public ByteBufferInput () {
-       }
-
-       /** Creates a new Input for reading from a byte array.
-        * @param bufferSize The size of the buffer. An exception is thrown if more bytes than this are read. */
-       public ByteBufferInput (int bufferSize) {
-               this.capacity = bufferSize;
-               niobuffer = ByteBuffer.allocateDirect(bufferSize);
-               niobuffer.order(byteOrder);
-       }
-
-       public ByteBufferInput (byte[] buffer) {
-               setBuffer(buffer);
-       }
-
-       /** Creates a new Input for reading from a ByteBuffer. */
+public class ByteBufferInput extends com.esotericsoftware.kryo.io.ByteBufferInput {
        public ByteBufferInput (ByteBuffer buffer) {
-               setBuffer(buffer);
-       }
-
-       /** Creates a new Input for reading from an InputStream with a buffer size of 4096. */
-       public ByteBufferInput (InputStream inputStream) {
-               this(4096);
-               if (inputStream == null) throw new IllegalArgumentException("inputStream cannot be null.");
-               this.inputStream = inputStream;
-       }
-
-       /** Creates a new Input for reading from an InputStream. */
-       public ByteBufferInput (InputStream inputStream, int bufferSize) {
-               this(bufferSize);
-               if (inputStream == null) throw new IllegalArgumentException("inputStream cannot be null.");
-               this.inputStream = inputStream;
-       }
-
-       public ByteOrder order () {
-               return byteOrder;
-       }
-
-       public void order (ByteOrder byteOrder) {
-               this.byteOrder = byteOrder;
-       }
-
-       /** Sets a new buffer, discarding any previous buffer. The position and total are reset. */
-       public void setBuffer (byte[] bytes) {
-               ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);
-               directBuffer.put(bytes);
-               directBuffer.position(0);
-               directBuffer.limit(bytes.length);
-               directBuffer.order(byteOrder);
-               setBuffer(directBuffer);
-       }
-
-       /** Sets a new buffer, discarding any previous buffer. The byte order, position, limit and capacity are set to match the
-        * specified buffer. The total is reset. The {@link #setInputStream(InputStream) InputStream} is set to null. */
-       public void setBuffer (ByteBuffer buffer) {
-               if (buffer == null) throw new IllegalArgumentException("buffer cannot be null.");
-               niobuffer = buffer;
-               position = buffer.position();
-               limit = buffer.limit();
-               capacity = buffer.capacity();
-               byteOrder = buffer.order();
-               total = 0;
-               inputStream = null;
-       }
-
-       /** Releases a direct buffer. {@link #setBuffer(ByteBuffer)} must be called before any write operations can be performed. */
-       public void release () {
-               close();
-               UnsafeUtil.releaseBuffer(niobuffer);
-               niobuffer = null;
-       }
-
-       /** This constructor allows for creation of a direct ByteBuffer of a given size at a given address.
-        * 
-        * <p>
-        * Typical usage could look like this snippet:
-        * 
-        * <pre>
-        * // Explicitly allocate memory
-        * long bufAddress = UnsafeUtil.unsafe().allocateMemory(4096);
-        * // Create a ByteBufferInput using the allocated memory region
-        * ByteBufferInput buffer = new ByteBufferInput(bufAddress, 4096);
-        * 
-        * // Do some operations on this buffer here
-        * 
-        * // Say that ByteBuffer won't be used anymore
-        * buffer.release();
-        * // Release the allocated region
-        * UnsafeUtil.unsafe().freeMemory(bufAddress);
-        * </pre>
-        * 
-        * @param address starting address of a memory region pre-allocated using Unsafe.allocateMemory() */
-       public ByteBufferInput (long address, int size) {
-               setBuffer(UnsafeUtil.getDirectBufferAt(address, size));
-       }
-
-       public ByteBuffer getByteBuffer () {
-               return niobuffer;
-       }
-
-       public InputStream getInputStream () {
-               return inputStream;
-       }
-
-       /** Sets a new InputStream. The position and total are reset, discarding any buffered bytes.
-        * @param inputStream May be null. */
-       public void setInputStream (InputStream inputStream) {
-               this.inputStream = inputStream;
-               limit = 0;
-               rewind();
-       }
-
-       public void rewind () {
-               super.rewind();
-               niobuffer.position(0);
-       }
-
-       /** Fills the buffer with more bytes. Can be overridden to fill the bytes from a source other than the InputStream. */
-       protected int fill (ByteBuffer buffer, int offset, int count) throws KryoException {
-               if (inputStream == null) return -1;
-               try {
-                       byte[] tmp = new byte[count];
-                       int result = inputStream.read(tmp, 0, count);
-                       buffer.position(offset);
-                       if (result >= 0) {
-                               buffer.put(tmp, 0, result);
-                               buffer.position(offset);
-                       }
-                       return result;
-               } catch (IOException ex) {
-                       throw new KryoException(ex);
-               }
-       }
-
-       /** @param required Must be > 0. The buffer is filled until it has at least this many bytes.
-        * @return the number of bytes remaining.
-        * @throws KryoException if EOS is reached before required bytes are read (buffer underflow). */
-       final protected int require (int required) throws KryoException {
-               int remaining = limit - position;
-               if (remaining >= required) return remaining;
-               if (required > capacity) throw new KryoException("Buffer too small: capacity: " + capacity + ", required: " + required);
-
-               int count;
-               // Try to fill the buffer.
-               if (remaining > 0) {
-                       count = fill(niobuffer, limit, capacity - limit);
-                       if (count == -1) throw new KryoException("Buffer underflow.");
-                       remaining += count;
-                       if (remaining >= required) {
-                               limit += count;
-                               return remaining;
-                       }
-               }
-
-               // Compact. Position after compaction can be non-zero
-               niobuffer.position(position);
-               niobuffer.compact();
-               total += position;
-               position = 0;
-
-               while (true) {
-                       count = fill(niobuffer, remaining, capacity - remaining);
-                       if (count == -1) {
-                               if (remaining >= required) break;
-                               throw new KryoException("Buffer underflow.");
-                       }
-                       remaining += count;
-                       if (remaining >= required) break; // Enough has been read.
-               }
-               limit = remaining;
-               niobuffer.position(0);
-               return remaining;
-       }
-
-       /** @param optional Try to fill the buffer with this many bytes.
-        * @return the number of bytes remaining, but not more than optional, or -1 if the EOS was reached and the buffer is empty. */
-       private int optional (int optional) throws KryoException {
-               int remaining = limit - position;
-               if (remaining >= optional) return optional;
-               optional = Math.min(optional, capacity);
-
-               // Try to fill the buffer.
-               int count = fill(niobuffer, limit, capacity - limit);
-               if (count == -1) return remaining == 0 ? -1 : Math.min(remaining, optional);
-               remaining += count;
-               if (remaining >= optional) {
-                       limit += count;
-                       return optional;
-               }
-
-               // Compact.
-               niobuffer.compact();
-               total += position;
-               position = 0;
-
-               while (true) {
-                       count = fill(niobuffer, remaining, capacity - remaining);
-                       if (count == -1) break;
-                       remaining += count;
-                       if (remaining >= optional) break; // Enough has been read.
-               }
-               limit = remaining;
-               niobuffer.position(position);
-               return remaining == 0 ? -1 : Math.min(remaining, optional);
-       }
-
-       // InputStream
-
-       /** Reads a single byte as an int from 0 to 255, or -1 if there are no more bytes are available. */
-       public int read () throws KryoException {
-               if (optional(1) <= 0) return -1;
-               niobuffer.position(position);
-               position++;
-               return niobuffer.get() & 0xFF;
-       }
-
-       /** Reads bytes.length bytes or less and writes them to the specified byte[], starting at 0, and returns the number of bytes
-        * read. */
-       public int read (byte[] bytes) throws KryoException {
-               niobuffer.position(position);
-               return read(bytes, 0, bytes.length);
-       }
-
-       /** Reads count bytes or less and writes them to the specified byte[], starting at offset, and returns the number of bytes read
-        * or -1 if no more bytes are available. */
-       public int read (byte[] bytes, int offset, int count) throws KryoException {
-               niobuffer.position(position);
-               if (bytes == null) throw new IllegalArgumentException("bytes cannot be null.");
-               int startingCount = count;
-               int copyCount = Math.min(limit - position, count);
-               while (true) {
-                       niobuffer.get(bytes, offset, copyCount);
-                       position += copyCount;
-                       count -= copyCount;
-                       if (count == 0) break;
-                       offset += copyCount;
-                       copyCount = optional(count);
-                       if (copyCount == -1) {
-                               // End of data.
-                               if (startingCount == count) return -1;
-                               break;
-                       }
-                       if (position == limit) break;
-               }
-               return startingCount - count;
-       }
-
-       public void setPosition (int position) {
-               this.position = position;
-               this.niobuffer.position(position);
-       }
-
-       /** Sets the limit in the buffer. */
-       public void setLimit (int limit) {
-               this.limit = limit;
-               this.niobuffer.limit(limit);
-       }
-
-       public void skip (int count) throws KryoException {
-               super.skip(count);
-               niobuffer.position(this.position());
-       }
-
-       /** Discards the specified number of bytes. */
-       public long skip (long count) throws KryoException {
-               long remaining = count;
-               while (remaining > 0) {
-                       int skip = (int)Math.min(MAX_SAFE_ARRAY_SIZE, remaining);
-                       skip(skip);
-                       remaining -= skip;
-               }
-               return count;
-       }
-
-       /** Closes the underlying InputStream, if any. */
-       public void close () throws KryoException {
-               if (inputStream != null) {
-                       try {
-                               inputStream.close();
-                       } catch (IOException ignored) {
-                       }
-               }
-       }
-
-       // byte
-
-       /** Reads a single byte. */
-       public byte readByte () throws KryoException {
-               niobuffer.position(position);
-               require(1);
-               position++;
-               return niobuffer.get();
-       }
-
-       /** Reads a byte as an int from 0 to 255. */
-       public int readByteUnsigned () throws KryoException {
-               // buffer.position(position);
-               require(1);
-               position++;
-               return niobuffer.get() & 0xFF;
-       }
-
-       /** Reads the specified number of bytes into a new byte[]. */
-       public byte[] readBytes (int length) throws KryoException {
-               byte[] bytes = new byte[length];
-               readBytes(bytes, 0, length);
-               return bytes;
-       }
-
-       /** Reads bytes.length bytes and writes them to the specified byte[], starting at index 0. */
-       public void readBytes (byte[] bytes) throws KryoException {
-               readBytes(bytes, 0, bytes.length);
-       }
-
-       /** Reads count bytes and writes them to the specified byte[], starting at offset. */
-       public void readBytes (byte[] bytes, int offset, int count) throws KryoException {
-               if (bytes == null) throw new IllegalArgumentException("bytes cannot be null.");
-               int copyCount = Math.min(limit - position, count);
-               while (true) {
-                       niobuffer.get(bytes, offset, copyCount);
-                       position += copyCount;
-                       count -= copyCount;
-                       if (count == 0) break;
-                       offset += copyCount;
-                       copyCount = Math.min(count, capacity);
-                       require(copyCount);
-               }
-       }
-
-       public int readInt () throws KryoException {
-               require(4);
-               position += 4;
-               return niobuffer.getInt();
-       }
-
-       public int readInt (boolean optimizePositive) throws KryoException {
-               if (varIntsEnabled)
-                       return readVarInt(optimizePositive);
-               else
-                       return readInt();
-       }
-
-       public int readVarInt (boolean optimizePositive) throws KryoException {
-               niobuffer.position(position);
-               if (require(1) < 5) return readInt_slow(optimizePositive);
-               position++;
-               int b = niobuffer.get();
-               int result = b & 0x7F;
-               if ((b & 0x80) != 0) {
-                       position++;
-                       b = niobuffer.get();
-                       result |= (b & 0x7F) << 7;
-                       if ((b & 0x80) != 0) {
-                               position++;
-                               b = niobuffer.get();
-                               result |= (b & 0x7F) << 14;
-                               if ((b & 0x80) != 0) {
-                                       position++;
-                                       b = niobuffer.get();
-                                       result |= (b & 0x7F) << 21;
-                                       if ((b & 0x80) != 0) {
-                                               position++;
-                                               b = niobuffer.get();
-                                               result |= (b & 0x7F) << 28;
-                                       }
-                               }
-                       }
-               }
-               return optimizePositive ? result : ((result >>> 1) ^ -(result & 1));
-       }
-
-       private int readInt_slow (boolean optimizePositive) {
-               // The buffer is guaranteed to have at least 1 byte.
-               position++;
-               int b = niobuffer.get();
-               int result = b & 0x7F;
-               if ((b & 0x80) != 0) {
-                       require(1);
-                       position++;
-                       b = niobuffer.get();
-                       result |= (b & 0x7F) << 7;
-                       if ((b & 0x80) != 0) {
-                               require(1);
-                               position++;
-                               b = niobuffer.get();
-                               result |= (b & 0x7F) << 14;
-                               if ((b & 0x80) != 0) {
-                                       require(1);
-                                       position++;
-                                       b = niobuffer.get();
-                                       result |= (b & 0x7F) << 21;
-                                       if ((b & 0x80) != 0) {
-                                               require(1);
-                                               position++;
-                                               b = niobuffer.get();
-                                               result |= (b & 0x7F) << 28;
-                                       }
-                               }
-                       }
-               }
-               return optimizePositive ? result : ((result >>> 1) ^ -(result & 1));
-       }
-
-       /** Returns true if enough bytes are available to read an int with {@link #readInt(boolean)}. */
-       public boolean canReadInt () throws KryoException {
-               if (limit - position >= 5) return true;
-               if (optional(5) <= 0) return false;
-               int p = position;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               return true;
+               super(buffer);
        }
 
-       /** Returns true if enough bytes are available to read a long with {@link #readLong(boolean)}. */
-       public boolean canReadLong () throws KryoException {
-               if (limit - position >= 9) return true;
-               if (optional(5) <= 0) return false;
-               int p = position;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               if ((niobuffer.get(p++) & 0x80) == 0) return true;
-               if (p == limit) return false;
-               return true;
-       }
-
-       /** Reads the length and string of UTF8 characters, or null. This can read strings written by
-        * {@link Output#writeString(String)} , {@link Output#writeString(CharSequence)}, and {@link Output#writeAscii(String)}.
-        * @return May be null. */
+       @Override
        public String readString () {
                niobuffer.position(position);
                int available = require(1);
@@ -645,12 +177,12 @@ public class ByteBufferInput extends Input {
                        end++;
                        b = niobuffer.get();
                } while ((b & 0x80) == 0);
-               niobuffer.put(end - 1, (byte)(niobuffer.get(end - 1) & 0x7F)); // Mask end of ascii bit.
-               byte[] tmp = new byte[end - start];
+               int count = end - start;
+               byte[] tmp = new byte[count];
                niobuffer.position(start);
                niobuffer.get(tmp);
-               String value = new String(tmp, 0, 0, end - start);
-               niobuffer.put(end - 1, (byte)(niobuffer.get(end - 1) | 0x80));
+               tmp[count - 1] &= 0x7F;  // Mask end of ascii bit.
+               String value = new String(tmp, 0, 0, count);
                position = end;
                niobuffer.position(position);
                return value;
@@ -685,9 +217,7 @@ public class ByteBufferInput extends Input {
                return new String(chars, 0, charCount);
        }
 
-       /** Reads the length and string of UTF8 characters, or null. This can read strings written by
-        * {@link Output#writeString(String)} , {@link Output#writeString(CharSequence)}, and {@link Output#writeAscii(String)}.
-        * @return May be null. */
+       @Override
        public StringBuilder readStringBuilder () {
                niobuffer.position(position);
                int available = require(1);
@@ -709,276 +239,4 @@ public class ByteBufferInput extends Input {
                builder.append(chars, 0, charCount);
                return builder;
        }
-
-       /** Reads a 4 byte float. */
-       public float readFloat () throws KryoException {
-               require(4);
-               position += 4;
-               return niobuffer.getFloat();
-       }
-
-       /** Reads a 1-5 byte float with reduced precision. */
-       public float readFloat (float precision, boolean optimizePositive) throws KryoException {
-               return readInt(optimizePositive) / (float)precision;
-       }
-
-       /** Reads a 2 byte short. */
-       public short readShort () throws KryoException {
-               require(2);
-               position += 2;
-               return niobuffer.getShort();
-       }
-
-       /** Reads a 2 byte short as an int from 0 to 65535. */
-       public int readShortUnsigned () throws KryoException {
-               require(2);
-               position += 2;
-               return niobuffer.getShort();
-       }
-
-       /** Reads an 8 byte long. */
-       public long readLong () throws KryoException {
-               require(8);
-               position += 8;
-               return niobuffer.getLong();
-       }
-
-       /** {@inheritDoc} */
-       public long readLong (boolean optimizePositive) throws KryoException {
-               if (varIntsEnabled)
-                       return readVarLong(optimizePositive);
-               else
-                       return readLong();
-       }
-
-       /** {@inheritDoc} */
-       public long readVarLong (boolean optimizePositive) throws KryoException {
-               niobuffer.position(position);
-               if (require(1) < 9) return readLong_slow(optimizePositive);
-               position++;
-               int b = niobuffer.get();
-               long result = b & 0x7F;
-               if ((b & 0x80) != 0) {
-                       position++;
-                       b = niobuffer.get();
-                       result |= (b & 0x7F) << 7;
-                       if ((b & 0x80) != 0) {
-                               position++;
-                               b = niobuffer.get();
-                               result |= (b & 0x7F) << 14;
-                               if ((b & 0x80) != 0) {
-                                       position++;
-                                       b = niobuffer.get();
-                                       result |= (b & 0x7F) << 21;
-                                       if ((b & 0x80) != 0) {
-                                               position++;
-                                               b = niobuffer.get();
-                                               result |= (long)(b & 0x7F) << 28;
-                                               if ((b & 0x80) != 0) {
-                                                       position++;
-                                                       b = niobuffer.get();
-                                                       result |= (long)(b & 0x7F) << 35;
-                                                       if ((b & 0x80) != 0) {
-                                                               position++;
-                                                               b = niobuffer.get();
-                                                               result |= (long)(b & 0x7F) << 42;
-                                                               if ((b & 0x80) != 0) {
-                                                                       position++;
-                                                                       b = niobuffer.get();
-                                                                       result |= (long)(b & 0x7F) << 49;
-                                                                       if ((b & 0x80) != 0) {
-                                                                               position++;
-                                                                               b = niobuffer.get();
-                                                                               result |= (long)b << 56;
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               if (!optimizePositive) result = (result >>> 1) ^ -(result & 1);
-               return result;
-       }
-
-       private long readLong_slow (boolean optimizePositive) {
-               // The buffer is guaranteed to have at least 1 byte.
-               position++;
-               int b = niobuffer.get();
-               long result = b & 0x7F;
-               if ((b & 0x80) != 0) {
-                       require(1);
-                       position++;
-                       b = niobuffer.get();
-                       result |= (b & 0x7F) << 7;
-                       if ((b & 0x80) != 0) {
-                               require(1);
-                               position++;
-                               b = niobuffer.get();
-                               result |= (b & 0x7F) << 14;
-                               if ((b & 0x80) != 0) {
-                                       require(1);
-                                       position++;
-                                       b = niobuffer.get();
-                                       result |= (b & 0x7F) << 21;
-                                       if ((b & 0x80) != 0) {
-                                               require(1);
-                                               position++;
-                                               b = niobuffer.get();
-                                               result |= (long)(b & 0x7F) << 28;
-                                               if ((b & 0x80) != 0) {
-                                                       require(1);
-                                                       position++;
-                                                       b = niobuffer.get();
-                                                       result |= (long)(b & 0x7F) << 35;
-                                                       if ((b & 0x80) != 0) {
-                                                               require(1);
-                                                               position++;
-                                                               b = niobuffer.get();
-                                                               result |= (long)(b & 0x7F) << 42;
-                                                               if ((b & 0x80) != 0) {
-                                                                       require(1);
-                                                                       position++;
-                                                                       b = niobuffer.get();
-                                                                       result |= (long)(b & 0x7F) << 49;
-                                                                       if ((b & 0x80) != 0) {
-                                                                               require(1);
-                                                                               position++;
-                                                                               b = niobuffer.get();
-                                                                               result |= (long)b << 56;
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               if (!optimizePositive) result = (result >>> 1) ^ -(result & 1);
-               return result;
-       }
-
-       /** Reads a 1 byte boolean. */
-       public boolean readBoolean () throws KryoException {
-               require(1);
-               position++;
-               return niobuffer.get() == 1 ? true : false;
-       }
-
-       /** Reads a 2 byte char. */
-       public char readChar () throws KryoException {
-               require(2);
-               position += 2;
-               return niobuffer.getChar();
-       }
-
-       /** Reads an 8 bytes double. */
-       public double readDouble () throws KryoException {
-               require(8);
-               position += 8;
-               return niobuffer.getDouble();
-       }
-
-       /** Reads a 1-9 byte double with reduced precision. */
-       public double readDouble (double precision, boolean optimizePositive) throws KryoException {
-               return readLong(optimizePositive) / (double)precision;
-       }
-
-       // Methods implementing bulk operations on arrays of primitive types
-
-       /** Bulk input of an int array. */
-       public int[] readInts (int length) throws KryoException {
-               if (capacity - position >= length * 4 && isNativeOrder()) {
-                       int[] array = new int[length];
-                       IntBuffer buf = niobuffer.asIntBuffer();
-                       buf.get(array);
-                       position += length * 4;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readInts(length);
-       }
-
-       /** Bulk input of a long array. */
-       public long[] readLongs (int length) throws KryoException {
-               if (capacity - position >= length * 8 && isNativeOrder()) {
-                       long[] array = new long[length];
-                       LongBuffer buf = niobuffer.asLongBuffer();
-                       buf.get(array);
-                       position += length * 8;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readLongs(length);
-       }
-
-       /** Bulk input of a float array. */
-       public float[] readFloats (int length) throws KryoException {
-               if (capacity - position >= length * 4 && isNativeOrder()) {
-                       float[] array = new float[length];
-                       FloatBuffer buf = niobuffer.asFloatBuffer();
-                       buf.get(array);
-                       position += length * 4;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readFloats(length);
-       }
-
-       /** Bulk input of a short array. */
-       public short[] readShorts (int length) throws KryoException {
-               if (capacity - position >= length * 2 && isNativeOrder()) {
-                       short[] array = new short[length];
-                       ShortBuffer buf = niobuffer.asShortBuffer();
-                       buf.get(array);
-                       position += length * 2;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readShorts(length);
-       }
-
-       /** Bulk input of a char array. */
-       public char[] readChars (int length) throws KryoException {
-               if (capacity - position >= length * 2 && isNativeOrder()) {
-                       char[] array = new char[length];
-                       CharBuffer buf = niobuffer.asCharBuffer();
-                       buf.get(array);
-                       position += length * 2;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readChars(length);
-       }
-
-       /** Bulk input of a double array. */
-       public double[] readDoubles (int length) throws KryoException {
-               if (capacity - position >= length * 8 && isNativeOrder()) {
-                       double[] array = new double[length];
-                       DoubleBuffer buf = niobuffer.asDoubleBuffer();
-                       buf.get(array);
-                       position += length * 8;
-                       niobuffer.position(position);
-                       return array;
-               } else
-                       return super.readDoubles(length);
-       }
-
-       private boolean isNativeOrder () {
-               return byteOrder == nativeOrder;
-       }
-
-       /** Return current setting for variable length encoding of integers
-        * @return current setting for variable length encoding of integers */
-       public boolean getVarIntsEnabled () {
-               return varIntsEnabled;
-       }
-
-       /** Controls if a variable length encoding for integer types should be used when serializers suggest it.
-        * 
-        * @param varIntsEnabled */
-       public void setVarIntsEnabled (boolean varIntsEnabled) {
-               this.varIntsEnabled = varIntsEnabled;
-       }
 }
index fb910cb291989ea12c40cc81aa6be3b308605e51..c84dd2086ca6048ebfba497f4509c74bf0b82a81 100644 (file)
@@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.ByteBufferInputStream;
 import com.esotericsoftware.kryo.io.ByteBufferOutput;
 import com.esotericsoftware.kryo.pool.KryoCallback;
 import com.esotericsoftware.kryo.pool.KryoFactory;
@@ -143,14 +142,14 @@ final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
 
     @Override
     public <T> T deserialize(final ByteBuffer buffer) {
-        return kryoInputPool.run(input -> {
-            input.setInputStream(new ByteBufferInputStream(buffer));
-            return kryoPool.run(kryo -> {
-                @SuppressWarnings("unchecked")
-                T obj = (T) kryo.readClassAndObject(input);
-                return obj;
-            });
-        }, DEFAULT_BUFFER_SIZE);
+        Kryo kryo = borrow();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(new ByteBufferInput(buffer));
+            return obj;
+        } finally {
+            release(kryo);
+        }
     }
 
     @Override
@@ -163,7 +162,7 @@ final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
         Kryo kryo = borrow();
         try {
             @SuppressWarnings("unchecked")
-            T obj = (T) kryo.readClassAndObject(new ByteBufferInput(stream, bufferSize));
+            T obj = (T) kryo.readClassAndObject(new com.esotericsoftware.kryo.io.ByteBufferInput(stream, bufferSize));
             return obj;
         } finally {
             release(kryo);