整体流程

每次调用 gcd API 进行任务调度, 都是将任务block 或者 function)加入到队列, 后续再进行执行, 所以整个流程中 任务入队 -> 队列唤起 -> 任务出队(调度), 是GCD的实现比较重要的3个环节。

队列实现相关数据结构

dc

dcdispatch_continuation_s 结构体的 指针类型 dispatch_continuation_t, 作用是存储入队的任务的相关信息

define

点击显示源代码

struct dispatch_continuation_s {
	const void *do_vtable;           // 动作表
	struct x *volatile do_next;      // 下一个任务 dc
	dispatch_function_t dc_func;     // 任务函数
	void *dc_ctxt                    // 任务上下文
	dispatch_group_t dc_group;       // dc任务组
	void *dc_data[3];                // 额外存储的相关dc
};

dc_func 存储了对应的任务, dc_ctxt 不透明指针存有 调用dc_func的入参

为了说明 这两个字段, 讲下关于 dispatch_async_f(queue, ctx, func) 以及 dispatch_async(queue, block)

平时常用的 dispatch_async(queue, block), 内部实际上调用的是 dispatch_async_f(queue, ctx, func)

点击显示源代码

void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release); // _dispatch_call_block_and_release 是调用block的函数, NSThread 的 block 也是类似的实现
}

执行block任务的时候, 实际上是执行的是 dc_func(dc_ctxt) => _dispatch_call_block_and_release(work), 也就是说对于入队的block任务, dc_ctxt 对应的是 block, dc_func 对应的是 _dispatch_call_block_and_release.

顺便说一句, NSThread的block实现也是类似的嵌套.

贴上 dispatch_group_async_f调用dc_func的实现, 有兴趣可以点击 toggle code 查看

点击显示源代码

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		/// 说明是串行
		/// 使用栅栏异步
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	/// 从当前线程中 获取dc, 并且重新设置线程中的dc
	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		/// 说明当前线程未设置dc, 创建dc并设置, 然后再异步处理
		/// TODO: _dispatch_async_f2 内部实现
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		/// 如果有do_targetq, 则优先使用do_targetq来进行入队dc
		return _dispatch_async_f2(dq, dc);
	}

	/// 原子性, 线程安全的将 dc 加入dq
	_dispatch_queue_push(dq, dc);
}

点击显示源代码

	/// 调用任务
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);

DISPATCH_OBJ_XXX_BIT (任务标识符)

标识 dc 内的任务类型

点击显示源代码

#define DISPATCH_OBJ_ASYNC_BIT		0x1 // 同步任务
#define DISPATCH_OBJ_BARRIER_BIT	0x2 // 栅栏任务
#define DISPATCH_OBJ_GROUP_BIT		0x4 // 任务组任务
#define DISPATCH_OBJ_SYNC_SLOW_BIT	0x8 // 同步慢任务

// 使用
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;

dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);

gcd 中 所有的任务 都是使用 dc 存储, gcd 中有多种任务类型 同步, 异步, 栅栏, 任务组 等, 所以需要在 dcdo_vtable 动作表字段 存储相关option枚举, 来标识任务类型

dq_items_head & dq_items_tail

队尾和队首 存放着 dc 指针, 并通过 dcdo_next 指针 来寻找下一个需要执行的 dc任务.

define

点击显示源代码

struct dispatch_queue_s {
	uint32_t volatile dq_running; 
	uint32_t dq_width; 
	struct dispatch_object_s *volatile dq_items_tail;  // 队尾
	struct dispatch_object_s *volatile dq_items_head;  // 队首
	unsigned long dq_serialnum; 
	dispatch_queue_t dq_specific_q;
};

对于一个 队列 而言, 只要有 队首指针dq_items_head 和 dc 的 do_next 就可以寻找到队列中, 所有的任务了, 考虑下为什么需要 dq_items_tail

需要 dq_items_tail 的原因是因为我们在执行任务的同时, 可能再加入新的任务, 每次进行任务调度的时候需要决定一次任务调度的终止点, 不然同步的任务可能会进入一个死循环…

考虑下面这段代码

点击显示源代码

