注册

Okio-很棒的文章

前言


本文不去阐述 Okio 产生原因,也不去对比 Okio 与 Java 原生 IO 的优劣性,单纯分析 Okio 的实现,对每个关键点分析透彻,并配上精美图解。


本文分析点目录



  • Okio 类框架图
  • Source 与 Sink
  • Buffer
  • Segment
  • 具体分析 Buffer 的 write 方法,看数据流转过程

Okio 类框架图


Okio 整个框架的代码量并不大,体现了高内聚的设计,类框架大概如下:



图中体现了框架内部类之间的关系,整个框架做的事情大致就是:


Source 与 Sink


Source 与 Sink 是 Okio 中所有输入输出流的基类接口,类似 Java IO 中 InputStream,OutputStream。


Source,源的意思,也就是说我是数据源,你们要的数据都从我这来。


Sink,往下沉,往外运输的意思,从 Source 中读到数据后,通过我传出去。


分别定义了数据流的读写接口:


public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/
long read(Buffer sink, long byteCount) throws IOException;

/** Returns the timeout for this source. */
Timeout timeout();

/**
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/
@Override void close() throws IOException;
}
复制代码

public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
void write(Buffer source, long byteCount) throws IOException;

/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;

/** Returns the timeout for this sink. */
Timeout timeout();

