注册

高级线程应用之栅栏、信号量、调度组以及source(四)

四、Dispatch Source

在任一线程上调用它的的一个函数 dispatch_source_merge_data 后,会执行 dispatch source 事先定义好的句柄(可以把句柄简单理解为一个 block ) 这个过程叫 用户事件(Custom event)。是 dispatch source 支持处理的一种事件。

句柄是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有很密切的关系。比如:实例句柄(HINSTANCE),位图句柄(HBITMAP),设备表述句柄(HDC),图标句柄(HICON)等。这当中还有一个通用的句柄,就是HANDLE

Dispatch Source有两点:

  • CPU 负荷非常小,尽量不占用资源 。
  • 联结的优势。
  • dispatch source不受runloop的影响,底层封装的是pthread

相关API

  • dispatch_source_create 创建源
  • dispatch_source_set_event_handler 设置源事件回调
  • dispatch_source_merge_data 源事件设置数据
  • dispatch_source_get_data 获取源事件数据
  • dispatch_resume 继续
  • dispatch_suspend 挂起

4.1 应用

dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
uintptr_t mask,
dispatch_queue_t _Nullable queue);
  • typedispatch 源可处理的事件。比如:DISPATCH_SOURCE_TYPE_TIMERDISPATCH_SOURCE_TYPE_DATA_ADD
    • DISPATCH_SOURCE_TYPE_DATA_ADD: 将所有触发结果相加,最后统一执行响应。间隔的时间越长,则每次触发都会响应;如果间隔的时间很短,则会将触发后的结果相加后统一触发。也就是利用CPU空闲时间进行回调。
  • handle:可以理解为句柄、索引或id,如果要监听进程,需要传入进程的ID
  • mask:可以理解为描述,具体要监听什么。
  • queue:处理handle的队列。

有如下一个进度条的案例:

self.completed = 0;
self.queue = dispatch_queue_create("HotpotCat", NULL);
self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
//设置句柄
dispatch_source_set_event_handler(self.source, ^{
NSLog(@"%@",[NSThread currentThread]);
NSUInteger value = dispatch_source_get_data(self.source);
self.completed += value;
double progress = self.completed / 100.0;
NSLog(@"progress: %.2f",progress);
self.progressView.progress = progress;
});
self.isRunning = YES;
//创建后默认是挂起状态
dispatch_resume(self.source);

创建了一个ADD类型的source,在handle获取进度增量并更新进度条。由于创建后source处于挂起状态,需要先恢复。

可以在按钮的点击事件中进行任务的挂起和恢复:

if (self.isRunning) {
dispatch_suspend(self.source);
dispatch_suspend(self.queue);
NSLog(@"pause");
self.isRunning = NO;
[sender setTitle:@"pause" forState:UIControlStateNormal];
} else {
dispatch_resume(self.source);
dispatch_resume(self.queue);
NSLog(@"running");
self.isRunning = YES;
[sender setTitle:@"running" forState:UIControlStateNormal];
}

任务的执行是一个简单的循环:

for (NSInteger i = 0; i < 100; i++) {
dispatch_async(self.queue, ^{
NSLog(@"merge");
//加不加 sleep 影响 handler 的执行次数。
sleep(1);
dispatch_source_merge_data(self.source, 1);//+1
});
}
  • 在循环中调用dispatch_source_merge_data触发回调。当queue挂起后后续任务就不再执行了。
  • 在不加sleep的情况下handler的回调是小于100次的,任务会被合并。

4.2 源码解析

4.2.1 dispatch_source_create

dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask, dispatch_queue_t dq)
{
dispatch_source_refs_t dr;
dispatch_source_t ds;
//add对应 _dispatch_source_data_create timer对应 _dispatch_source_timer_create
dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}
//创建队列
ds = _dispatch_queue_alloc(source,
dux_type(dr)->dst_strict ? DSF_STRICT : DQF_MUTABLE, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER)._ds;
ds->dq_label = "source";
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);

//没有传队列,获取root_queues
if (unlikely(!dq)) {
dq = _dispatch_get_default_queue(true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
//目标队列为传进来的dq
ds->do_targetq = dq;
//是timer 并且设置了interval则调用dispatch_source_set_timer进行设置
//也就是说type为timer的时候即使不设置timer也会默认设置。这里时间间隔设置为了handle
if (dr->du_is_timer && (dr->du_timer_flags & DISPATCH_TIMER_INTERVAL)) {
dispatch_source_set_timer(ds, DISPATCH_TIME_NOW, handle, UINT64_MAX);
}
_dispatch_object_debug(ds, "%s", __func__);
//返回自己创建的source,source本身也是队列。
return ds;
}
  • 根据type创建对应的队列。add对应_dispatch_source_data_createtimer对应_dispatch_source_timer_create
  • 如果创建的时候没有传处理handle的队列,会默认获取root_queues中的队列。
  • 设置目标队列为传进来的队列。
  • 如果typeDISPATCH_SOURCE_TYPE_INTERVAL(应该是私有的)则主动调用一次dispatch_source_set_timer
  • 返回自己创建的sourcesource本身也是队列。

_dispatch_source_data_create

static dispatch_unote_t
_dispatch_source_data_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask)
{
if (handle || mask) {
return DISPATCH_UNOTE_NULL;
}

// bypass _dispatch_unote_create() because this is always "direct"
// even when EV_UDATA_SPECIFIC is 0
dispatch_unote_class_t du = _dispatch_calloc(1u, dst->dst_size);
du->du_type = dst;
du->du_filter = dst->dst_filter;
du->du_is_direct = true;
return (dispatch_unote_t){ ._du = du };
}