- (void)dispatchNested {
    dispatch_async(dispatch_get_main_queue(), ^{
        std::cout << "hello, dispatch nested!" << pthread_self() << "\n";
        dispatch_async(dispatch_get_main_queue(), ^{
            [self dispatchNested];
        });
    });
}

每一个loop都会执行, 一次异步, 但是主线程不会被阻塞, 第二个dispatch_async, 被判定为下一次需要任务调度的执行内容, , 如果没有 dq_items_tail就无法实现

顺带一提, 所有的任务处理的队列都是有类似的机制, 如Runloop 的 source 事件队列, 未开源的UIKit 的 evnt 事件队列, 以及 CFSocketfd 的 数据接收事件队列

之前由于 UIKit 的 event队列 的 任务调度 的截断逻辑, 还发生过crash, 连续触发了一个UI组件的两个事件, 一个事件在这一次任务调度处理完成并且释放了这个UI组件的部分对象, 下一次的任务调度, 处理了另外一个未处理的事件, 然后导致调用了assign的野指针. 代码老是真的可怕0.0

任务入队

任务入队, 就是把 dc 加载 dispatch queue (dq) 的 队尾

入队code

点击显示源代码

static inline void
_dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head,
		dispatch_object_t _tail)
{
	struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;
	/// 将原来dq_items_tail 换为 tail, 并清空 tail的 do_next
	/// 然后将原来dq_items_tail 链上 head
	tail->do_next = NULL;
	dispatch_atomic_store_barrier();
	/// 原子性 交换值 (也就是dq 需要保证线程安全)
	prev = fastpath(dispatch_atomic_xchg2o(dq, dq_items_tail, tail));
	if (prev) {/// 存在prev 已经wakeup 过 dq, 目前有任务正在处理,直接将任务加入队尾 
		
		// if we crash here with a value less than 0x1000, then we are at a
		// known bug in client code for example, see _dispatch_queue_dispose
		// or _dispatch_atfork_child
		prev->do_next = head;
	} else {
		// 加入到队首, 并且唤起任务处理
		_dispatch_queue_push_list_slow(dq, head);
	}
}

这段代码的功能特性

调用原子性api, 操作队列, 实现gcd 的 线程安全特性

支持 多个dc 入队, 链表操作 headtail, 进行衔接

如果当前dq, 是空闲的, 唤起队列处理任务 _dispatch_queue_push_list_slow(dq, head)

任务唤起

全局队列唤起

global queue 的 唤起逻辑, 第一步 dx_probe(dou._do) , 调用 dqdo_probe 的函数指针(main queue 的这个函数指针为空函数), 这个函数就是 _dispatch_queue_wakeup_global, 通过它来唤起global queue 来执行任务

_dispatch_queue_wakeup_global 支持两种实现的任务唤起pthread_workqueuethread pool, 之前提到过 global queue, 有8个原因就是对应 4种 workqueue, 以及overcommit选项, 而 thread pool 是自己管理的线程池实行. 实际使用中使用的是workqueue, 因为workqueue的性能更好, (pthread_workqueue_additem_np).

点击显示源代码

// 6618342 Contact the team that owns the Instrument DTrace probe before
//         renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	dispatch_queue_t tq;

	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	// 调用 __dispatch_wakeup_global, 执行后续任务
	if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
		return NULL;
	}

	// _dispatch_source_invoke() relies on this testing the whole suspend count
	// word, not just the lock bit. In other words, no point taking the lock
	// if the source is suspended or canceled.
	///
	/**
	 * 当前do_suspend_cnt 为 0  则 替换写入
	 * DISPATCH_OBJECT_SUSPEND_LOCK 1U 并返回 true
	 */
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
		/**
		 * do_suspend_cnt 不为0, 且当前的dq为主线程队列, 则唤醒
		 */
		if (dou._dq == &_dispatch_main_q) {
			/// 唤醒主线程队列
			_dispatch_queue_wakeup_main();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch does not need this, but the Instrument DTrace
				// probe does
}

_dispatch_queue_wakeup_global

点击显示源代码


