
Advanced Threading: Barriers, Semaphores, Dispatch Groups, and Dispatch Sources (Part 3)

2. Semaphores (dispatch_semaphore_t)

Related functions:

  • dispatch_semaphore_create: create a semaphore
  • dispatch_semaphore_wait: wait on (decrement) the semaphore
  • dispatch_semaphore_signal: signal (increment) the semaphore

A semaphore serves two purposes: synchronization (acting as a lock) and capping the maximum number of concurrent GCD tasks.

A binary semaphore is the simplest kind of lock, with only two states: taken and free. It suits resources that may only be accessed exclusively by a single thread. When a binary semaphore is free, the first thread that tries to acquire it gets the lock and marks the semaphore as taken; from then on, every other thread that tries to acquire it waits until the lock is released.

For resources that allow several threads to access them concurrently, a counting (multi-value) semaphore, usually just called a semaphore, is a good choice. A semaphore with an initial value of N allows N threads to access the resource concurrently. When a thread wants the resource it first acquires the semaphore, which does the following:

  • Decrement the semaphore's value by 1.
  • If the value is now less than 0, the thread waits; otherwise it continues.

After it is done with the resource, the thread releases the semaphore, which does the following (a conceptual model of both operations is sketched after this list):

  • Increment the semaphore's value by 1.
  • If the value is still less than 1 (i.e., threads are waiting), wake one waiting thread.
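
To make the two operations concrete, here is a minimal conceptual sketch in C built on a pthread mutex and condition variable. It only models the semantics described above (the toy_semaphore_t type and its wakeups field are inventions for this illustration); it is not how libdispatch implements dispatch_semaphore_t, which is analyzed in section 2.2.

#include <pthread.h>

typedef struct {
    long            value;   // the semaphore's counter
    long            wakeups; // pending wakeups handed from signal to wait
    pthread_mutex_t lock;
    pthread_cond_t  cond;
} toy_semaphore_t;

void toy_semaphore_wait(toy_semaphore_t *s) {
    pthread_mutex_lock(&s->lock);
    s->value -= 1;                           // step 1: decrement
    if (s->value < 0) {                      // step 2: below 0 -> wait
        while (s->wakeups < 1) {
            pthread_cond_wait(&s->cond, &s->lock);
        }
        s->wakeups -= 1;                     // consume the wakeup we were handed
    }
    pthread_mutex_unlock(&s->lock);
}

void toy_semaphore_signal(toy_semaphore_t *s) {
    pthread_mutex_lock(&s->lock);
    s->value += 1;                           // step 1: increment
    if (s->value < 1) {                      // step 2: still waiters -> wake one
        s->wakeups += 1;
        pthread_cond_signal(&s->cond);
    }
    pthread_mutex_unlock(&s->lock);
}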

2.1 Usage

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_queue_t queue1 = dispatch_queue_create("HotpotCat", NULL);

dispatch_async(queue, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"1 start");
    NSLog(@"1 end");
    dispatch_semaphore_signal(sem);
});

dispatch_async(queue1, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"2 start");
    NSLog(@"2 end");
    dispatch_semaphore_signal(sem);
});

dispatch_async(queue, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"3 start");
    NSLog(@"3 end");
    dispatch_semaphore_signal(sem);
});

dispatch_async(queue1, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"4 start");
    NSLog(@"4 end");
    dispatch_semaphore_signal(sem);
});

The example above prints:

1 start
1 end
2 start
2 end
3 start
3 end
4 start
4 end

With the semaphore initialized to 1, the tasks on the global queue and on the custom serial queue execute strictly in order, one at a time.

Changing the initial value to 2 gives:

1 start
2 start
2 end
1 end
3 start
4 start
3 end
4 end

Now tasks 1 and 2 run first (in no particular order relative to each other), and tasks 3 and 4 run afterwards (also unordered): at most two tasks are in flight at once. This is how a semaphore caps the maximum number of concurrent GCD tasks. A slightly more practical version of the same idea is sketched below.
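
The sketch mentioned above caps how many of a batch of submitted blocks run at the same time. This is only an illustration; the limit of 3, the task count, and the sleep are made-up values.

dispatch_queue_t workQueue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t limiter = dispatch_semaphore_create(3); // at most 3 tasks in flight

for (NSInteger i = 0; i < 10; i++) {
    // take a slot; this blocks the submitting thread once 3 tasks are running,
    // so don't drive this loop from the main thread in real code
    dispatch_semaphore_wait(limiter, DISPATCH_TIME_FOREVER);
    dispatch_async(workQueue, ^{
        NSLog(@"task %ld running", (long)i);
        sleep(1); // simulated work
        dispatch_semaphore_signal(limiter); // give the slot back
    });
}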

Now modify the code as follows:

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(0);

dispatch_async(queue, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"1 start");
    NSLog(@"1 end");
});

dispatch_async(queue, ^{
    sleep(2);
    NSLog(@"2 start");
    NSLog(@"2 end");
    dispatch_semaphore_signal(sem);
});

The initial value is now 0, task 1 calls wait, and task 2 calls signal. The output is:

2 start
2 end
1 start
1 end

Task 2 runs before task 1. Because the semaphore starts at 0, the code after the wait call cannot run and keeps waiting; only after signal has executed and sent the signal can wait return and execution continue. This gives us flow control: the signal in task 2 gates the execution of task 1. The same pattern is often used to make a caller wait for an asynchronous callback, as sketched below.
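
The sketch below shows that flow-control pattern applied to an asynchronous API. fetchDataWithCompletion: is a hypothetical method made up for this illustration; only the semaphore usage is the point.

__block NSData *result = nil;
dispatch_semaphore_t sem = dispatch_semaphore_create(0);

// fetchDataWithCompletion: is a made-up async API for this illustration
[self fetchDataWithCompletion:^(NSData *data) {
    result = data;                   // task 2's role: produce the value
    dispatch_semaphore_signal(sem);  // ...then signal
}];

dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // task 1's role: wait here
NSLog(@"got %lu bytes", (unsigned long)result.length);
// Don't do this on the thread that delivers the completion (e.g. the main
// thread), or the wait will deadlock.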

2.2 Source Code Analysis

2.2.1 dispatch_semaphore_create

/*
 * @param dsema
 * The semaphore. The result of passing NULL in this parameter is undefined.
 */
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
    dispatch_semaphore_t dsema;

    // If the internal value is negative, then the absolute of the value is
    // equal to the number of waiting threads. Therefore it is bogus to
    // initialize the semaphore with a negative value.
    if (value < 0) { // only >= 0 is valid; otherwise return immediately
        return DISPATCH_BAD_INPUT; // 0
    }

    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
            sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}
  • When value < 0 the call is invalid and returns DISPATCH_BAD_INPUT; only value >= 0 proceeds to actually create the semaphore.

2.2.2 dispatch_semaphore_wait

intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    // atomically decrement the value
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) { // still >= 0: return right away
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}
  • If the value is still >= 0 after the decrement, wait returns 0 immediately and the code after dispatch_semaphore_wait runs.
  • Otherwise _dispatch_semaphore_wait_slow is called (which effectively turns into a do-while loop).

_dispatch_semaphore_wait_slow
When the semaphore is 0 and wait is called (the value drops below 0), execution goes down the _dispatch_semaphore_wait_slow path:

DISPATCH_NOINLINE
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default: // a finite timeout: break if the timed wait succeeds
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW: // treat as an immediate timeout
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER: // wait indefinitely
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}
  • For a finite timeout (the default branch), _dispatch_sema4_timedwait is called; if it returns before the timeout expires, the switch simply breaks. If it times out, control falls through to the DISPATCH_TIME_NOW handling.
  • For DISPATCH_TIME_NOW, the loop undoes the earlier decrement (value + 1) and returns _DSEMA4_TIMEOUT():
#define _DSEMA4_TIMEOUT() KERN_OPERATION_TIMED_OUT
  • For DISPATCH_TIME_FOREVER, _dispatch_sema4_wait is called. The sketch after this list shows the timeout return value from the caller's side.
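
A minimal sketch of the timeout path as the caller sees it: with a finite timeout, dispatch_semaphore_wait returns non-zero (the timed-out case handled above) instead of blocking forever.

dispatch_semaphore_t sem = dispatch_semaphore_create(0);
dispatch_time_t timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC));

long result = dispatch_semaphore_wait(sem, timeout); // nobody signals, so this times out
if (result != 0) {
    NSLog(@"timed out waiting for the semaphore");
} else {
    NSLog(@"semaphore acquired");
}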

_dispatch_sema4_wait

// POSIX variant (not the one taken on Darwin):
// void
// _dispatch_sema4_wait(_dispatch_sema4_t *sema)
// {
//     int ret = 0;
//     do {
//         ret = sem_wait(sema);
//     } while (ret == -1 && errno == EINTR);
//     DISPATCH_SEMAPHORE_VERIFY_RET(ret);
// }

// Mach variant (the one used on Darwin):
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
    kern_return_t kr;
    do {
        kr = semaphore_wait(*sema);
    } while (kr == KERN_ABORTED);
    DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
The implementation of semaphore_wait is not found in libdispatch; it is provided by the kernel (a Mach semaphore primitive). _dispatch_sema4_wait is essentially a do-while loop, so execution is stuck right here and never reaches the code that follows. Conceptually it is equivalent to:

dispatch_async(queue, ^{
    // dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); is conceptually:
    do {
        // block here until the semaphore is signaled
    } while (signal <= 0);
    NSLog(@"1 start");
    NSLog(@"1 end");
});

Conclusion: when the value stays >= 0, the code after wait executes immediately; otherwise the do-while loop blocks everything that follows.

2.2.3 dispatch_semaphore_signal

/*!
* @function dispatch_semaphore_signal
*
* @abstract
* Signal (increment) a semaphore.
*
* @discussion
* Increment the counting semaphore. If the previous value was less than zero,
* this function wakes a waiting thread before returning.
*
* @param dsema The counting semaphore.
* The result of passing NULL in this parameter is undefined.
*
* @result
* This function returns non-zero if a thread is woken. Otherwise, zero is
* returned.
*/

intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    // atomically increment the value
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
        return 0;
    }
    // value overflowed to LONG_MIN after ++: unbalanced signal calls, crash
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    // slow path: wake up a thread blocked in wait
    return _dispatch_semaphore_signal_slow(dsema);
}
  • os_atomic_inc2o increments the value; if the result is greater than 0 the function returns immediately.
  • Only when the value is still <= 0 after the increment does it go on to _dispatch_semaphore_signal_slow, which wakes a waiter.
  • The header comment states that if the previous value was less than zero, a waiting thread is woken before the function returns, and that the return value is non-zero when a thread was woken. A quick sketch of that return value follows.
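
Before looking at the slow path, the sketch below shows that return value in practice (the sleep is only there to make it likely the worker is already blocked when signal runs).

dispatch_semaphore_t sem = dispatch_semaphore_create(0);

dispatch_async(dispatch_get_global_queue(0, 0), ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"worker woken");
});

sleep(1); // demo only: give the worker time to block in wait
long wokeSomeone = dispatch_semaphore_signal(sem);
NSLog(@"signal returned %ld", wokeSomeone); // non-zero when a waiting thread was woken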

_dispatch_semaphore_signal_slow

intptr_t
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    _dispatch_sema4_signal(&dsema->dsema_sema, 1);
    return 1;
}

It simply calls _dispatch_sema4_signal:

_dispatch_sema4_signal

#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
        DISPATCH_VERIFY_MIG(x); \
        if (unlikely((x) == KERN_INVALID_NAME)) { \
            DISPATCH_CLIENT_CRASH((x), \
                "Use-after-free of dispatch_semaphore_t or dispatch_group_t"); \
        } else if (unlikely(x)) { \
            DISPATCH_INTERNAL_CRASH((x), "mach semaphore API failure"); \
        } \
    } while (0)

// (verified by stepping through in the debugger) this is the branch taken:
void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
    do {
        kern_return_t kr = semaphore_signal(*sema); // kernel-level +1
        DISPATCH_SEMAPHORE_VERIFY_KR(kr); // crash if kr reports an error
    } while (--count); // with count == 1 the body runs exactly once
}

In effect this performs the +1 at the kernel level. That is why, when the semaphore starts at 0, the code blocked in dispatch_semaphore_wait can proceed once dispatch_semaphore_signal has been executed.

Summary:

  • dispatch_semaphore_wait performs the decrement; if the result is negative it enters a do-while loop that blocks everything after it.
  • dispatch_semaphore_signal performs the increment; if the result is still not greater than 0 it takes the slow path (and crashes on unbalanced calls).
  • The kernel-level semaphore_wait and semaphore_signal are the real reason a semaphore can cap concurrency; without them, dispatch_semaphore_wait and dispatch_semaphore_signal would just do their atomic check and return, effectively doing nothing.

semaphore_signal & semaphore_wait themselves are Mach kernel semaphore APIs; their implementations live in the kernel, not in libdispatch.

3. Dispatch Groups

Most direct purpose: controlling the order in which tasks execute.
Related APIs:

  • dispatch_group_create: create a group
  • dispatch_group_async: submit a task to the group (equivalent to pairing dispatch_group_enter with dispatch_group_leave)
    • dispatch_group_enter: enter the group
    • dispatch_group_leave: leave the group
  • dispatch_group_notify: notification that all tasks in the group have finished
  • dispatch_group_wait: wait (with a timeout) for the group's tasks to finish

3.1 Usage

dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);

dispatch_group_async(group, queue, ^{
    sleep(3);
    NSLog(@"1");
});

dispatch_group_async(group, queue1, ^{
    sleep(2);
    NSLog(@"2");
});

dispatch_group_async(group, queue1, ^{
    sleep(1);
    NSLog(@"3");
});

dispatch_group_async(group, queue, ^{
    NSLog(@"4");
});

dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
    NSLog(@"5");
});

With the code above, task 5 always executes after tasks 1, 2, 3, and 4.

You can also use enter/leave together with dispatch_async:

dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);

// enter first, then leave
dispatch_group_enter(group);
dispatch_async(queue, ^{
    sleep(3);
    NSLog(@"1");
    dispatch_group_leave(group);
});

dispatch_group_enter(group);
dispatch_async(queue1, ^{
    sleep(2);
    NSLog(@"2");
    dispatch_group_leave(group);
});

dispatch_group_enter(group);
dispatch_async(queue1, ^{
    sleep(1);
    NSLog(@"3");
    dispatch_group_leave(group);
});

dispatch_group_enter(group);
dispatch_async(queue, ^{
    NSLog(@"4");
    dispatch_group_leave(group);
});

dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
    NSLog(@"5");
});

The effect is the same. Note that dispatch_group_enter must be called before dispatch_group_leave, and every enter must be balanced by a leave, otherwise the app crashes. The two styles can also be mixed. dispatch_group_wait, listed in the API table, can be used instead of (or alongside) notify when you want to block the current thread, as sketched below.
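
dispatch_group_wait blocks the current thread until the group's tasks have finished or the timeout expires; it returns zero on success and non-zero on timeout. A minimal sketch:

dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);

dispatch_group_async(group, queue, ^{
    sleep(1);
    NSLog(@"work done");
});

dispatch_time_t timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC));
long ret = dispatch_group_wait(group, timeout); // blocks the current thread
if (ret == 0) {
    NSLog(@"all group tasks finished within the timeout");
} else {
    NSLog(@"timed out; some tasks are still running");
}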

3.2 Source Code Analysis

The analysis above raises three questions:

  • 1. Why must dispatch_group_enter be called before dispatch_group_leave, and why does violating this crash?
  • 2. What is the mechanism that makes the synchronization work?
  • 3. Why is dispatch_group_async equivalent to dispatch_group_enter + dispatch_group_leave?

In earlier versions the dispatch group was implemented as a wrapper around a semaphore; in the current version the group has its own dedicated logic.

3.2.1 dispatch_group_create


dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}

// create & enter combined: the group's count starts at 1
dispatch_group_t
_dispatch_group_create_and_enter(void)
{
    return _dispatch_group_create_with_count(1);
}

Both are calls to _dispatch_group_create_with_count:

static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}

_dispatch_object_alloc creates the group; the code is very similar to the semaphore's create.

3.2.2 dispatch_group_enter

void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    // 0 -> -1; unlike a semaphore, there is no wait here
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}
  • enter decrements the count from 0 to -1; unlike a semaphore, there is no wait involved.

3.2.3 dispatch_group_leave

void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    // -1 -> 0
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    // #define DISPATCH_GROUP_VALUE_MASK 0x00000000fffffffcULL
    // masking old_state with DISPATCH_GROUP_VALUE_MASK extracts the value bits
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    // -1 & DISPATCH_GROUP_VALUE_MASK == DISPATCH_GROUP_VALUE_1, i.e. old_value represents -1
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        // call _dispatch_group_wake to wake up dispatch_group_notify
        return _dispatch_group_wake(dg, old_state, true);
    }
    // old_value == 0 means leave was called before enter: crash immediately
    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}
  • leave increments the count from -1 back to 0; when old_value == DISPATCH_GROUP_VALUE_1 (the encoding of -1), _dispatch_group_wake is called to wake up dispatch_group_notify.
  • Because the wake only happens when the old value corresponds to -1, with multiple enters only the final leave triggers the wake.
  • When old_value == 0 it crashes immediately, which is why calling leave before enter crashes. The walk-through after this list traces the counting.
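
A short walk-through of that counting, using the logical count rather than the raw dg_bits/dg_state encoding (the values in the comments are illustrative):

dispatch_group_t group = dispatch_group_create(); // count: 0
dispatch_group_enter(group);  // 0  -> -1
dispatch_group_enter(group);  // -1 -> -2
dispatch_group_leave(group);  // -2 -> -1, old value was not -1, no wake
dispatch_group_leave(group);  // -1 -> 0, old value was -1, _dispatch_group_wake runs
// dispatch_group_leave(group);  // a third leave here would crash:
//                               // "Unbalanced call to dispatch_group_leave()"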

3.2.4 dispatch_group_notify

void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dsn = _dispatch_continuation_alloc();
    _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
    _dispatch_group_notify(dg, dq, dsn);
}

It calls _dispatch_group_notify:

static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) { // inside the rmw loop: wake when old_state == 0
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}
  • When old_state == 0, _dispatch_group_wake is called, which eventually calls out to the block. This is the same function that leave calls.

Why is the wake attempted in both places?
By the time the final leave runs, dispatch_group_notify may already have executed, so the notify block is already stored in the group; leave therefore tries the wake itself.
Conversely, leave may run before dispatch_group_notify has been called at all, so at that moment the notify block does not exist yet and there is nothing for leave to wake. That is why notify must also check the state and perform the wake itself when the group is already balanced.

_dispatch_group_wake

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            // async callout: push the notify block to its queue
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs);
}
  • Calling _dispatch_continuation_async is essentially a dispatch_async of the notify task.
  • The notify blocks are first stored in the group; only when the group is woken are they pushed onto their queues.

3.2.5 dispatch_group_async

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    // mark the continuation with DC_FLAG_GROUP_ASYNC
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

It calls _dispatch_continuation_group_async:

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    // enter the group
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    // equivalent to dispatch_async
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

  • It first calls dispatch_group_enter, so from this point the group is waiting for the matching leave/wake.
  • It then calls _dispatch_continuation_async, which is equivalent to dispatch_async.

So when is leave called?
It must be after the block callout has finished. Taking the global queue as an example, _dispatch_continuation_async goes through _dispatch_root_queue_push and eventually reaches _dispatch_continuation_invoke_inline.



[image: _dispatch_continuation_invoke_inline, where the dc_flags branching happens]



This is where the paths diverge: when a group is involved (dispatch_group_async marked dc_flags with DC_FLAG_GROUP_ASYNC), _dispatch_continuation_with_group_invoke is called:

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        // call out to the block
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        // leave the group
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}


  • dispatch_group_leave is called right after the callout.

At bottom, dispatch_group_async is a wrapper around dispatch_group_enter + dispatch_group_leave:

  • dispatch_group_async first calls dispatch_group_enter and then performs a dispatch_async.
  • In the callback it first runs _dispatch_client_callout and then calls dispatch_group_leave, roughly as sketched below.
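
Putting it together, dispatch_group_async(group, queue, block) behaves roughly like the following sketch (block stands for whatever task you would have passed in):

dispatch_block_t block = ^{ NSLog(@"work"); }; // the task you would submit

dispatch_group_enter(group);
dispatch_async(queue, ^{
    block();                      // _dispatch_client_callout
    dispatch_group_leave(group);  // issued after the callout completes
});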