直接调用_dispatch_calloc创建返回。

_dispatch_source_timer_create

static dispatch_unote_t
_dispatch_source_timer_create(dispatch_source_type_t dst,
uintptr_t handle, uintptr_t mask)
{
dispatch_timer_source_refs_t dt;
......
//创建
dt = _dispatch_calloc(1u, dst->dst_size);
dt->du_type = dst;
dt->du_filter = dst->dst_filter;
dt->du_is_timer = true;
dt->du_timer_flags |= (uint8_t)(mask | dst->dst_timer_flags);
dt->du_ident = _dispatch_timer_unote_idx(dt);
dt->dt_timer.target = UINT64_MAX;
dt->dt_timer.deadline = UINT64_MAX;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
return (dispatch_unote_t){ ._dt = dt };
}

内部时间给的默认值是最大值。

4.2.2 dispatch_source_set_event_handler

void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
}

调用_dispatch_source_set_handler传递的类型为DS_EVENT_HANDLER

DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, void *func,
uintptr_t kind, bool is_block)
{
dispatch_continuation_t dc;
//创建dc存储handler
dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
//挂起
if (_dispatch_lane_try_inactive_suspend(ds)) {
//替换
_dispatch_source_handler_replace(ds, kind, dc);
//恢复
return _dispatch_lane_resume(ds, DISPATCH_RESUME);
}
......
}
创建_dispatch_source_handler_alloc存储handler,内部会进行标记非DS_EVENT_HANDLER会标记为DC_FLAG_CONSUME_dispatch_lane_try_inactive_suspend挂起队列。_dispatch_source_handler_replace替换handler

static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, uintptr_t kind,
dispatch_continuation_t dc)
{
//handler目标回调为空释放handler
if (!dc->dc_func) {
_dispatch_continuation_free(dc);
dc = NULL;
} else if (dc->dc_flags & DC_FLAG_FETCH_CONTEXT) {
dc->dc_ctxt = ds->do_ctxt;
}
//保存
dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
if (dc) _dispatch_source_handler_dispose(dc);
}
_dispatch_lane_resume恢复队列,调用队列对应的awake
551186cf96ba7399edf19e56c3b90ca0.png
  • 先调用_dispatch_lane_resume_activate(这也就是set后立马调用的原因):
static void
_dispatch_lane_resume_activate(dispatch_lane_t dq)
{
if (dx_vtable(dq)->dq_activate) {
dx_vtable(dq)->dq_activate(dq);
}

_dispatch_lane_resume(dq, DISPATCH_ACTIVATION_DONE);
}

再调用_dispatch_lane_resume

4.2.3 dispatch_source_merge_data

void
dispatch_source_merge_data(dispatch_source_t ds, uintptr_t val)
{
dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
dispatch_source_refs_t dr = ds->ds_refs;

if (unlikely(dqf & (DSF_CANCELED | DQF_RELEASED))) {
return;
}
//根据类型存值
switch (dr->du_filter) {
case DISPATCH_EVFILT_CUSTOM_ADD:
//有累加
os_atomic_add2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_OR:
os_atomic_or2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_REPLACE:
os_atomic_store2o(dr, ds_pending_data, val, relaxed);
break;
default:
DISPATCH_CLIENT_CRASH(dr->du_filter, "Invalid source type");
}
//唤醒执行回调
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
  • 根据类型对值进行处理,处理完之后唤醒队列执行。

对于主线程会执行_dispatch_main_queue_wakeup,其中会取到dispatch_queue获取到dc,最后进行handler的调用。

4.2.4 dispatch_source_get_data

uintptr_t
dispatch_source_get_data(dispatch_source_t ds)
{
dispatch_source_refs_t dr = ds->ds_refs;
#if DISPATCH_USE_MEMORYSTATUS
if (dr->du_vmpressure_override) {
return NOTE_VM_PRESSURE;
}
#if TARGET_OS_SIMULATOR
if (dr->du_memorypressure_override) {
return NOTE_MEMORYSTATUS_PRESSURE_WARN;
}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
//获取数据
uint64_t value = os_atomic_load2o(dr, ds_data, relaxed);
return (unsigned long)(dr->du_has_extended_status ?
DISPATCH_SOURCE_GET_DATA(value) : value);
}

merge_data相反,一个存一个取。

4.2.5 dispatch_resume

void
dispatch_resume(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
_dispatch_lane_resume(dou._dl, DISPATCH_RESUME);
}
}