static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	static dispatch_once_t pred;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
	int r;
	
	if (!dq->dq_items_tail) {
		return false;
	}

	_dispatch_safe_fork = false;

	dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

	// 初始化 根队列以及上下文
	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
	
	/// pthread work queues
	/// thread pool
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (qc->dgq_kworkqueue)
#endif
	{
		if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			_dispatch_debug("requesting new worker thread");

			/**
			 * 内部使用 workq_kernreturn 系统调用, 通知workqueue, 增加应当执行的项目
			 */
			r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
					_dispatch_worker_thread2, dq, &wh, &gen_cnt);
			(void)dispatch_assume_zero(r);
		} else {
			_dispatch_debug("work thread request still pending on global "
					"queue: %p", dq);
		}
		goto out;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
	if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
		goto out;
	}

	pthread_t pthr;
	int t_count;
	do {
		t_count = qc->dgq_thread_pool_size;
		if (!t_count) {
			_dispatch_debug("The thread pool is full: %p", dq);
			goto out;
		}
	} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
			t_count - 1));

	/// 创建线程 并 _dispatch_worker_thread
	while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
		if (r != EAGAIN) { /* Resource temporarily unavailable */
			(void)dispatch_assume_zero(r);
		}
		sleep(1);
	}
	r = pthread_detach(pthr);
	(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL

out:
	return false;
}

主队列唤起

main queue 的唤起逻辑就是, 通过mach port 端口通信, 通知runloop, 需要执行任务, 这样会产生一个source0还是source1来着的事件, 反正我分不清, 这样主线程如果处于msg_trap的状态, 将被唤醒执行任务.

点击显示源代码

void
_dispatch_queue_wakeup_main(void) // 唤醒runloop主线程
{
	kern_return_t kr;

	/// 初始化 单例主线程队列 port
	dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
			_dispatch_main_q_port_init);

	// 唤醒主线程
	kr = _dispatch_send_wakeup_main_thread(main_q_port, 0); // 这部分未开源

	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}

	_dispatch_safe_fork = false;
}

未开源部分, 我找了个三方实现

点击显示源代码

mig_external kern_return_t _dispatch_send_wakeup_main_thread
(
    mach_port_t _port,
    natural_t _waitTimeout
)
{
#ifdef  __MigPackStructs
#pragma pack(4)
#endif
    typedef struct {
        mach_msg_header_t Head;
    } Request;
#ifdef  __MigPackStructs
#pragma pack()
#endif
    /*
     * typedef struct {
     * 	mach_msg_header_t Head;
     * 	NDR_record_t NDR;
     * 	kern_return_t RetCode;
     * } mig_reply_error_t;
     */
    union {
        Request In;
    } Mess;
    Request *InP = &Mess.In;
    mach_msg_return_t msg_result;
#ifdef	__MIG_check__Reply__wakeup_main_thread_t__defined
    kern_return_t check_result;
#endif	/* __MIG_check__Reply__wakeup_main_thread_t__defined */
    __DeclareSendSimple(78, "wakeup_main_thread")
    InP->Head.msgh_bits =
        MACH_MSGH_BITS(19, 0);
    /* msgh_size passed as argument */
    InP->Head.msgh_request_port = _port;
    InP->Head.msgh_reply_port = MACH_PORT_NULL;
    InP->Head.msgh_id = 78;
    __BeforeSendSimple(78, "wakeup_main_thread")
    msg_result = mach_msg(&InP->Head, MACH_SEND_MSG|MACH_SEND_TIMEOUT|MACH_MSG_OPTION_NONE, (mach_msg_size_t)sizeof(Request), 0, MACH_PORT_NULL, _waitTimeout, MACH_PORT_NULL);
    __AfterSendSimple(78, "wakeup_main_thread")
    if (msg_result == MACH_SEND_TIMED_OUT) {
    }
    return msg_result;
}

emmmm, 这部分代码好像也没啥子东西, 就是调mach_msg, 发了个消息给主线程的端口, 也不知道苹果为什么要这个函数不给源码.

信息说明

libdispatch 版本号: 187.10

相关内容

libdispatch (GCD源代码解析) 【queue 结构】1

libdispatch (GCD源代码解析) 【任务入队,队列唤起】2

libdispatch (GCD源代码解析) 【任务调度】3

libdispatch (GCD源代码解析) 【同步】4

可执行的libdispatch源代码

https://github.com/AwayQu/libxdispatch