注册

熟悉又陌生的Handler


熟悉又陌生的Handler-3

nativeInit:

在上文中,关于Handler三件套的创建流程,第一个涉及到的JNI调用就是MessageQueue的nativeInit方法。

MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}

具体实现在framework/base/core/jni/android_os_MessageQueue.cpp中:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 首先创建NativeMessageQueue对象,该对象持有Native侧的Looper对象
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
//如果创建失败,直接异常
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 增加NativeMessageQueue的引用计数
nativeMessageQueue->incStrong(env);
// 返回nativeMessageQueue这个指针给Java层
return reinterpret_cast<jlong>(nativeMessageQueue);
}

在NativeMessageQueue的构造函数中:

NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 通过TLS(Thread Local Storage线程局部存储)获取当前线程的Native的Looper对象
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 如果没有,那么会去创建一个Native的Looper对象
mLooper = new Looper(false);
// 并将创建的Looper对象保存到TLS中
Looper::setForThread(mLooper);
}
}

需要注意这里的Looper对象和Java层的Looper对象并没有什么直接关联。在其构造方法中,最关联的两件事:

Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 初始化表示唤醒事件的文件描述符mWakeEventFd
// eventfd这个系统调用用于创建或者打开一个eventfd的文件,类似于文件的open操作
// 这里传入的初始值为0,然后设置的标志位为
// EFD_CLOEXEC:FD_CLOEXEC,简单说就是fork子进程时不继承,对于多线程的程序设上这个值不会有错的。
// EFD_NONBLOCK:
// 文件会被设置成O_NONBLOCK(非阻塞IO,读取不到数据的时候或写入缓冲区满了会return -1),
// 而不是阻塞调用,一般要设置。
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
AutoMutex _l(mLock);
// 重新创建当前Looper的Epoll事件
rebuildEpollLocked();
}

rebuildEpollLocked实现如下:

void Looper::rebuildEpollLocked() {
// 如果当前Looper已经有了EpollFd,即已经有了旧的epoll实例,那么先重置一下
if (mEpollFd >= 0) {
mEpollFd.reset();
}
// 创建新的Epoll实例
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
struct epoll_event eventItem;
// 初始化eventItem占用的内存空间
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
// EPOLLIN :表示对应的文件描述符可以读
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
// 调用epoll_ctl操作mEpollFd对应的Epoll实例,将mWakeEventFd(唤醒事件)
// 添加到mEpoll对应的epoll实例上
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 将Request也一并添加到epoll实例上
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

epoll_event结构体如下:

struct epoll_event {
uint32_t events;
epoll_data_t data;
}

typedef union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

events成员变量:可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

既然Java层的MessageQueue在创建的时候,有创建Native层的MessageQueue,那么同样的Java层MQ在销毁的时候,也会触发NativeMessageQueue的销毁,Native层MQ的销毁比较简单,实质上就是Native层一个对象的清除:

  1. 移除对象的引用关系。
  2. delete调用清除对象的内存空间。

nativeWake:

从上文分析中,我们知道,在Java层当我们调用MessageQueue.enqueueMessage的时候,在Java层觉得需要唤醒消息队列的时候,会调用nativeWake这个native方法:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 调用NativeMessageQueue的wake方法。
nativeMessageQueue->wake();
}

NativeMessageQueue的wake方法就是调Native Looper的wake方法:

void Looper::wake() {
uint64_t inc = 1;
// 向管道mWakeEventFd写入字符1
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

TEMP_FAILURE_RETRY这个宏表达式的作用是,对传入的表达式求值,当传入的表达式求值为-1,则表示失败,当表达式返回值-1且设置了错误码为EINITR(4),那么他会一直重试,直到成功。

nativePollOnce:

从上文分析中,我们知道,在Java层消息队列处理Message之前,会先调用nativePollOnce,处理Native层的消息:

nativePollOnce(ptr, nextPollTimeoutMillis);

ptr是之前在Native层创建的MessageQueue的“指针”,nextPollTimeoutMillis表示下一条消息要被取出的时间。

在android_os_MessageQueue.cpp中,nativePollOnce实现如下:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
// 将Java层传递过来的mPtr转换为NativeMessageQueue指针
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 调用nativeMessageQueue的pollOnce方法
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

pollOnce:

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
// ...
// 调用到Looper的pollOnce方法
mLooper->pollOnce(timeoutMillis);
// ...
}