经过调试走的是_dispatch_lane_resume逻辑,与_dispatch_source_set_handler中调用的一致。awake队列。

4.2.6 dispatch_suspend

void
dispatch_suspend(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_suspend, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
return _dispatch_lane_suspend(dou._dl);
}
}

调用_dispatch_lane_suspend挂起队列。

4.2.7 dispatch_source_cancel

dispatch_source_cancel(dispatch_source_t ds)
{
_dispatch_object_debug(ds, "%s", __func__);

_dispatch_retain_2(ds);

if (_dispatch_queue_atomic_flags_set_orig(ds, DSF_CANCELED) & DSF_CANCELED){
_dispatch_release_2_tailcall(ds);
} else {
//_dispatch_workloop_wakeup
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
}
}



调用_dispatch_workloop_wakeup
c6967b09e3874be19291415f6812e8e2.png
  • cancel内部会对状态进行判断,如果是挂起状态会报错。所以需要在运行状态下取消。
  • 调用_dispatch_release_2_tailcall进行释放操作。

4.2.8 dispatch_source_set_timer

void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
uint64_t interval, uint64_t leeway)
{
dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
dispatch_timer_config_t dtc;

if (unlikely(!dt->du_is_timer)) {
DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
}
//根据type配置timer和interval
if (dt->du_timer_flags & DISPATCH_TIMER_INTERVAL) {
dtc = _dispatch_interval_config_create(start, interval, leeway, dt);
} else {
dtc = _dispatch_timer_config_create(start, interval, leeway, dt);
}
if (_dispatch_timer_flags_to_clock(dt->du_timer_flags) != dtc->dtc_clock &&
dt->du_filter == DISPATCH_EVFILT_TIMER_WITH_CLOCK) {
DISPATCH_CLIENT_CRASH(0, "Attempting to modify timer clock");
}
//跟踪配置
_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
if (dtc) free(dtc);
//唤醒
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}

4.2.9 dispatch_source_set_registration_handler

void
dispatch_source_set_registration_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_REGISTN_HANDLER, true);
}

也是直接调用的_dispatch_source_set_handler,参数是DS_REGISTN_HANDLER

4.2.10 dispatch_source_set_cancel_handler

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_CANCEL_HANDLER, true);
}
  • 直接调用的_dispatch_source_set_handler,参数是DS_CANCEL_HANDLER
  • 会根据DS_REGISTN_HANDLER、DS_CANCEL_HANDLER、DS_EVENT_HANDLER进行handler的获取和释放,因为这三者可能同时存在。

那么就有个问题设置timer类型后我们没有主动调用dispatch_source_merge_data,那么它是在什么时机调用的呢?在回调中bt:

    frame #2: 0x000000010b6a29c8 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010b6a5316 libdispatch.dylib`_dispatch_continuation_pop + 557
frame #4: 0x000000010b6b8e8b libdispatch.dylib`_dispatch_source_invoke + 2205
frame #5: 0x000000010b6b4508 libdispatch.dylib`_dispatch_root_queue_drain + 351
frame #6: 0x000000010b6b4e6d libdispatch.dylib`_dispatch_worker_thread2 + 135
frame #7: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
frame #8: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15

搜索_dispatch_source_invoke只找到了:

DISPATCH_VTABLE_INSTANCE(source,
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_dispose = _dispatch_source_dispose,
.do_debug = _dispatch_source_debug,
.do_invoke = _dispatch_source_invoke,

.dq_activate = _dispatch_source_activate,
.dq_wakeup = _dispatch_source_wakeup,
.dq_push = _dispatch_lane_push,
);
也就是调用的sourcedo_invoke,调用逻辑为_dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> dx_invoke

void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags)
{
_dispatch_queue_class_invoke(ds, dic, flags,
DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);

#if DISPATCH_EVENT_BACKEND_KEVENT
if (flags & DISPATCH_INVOKE_WORKLOOP_DRAIN) {
dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh();
dispatch_timer_heap_t dth = dwl->dwl_timer_heap;
if (dth && dth[0].dth_dirty_bits) {
//调用
_dispatch_event_loop_drain_timers(dwl->dwl_timer_heap,
DISPATCH_TIMER_WLH_COUNT);
}
}
#endif // DISPATCH_EVENT_BACKEND_KEVENT
}




0 个评论

要回复文章请先登录注册