8. 数据缓冲器和编解码器

Java NIO 提供了ByteBuffer但许多库在其上构建自己的字节缓冲区 API,特别是对于重用缓冲区和/或使用直接缓冲区有利于性能的网络操作。例如,Netty 有ByteBuf层次结构,Undertow 使用 XNIO,Jetty 使用池化字节缓冲区和要释放的回调,等等。spring-core模块提供了一组抽象来处理各种字节缓冲区 API,如下所示:

8.1. DataBufferFactory

DataBufferFactory用于通过以下两种方式之一创建数据缓冲区:

  1. 分配一个新的数据缓冲区,如果知道的话,可以选择预先指定容量,即使实现DataBuffer可以按需增长和缩小,这也会更有效。

  2. 包装现有的byte[]java.nio.ByteBuffer,它使用DataBuffer实现装饰给定数据并且不涉及分配。

请注意,WebFlux 应用程序不会直接创建 DataBufferFactory ,而是通过客户端上的ServerHttpResponseClientHttpRequest访问它。工厂的类型取决于底层的客户端或服务器,例如对于 Reactor Netty是 NettyDataBufferFactory,对于其他的是DefaultDataBufferFactory

8.2. DataBuffer

DataBuffer 接口提供与java.nio.ByteBuffer类似的操作,但还带来了一些额外的好处,其中一些好处是受到 Netty ByteBuf 的启发。以下是部分功能列表:

  • 以独立位置读取和写入,即不需要调用flip()来在读取和写入之间交替。

  • 容量随需扩展java.lang.StringBuilder

  • 通过PooledDataBuffer池化缓冲区和引用计数

  • 将缓冲区视为java.nio.ByteBufferInputStreamOutputStream

  • 确定给定字节的索引或最后一个索引。

8.3.PooledDataBuffer

正如 ByteBuffer的 Javadoc 中所解释的,字节缓冲区可以是直接的或非直接的。直接缓冲区可以驻留在 Java 堆之外,这消除了对本地 I/O 操作进行复制的需要。这使得直接缓冲区对于通过套接字接收和发送数据特别有用,但它们的创建和释放成本也更高,这导致了池化缓冲区的想法。

PooledDataBuffer是它的扩展,DataBuffer它有助于引用计数,这对于字节缓冲池至关重要。它是如何工作的?当 aPooledDataBuffer被分配时,引用计数为 1。调用retain()递增计数,调用release()递减计数。只要计数大于0,就保证缓冲区不会被释放。当计数减少到 0 时,可以释放池化缓冲区,这实际上可能意味着为缓冲区保留的内存返回到内存池。

请注意,PooledDataBuffer与其直接操作,在大多数情况下,最好使用DataBufferUtils应用版本中的便捷方法或 DataBuffer仅当它是PooledDataBuffer.

8.4.DataBufferUtils

DataBufferUtils提供了许多实用方法来操作数据缓冲区:

  • 如果底层字节缓冲区 API 支持,则将数据缓冲区流加入可能具有零副本的单个缓冲区,例如通过复合缓冲区。

  • InputStream或 NIOChannel变为Flux<DataBuffer>,反之亦然, Publisher<DataBuffer>变为OutputStream或 NIO Channel

  • 如果缓冲区是 PooledDataBuffer的实例,则释放或保留 DataBuffer的方法。

  • 跳过或从字节流中获取,直到特定的字节数。

8.5.编解码器

org.springframework.core.codec包提供以下策略接口:

  • Encoder:编码Publisher<T>成数据缓冲区流。

  • Decoder:解码Publisher<DataBuffer>成更高级别的对象流。

spring-core模块提供byte[]ByteBufferDataBufferResourceString编码器和解码器实现。spring-web模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。请参阅 WebFlux 部分中的编解码器。

8.6. 使用DataBuffer

使用数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能被池化。我们将使用编解码器来说明它是如何工作的,但这些概念更普遍适用。让我们看看编解码器必须在内部做什么来管理数据缓冲区。

Decoder是在创建更高级别对象之前最后读取输入数据缓冲区的,因此它必须按如下方式释放它们:

  1. 如果 Decoder简单地读取每个输入缓冲区并准备立即释放它,它可以通过DataBufferUtils.release(dataBuffer)方法.

  2. 如果 Decoder正在使用FluxMono操作符,例如flatMap, reduce,以及其他在内部预取和缓存数据项,或者正在使用操作符,例如 filter, skip,以及其他遗漏项,则 doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)方法必须被添加到组合链中以确保这些缓冲区在之前被释放被丢弃,也可能是由于错误或取消信号。

  3. 如果 Decoder以任何其他方式保留一个或多个数据缓冲区,则必须确保在完全读取时释放它们,或者在缓存数据缓冲区被读取和释放之前发生错误或取消信号的情况下。

请注意,DataBufferUtils#join提供了一种将数据缓冲区流聚合到单个数据缓冲区中的安全有效的方法。同样skipUntilByteCounttakeUntilByteCount是解码器使用的其他安全方法。

Encoder分配了其他必须读取(和释放)的数据缓冲区。所以 Encoder 没有什么可做的。但是,如果在用数据填充缓冲区时发生序列化错误,则Encoder必须注意释放数据缓冲区。例如:

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
    // serialize and populate buffer..
    release = false;
}
finally {
    if (release) {
        DataBufferUtils.release(buffer);
    }
}
return buffer;

Encoder的消费者负责释放它接收到的数据缓冲区。在 WebFlux 应用程序中,Encoder的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区是写入服务器响应或客户端请求的代码的责任.

请注意,在 Netty 上运行时,有用于 解决缓冲区泄漏问题的调试选项。

最后更新于