Looper的pollOnce最终实现在system/core/libutils/Looper.cpp:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 首先处理Native层的Response
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++)
// 当ident>=0的时候,表示没有callback
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
// 如果有result,那么就退出了
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 调用pollInner
result = pollInner(timeoutMillis);
}
}

Response和Request的结构体如下:

    struct Request {
// request关联的文件描述符
int fd;
// requestId,当为POLL_CALLBACK(-2)的时候,表示有callback
int ident;
int events;
int seq;
// request的处理回调
sp<LooperCallback> callback;
void* data;
void initEventItem(struct epoll_event* eventItem) const;
};
struct Response {
int events;
Request request;
};

Looper::pollInner的实现如下,内部首先是调用epoll_wait这个阻塞方法,获取Epoll事件发生的数量,然后根据这个数量,

int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}

// Poll.
int result = POLL_WAKE;
// 上面的分析已知,在pollInner被调用之前,mResponses已经都被处理完了
mResponses.clear();
mResponseIndex = 0;
// 即将开始epoll轮询。
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待epoll_wait系统调用返回,返回timeoutMillis时间内文件描述符mEpollFd上发生的epoll事件数量
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// 轮询结束,下面开始处理收到的事件。
mPolling = false;
// Acquire lock.
mLock.lock();

// 比如发生了什么异常,需要重新创建Epoll机制
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}

// 当epoll事件个数小于0的时候,即认为发生了异常,跳转到Done处继续执行
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
result = POLL_ERROR;
goto Done;
}

// 当epoll事件等于0的时候,表示轮询超时了,直接跳转到Done处继续执行
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
// 开始循环遍历,处理所有的event
for (int i = 0; i < eventCount; i++) {
// 获取一个事件的FD
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
// 如果是唤醒事件
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
// 此时已经被唤醒了,读取并清空管道中的数据
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 通过文件描述符,找到对应的Request索引
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 将对应的request封装成Response对象并push到mRequests这和Vector中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Response事件都处理完了,接下来处理Native的Message事件。
mNextMessageUptime = LLONG_MAX;
// mMessageEnvelopes是一个Vector,MessageEnvelopes如其名消息信封
// 封装了Message和MessageHandler对象
// Message表示消息,MessageHandler定义了一个handleMessage方法
// 通过调用Looper::sendMessageXX可以发送一条Native Message
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// 取出来的一条消息到了可以被处理的时间,那么就移除并执行
// 对应MessageHandler的handleMessage方法。
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// 而如果队列头部的消息尚未到达需要被处理的时间
// 那么队列需要挂起到这个头部消息能被处理时候为止
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// 接着处理上面push进来的mResponses,即Request
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// 有callback
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 执行callback
int callbackResult = response.request.callback->handleEvent(fd, events, data)
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// 清除callback的引用
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}

整个Native层Looper机制的重中之重就是Looper::pollInner中的epoll_wait这个系统调用,这个调用在消息队列没有工作需要处理的时候,会阻塞当前线程,释放系统资源,也就是说,Looper的死循环机制并不会一直占用系统资源,在没有任务需要处理的时候,主线程是阻塞状态的,因此并不会造成资源占用过高。

或者更通俗易懂的,我们看到的Looper.loop开启了一个死循环,这个死循环的的确确就是一个死循环。但是特别之处在于,不同于我们写一个无限循环,CPU会一直执行,然后导致资源占用激增,Looper.loop这个死循环,在没有消息需要处理的时候,就会阻塞停止,不再往epoll_wait后面执行。

而我们感受不到主线程停止,是因为,我们写的一个代码,执行都是被动的,我们在子线程post一个message,MessageQueue接收消息,主线程的Looper.loop执行代码,执行完代码后取下一条Message,没有Message,主线程继续阻塞。我们写的代码执行了,当然是感受不到主线程阻塞的。

小结:

通过对Native的pollOnce的分析,我们知道Android的消息处理机制实际上纵跨Java层和Native层的。

Java层和Native层,通过上面的一些JNI调用以及mPtr这个关键指针,将Java层的MessageQueue和Native的MessageQueue进行关联,这样在Java层的消息机制进行运转的时候,Native层的消息机制也能一起运转。

消息处理的流程是先处理Native的Message,然后再处理Native的Request,最后才会是pollOnce结束之后,处理Java层的Message,所以有时候Java层的消息并不多但是响应时间比较慢可能是Native层的消息机制导致的。

0 个评论

要回复文章请先登录注册