/**
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/
@Override void close() throws IOException;
}
复制代码

在 Okio 中 Source,Sink 也有几个实现类,我们只看 RealBufferedSource,RealBufferedSink。


RealBufferedSource


从名字中看出,带有缓冲的数据源(既然是源,那么你们想要的数据都从我这拿),但是它不是真正的数据源,真正的数据源是成员变量 source,它只是包装装饰了一下,
当 read 开头的方法被调用时,都是从成员变量 source 中读取,读到数据后,先存入成员变量 buffer 中,然后再从 buffer 中读数据。


来看 read 方法:


--> RealBufferedSource.java


final class RealBufferedSource implements BufferedSource {
// 缓冲,数据先放到 buffer 中,别人来读的时候先从 buffer 中读,没有的话再从 source 中读
public final Buffer buffer = new Buffer();
// 真正数据源,也是此 RealBufferedSource 的上游数据源,当 buffer 中没数据时,从 source 中读
public final Source source;
boolean closed;
...

@Override public long read(Buffer sink, long byteCount) throws IOException {
// 该方法作用:从当前 Source 中 读取 byteCount 个字节存到 sink 中
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");

// 若当前 buffer 中没数据,就从成员变量 source 读,读到了再存入 buffer 中
// 若当前 source 中也没数据,则返回-1
// 若当前 buffer 中有数据,则跳过这里,直接从 buffer 中读
if (buffer.size == 0) {
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}

// 到这里 buffer 中肯定是有数据的,取小者,因为 buffer 中的数据量可能不足 byteCount 个,谁小就用谁
long toRead = Math.min(byteCount, buffer.size);
// 将 buffer 中数据读到 sink 中
return buffer.read(sink, toRead);
}

}
复制代码

RealBufferedSink
从名字中看出,带有缓冲的输出流(既然是输出流,那么你们想要往外写的数据都通过我来写),但它不是真正的输出流,真正的输出流是 成员变量 sink,它只是包装装饰了一下,当 write 开头的方法被调用时,都是先将数据写到成员变量 buffer 中,然后再通过成员变量 sink 往外写。


来看 write 方法:


final class RealBufferedSink implements BufferedSink {
// 缓冲,当需要通过我进行数据输出时,数据会先存到 buffer 中,再通过 sink 输出
public final Buffer buffer = new Buffer();
// 真正输出流,也是此 RealBufferedSink 的下游,当需要进行数据输出时,通过 sink 输出
public final Sink sink;
boolean closed;
...
@Override public void write(Buffer source, long byteCount)
throws IOException {
if (closed) throw new IllegalStateException("closed");
// 将 source 中的数据写到 buffer 中
buffer.write(source, byteCount);
// 将 buffer 中的数据通过 sink 写出去,内部具体如何写的,在后续 Buffer 章节中详细分析
emitCompleteSegments();
}
复制代码

整个输入流到输出流可用如下图表示:



Buffer


缓冲的意思,Buffer 是 Okio 中的核心类,整个数据中转靠的就是它。


那么请问,我们为什么要 Buffer 这个东西?我们之前用 Java IO 时不用带 Buffer 的 InputStream 不也照样可以读么?


我们举个通俗的例子解答这个问题,假如,我们在果园里有一颗苹果树,想吃的时候,去摘一个,什么时候再想吃了,再去树上摘一个,那么,这样跑来跑去的不累么?每次还得跑到园子里。那我们何不先摘个十个八个的,放到箩筐里面带回家,想吃的时候,直接从箩筐里拿,就不必跑那么远到树上去摘了。


Buffer 就是扮演的上面的箩筐的角色,所以 Buffer 的存在是非常关键的,可以做到省时省力。


看下 Okio 中 Buffer:


--> Buffer.java


public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {
// Buffer 实现了 BufferedSource,BufferedSink,也就是说,Buffer 既可以
// 作为 Source 的缓冲,也可以作为 Sink 的缓冲,
private static final byte[] DIGITS =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
static final int REPLACEMENT_CHARACTER = '\ufffd';

// 关键点,存储数据链表头
@Nullable Segment head;
// 当前 buffer 中的字节数量
long size;
...

public Buffer() {
}
}

复制代码

Buffer 是用来缓冲数据的,要缓冲数据,就需要容器,那么它内部靠什么来存储数据?是 Segment。Buffer 中主要就是依靠 Segment 来存储数据,将 Segment 组成循环链表的结构,可用下图表示:



那我们就来看看 Segment 是什么,注意,Buffer 还没说完,等 Segment 分析完后,会再通过分析具体方法,来看看 Buffer 与 Segment 是如何配合完成工作的。


Segment


Segment 片段的意思,先看下类源码:


--> Segment.java


final class Segment {
// 每个 Segment 最大容量,也就是最多容纳 8192 个字节
static final int SIZE = 8192;

// 在 Segment 分割的场景(后面讲),如果要分割出去的字节数大于 1024 这个临界值,
// 那么直接共享 data,不要去做数组拷贝了
static final int SHARE_MINIMUM = 1024;

// 真正存放数据的地方
final byte[] data;

// data 中第一个可读位置,比如当前 data 中有 1024 个字节,pos = 0,被读取了一个字节后
// pos = 1,下次再读的话,就要从 1 开始读了
int pos;

// data 中第一个可写的位置,比如当前 data 中有 1024 个字节,那么第一个可写的位置是 1025
int limit;

// 此 Segment 是否在与别的 Segment 或者 ByteString 共享 data
boolean shared;

// 该 Segment 是否独享 data,独享时,可向 data 写入数据
boolean owner;

// 循环链表下一个节点
Segment next;

// 循环链表前一个节点
Segment prev;

Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}

Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.shared = shared;
this.owner = owner;
}
...
}
复制代码

看到 next,prev,就知道 Segment 会组成双向链表结构,只不过 Okio 中是双向循环链表,Segment 可用如下图表示:


Segment 内部定义了操作数据、操作链表的方法,我们着重分析以下几个方法:



  • push
  • pop
  • split
  • compact
  • writeTo

push


// 往当前链表中添加一个节点,放在被调用 Segment 之后
public final Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
复制代码

pop


// 从当前链表中删除该节点
public final @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
复制代码

上面两个方法是简单的链表增删操作,不需要注释,只需要注意以下几点:


push:


1、先将被push进来的节点的 prev、next 安顿好


2、再将与新节点相邻的前一个节点的 next 安顿好


3、最后将与新节点相邻的后一个节点的 prev 安顿好


pop:


1、先安顿前邻居的 next


2、再安顿后邻居的 prev


3、最后安顿自己的 prev、next


总之,在一个双向链表中增删操作,无非就是关心三个节点:自己、前邻居、后邻居,自己要解决prev next,前邻居解决next,后邻居解决 prev


split



public final Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;

// We have two competing performance goals:
// 我们有两个竞争的性能目标
// - Avoid copying data. We accomplish this by sharing segments.
// - 避免数据的拷贝。我们通过共享 Segment 来达成这一目标
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// - 避免短的共享 Segment,这样会导致性能差,因为共享后,Segment 只能读不能写,并且
// 可能导致链表中短 Segment 链变的很长
// To balance these goals we only share segments when the copy will be large.
// 为了平衡这些目标,我们只在需要拷贝的数据量比较大时,才会采用共享 Segment 的方式
if (byteCount >= SHARE_MINIMUM) {
// 当要分割出去的数据大于SHARE_MINIMUM(1024)时,会采用共享 Segment 的方式创建新的Segment,
// 注意,共享的是 Segment 里面的 data,还是会创建一个新的 Segment,只不过 data 是同一个
prefix = sharedCopy();
} else {
// 当要分割的数据小于SHARE_MINIMUM(1024)时,那么直接 copy 吧,反正顶天不会超过 1024 个字节
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}

// 到这里,我们分割出来的 Segment 就创建好了,但是我们要始终注意的是:分割的是
// Segment 里的 data,data 被一刀两断了,那么两个 Segment 是不是要把 pos,limit 都调整下

// 调整新创建的 Segment 的 limit 为 pos + byteCount
prefix.limit = prefix.pos + byteCount;
// 调整原 Segment 的 pos 为 pos + byteCount
pos += byteCount;
// 加到该 Segment 的前节点后面,其实在 Okio 中,这个 prev 就是 tail,
// 你可能会问,这个方法哪个 Segment 对象都可以调用,这个 prev 是调用这个方法
// 的 Segment 的前节点,为啥会是 tail 节点?其实在 Okio 框架内部只有一个地方
// 调用了,就是在 Buffer 的 write(Buffer source, long byteCount) 里,
// 里面是用 head 去调用的,而 head 的 prev 是 tail
// 你可能又有疑问了,这方法是 public 的,外部都可以访问啊,但请看 Segment ,这家伙
// 不是 public 的,包内访问限制,外部无法使用,只能在 Okio 内使用,so...
prev.push(prefix);
return prefix;
}
复制代码

split 方法的过程可用下图表示:



split 过程完成后,Buffer 中 链表变化如下图:



那么这个 split 方法有何用呢? 上面注释中提到过,split 方法在 Okio 中唯一调用处在 Buffer 的 write(Buffer source, long byteCount),这个方法中的最后一段注释这样写道:


/**
* Occasionally we write only part of a source buffer to a sink buffer. For
* example, given a sink [51%, 91%], we may want to write the first 30% of
* a source [92%, 82%] to it. To simplify, we first transform the source to
* an equivalent buffer [30%, 62%, 82%] and then move the head segment,
* yielding sink [51%, 91%, 30%] and source [62%, 82%].
*/
复制代码

意思是:在某些场景下,我们只需要从 Source 中 写一部分数据 到 Sink 中,例如我们现在有一个 Sink,未读数据占比是[51%, 91%](注意,代表这个 Buffer 有两个 Segment,未读数据占比),我们可能想从一个[92%, 82%]的 Source 中往 Sink 中写30%的数据。为了简便,我们首先把 Source 转化为一个同等的 Buffer [30%, 62%, 82%],然后把头 Segment 移动到 Sink 中去(注意是移动,不是拷贝,这样就是一个指针操作),那么此时 Source 中只剩[62%, 82%]。


不难理解,无非就是把 Source 的两个 Segment 拆成三个,然后把其中一个移到 Sink 中,这样就避免了比较大的数据量的拷贝,只是移动了指针,在 split 中,当需要写的数据小于1024,才会有拷贝操作,大于1024时,直接共享数据,所以这里是一个性能提升的地方。


writeTo


// 把当前 Segment 数据写到 sink 中去
public final void writeTo(Segment sink, int byteCount) {
// 当 sink 不独享数据时,直接抛异常,因为不独享时,只可读,不可写
if (!sink.owner) throw new IllegalArgumentException();
// 若 sink 的可写空间不足了
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
// 若 sink 与别的 Segment 共享数据,只读不写,直接抛异常
if (sink.shared) throw new IllegalArgumentException();
// 若将已读空间重复利用,sink空间还不够,抛异常
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}

System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
复制代码

writeTo 过程可用如下图表示:



compact


// 压缩,将 tail 中数据往 prev 中转移
public final void compact() {
// 若前节点就是自己,那么此时链表中只有一个节点
if (prev == this) throw new IllegalStateException();
// 若前节点非独享,也就是不可写,直接return
if (!prev.owner) return; // Cannot compact: prev isn't writable.
// 当前节点的未读数据大小
int byteCount = limit - pos;
// 前节点的最大可写空间,记得我们上面的 writeTo方法吗,最大可写空间=可写空间+已读空间
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
// 若前节点的最大可写空间容不下即将要写入的数据,直接return
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
// 到这里肯定是可以写入的,可不还是调用 writeTo 么
writeTo(prev, byteCount);
// 当前节点从链表中断开
pop();
// 从链表中断开了别扔了,回收,下次利用
SegmentPool.recycle(this);
}
复制代码

compact 是压缩的意思,在有的情况下,链表中的 Segment 利用率不高(可写的空间还有很多),这个时候我们能不能把后面一个节点的数据往这个节点里面压一压呢?以提高利用率,同时可以回收后一个节点,减小链表长度,一举两得。要注意的是,这个方法是 tail 节点调用。


compact过程可由下图表示:

Segment 提供了这些原子方法,让他人去调用吧。


Buffer 的 write 和 read 方法


之前在 Buffer 小节,还未说完,我们现在通过 Buffer 的 write 和 read 方法来具体分析从一个缓存到另一个缓存的读写过程。


write


// 从 source 中移动 byteCount 个字节到当前Buffer(去掉许多注释)
@Override public void write(Buffer source, long byteCount) {
// Move bytes from the head of the source buffer to the tail of this buffer
// 从 source 的 head 移动数据到当前Buffer的 tail中

if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
// 检查偏移和移动数量,防止越界
checkOffsetAndCount(source.size, 0, byteCount);
// 为什么是while循环,因为byteCount可能很大,而Segment移动数据有限
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
// 如果source的头里面的未读数据就比byteCount大
if (byteCount < (source.head.limit - source.head.pos)) {
// 因为写都是往尾部写入,这里先找到当前Buffer的tail节点
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
// 当前Buffer的tail节点的可写空间就够了,直接将数据写入tail就行,这里使用的是拷贝
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
// 将source的head一分为二,因为当前Buffer的tail容不下byteCount个字节了
// 当然,你可能会问,如果tail为null或者不满足上面的if里的任何一个条件都会走
// 这里的else,别忘了,这个else始终被外层的if条件约束着
source.head = source.head.split((int) byteCount);
}
}

// Remove the source's head segment and append it to our tail.
// 如过走了上面的if,那么这里的source.head已经是分割过的,如果没走if,什么都没干
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
// 将segmentToMove从source中移除,因为它将要加入到当前的Buffer中
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
// 加入到当前Buffer中
tail = tail.push(segmentToMove);
// 再压缩一下,尽量让segment满载,提高利用率
tail.compact();
}

// 数据移动完了,设置source与当前Buffer的变量,source数据少了,这边多了
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;

//如果一趟没将byteCount个字节的数据移动完,再进行下一次循环
}
}
复制代码

read


public long read(Buffer sink, long byteCount) {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
// 其实就是将当前Buffer中的数据写到sink中,这里好像与我们看到这个方法的第一反应
// 有点不同,我们可能想的是:既然是read,就是要从外头读数据,但是这里是往外头读
// 我们还是要理解source与sink的概念,从source读,往sink中写
sink.write(this, byteCount);
return byteCount;
}

作者:HuntX
链接:https://juejin.cn/post/6953205123803775006
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册