任务调度

主队列任务调度

主队列任务调度入口实现

主队列 的任务调度函数 _dispatch_main_queue_callback_4CF(for Core Foundation) 的 调起 由 之前提到的 _dispatch_send_wakeup_main_thread 函数, 通过mach_port 通信, 通知主线程的runloop实现.

_dispatch_main_queue_callback_4CF define

点击显示源代码

/// 处理主队列任务
void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true); /// 设置为正在处理任务
	_dispatch_main_queue_drain();				  /// 调度处理任务
	_dispatch_queue_set_mainq_drain_state(false); /// 设置为处理任务完成任务
}

主队列任务调度实现

由于 main queueserial queue, 顺序执行, 没有并发的逻辑, 所以主体的实现逻辑, 就是顺序遍历 mian queue, 出队 ‘所有’ 本次唤起需要执行的 dc, 并进行任务执行.

需要注意的实现只有通过, dmarker 以及 dq_items_tail 标记本次 本次唤起需要执行的dc的 终止点, 将后入队的任务,放在下一轮执行. 流程如下图所示.

原因见 (libdispatch (GCD源代码解析) 【任务入队,队列唤起】2 中的 队列实现相关数据结构)

                                 执行到队尾dmarker, 但还有新入队的dc
                    ---------------------------------------------------|
                    |                                                  |	
                    |                                   未执行到到dmarker标记的队尾
                    |                                      |-----------|
                    |                                      |           |
                                                                     |					
enqueue dc   msg mach port   runloop receive source  dequeue dc -------- execute dc

 main run loop	        
       |       
  main thread -------------- dc          
    

_dispatch_main_queue_drain define

点击显示源代码


#if DISPATCH_COCOA_COMPAT

/**
 * 处理主队列任务
 */
void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) { /// 没有需要处理的任务
		return;
	}
	struct dispatch_main_queue_drain_marker_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_main_queue_drain_marker_s);
	} marker = {
		.do_vtable = NULL,
	};
	/// 标记 dq 的尾部
	struct dispatch_object_s *dmarker = (void*)▮
	_dispatch_queue_push_notrace(dq, dmarker);

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	
	/// 保存原来的dq
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dq->dq_items_tail) {
		// 阻塞 当获知道获取到dq_items_head 保存入 dc
		while (!(dc = fastpath(dq->dq_items_head))) {
			_dispatch_hardware_pause();
		}
		// 清除当前的 dq_items_head
		dq->dq_items_head = NULL;
		do {
			/// 取出 next_dc
			next_dc = fastpath(dc->do_next);
			
			// 如果 next_dc 为空 并且 dc 与 dq->dq_items_tail 相同, 说明已经是最后一个任务, 将dq->dq_items_tail 置为 NULL, 跳过直接继续
			// 如果next_dc为空 (下一个dc 需要执行的任务) && 当前的dc 与 dq->dq_items_tail 不同, 阻塞获取next_dc
			if (!next_dc &&
					!dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				// 阻塞 直到获取到do_next 保存入 next_dc
				while (!(next_dc = fastpath(dc->do_next))) {
					_dispatch_hardware_pause();
				}
			}
			/// 如果dc == dmarker 说明已经遍历完成了在进入任务队列执行前标记的所有任务, 后面的任务是, 这次队列中任务执行后, 加入的dc
			if (dc == dmarker) {
				if (next_dc) {
					/// 这次队列中任务执行后, 加入的dc, 设置到 dq_items_head
					/// 唤醒主队列, 下一次loop再执行这些任务
					dq->dq_items_head = next_dc;
					_dispatch_queue_wakeup_main();
				}
				goto out;
			}
			// 执行对应的dc, 并弹出队列
			_dispatch_continuation_pop(dc);
			_dispatch_workitem_inc();
		} while ((dc = next_dc));
	}
	dispatch_assert(dc); // did not encounter marker

out:
	/// 还原原来的dq
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif
	/// 清除dc缓存
	_dispatch_force_cache_cleanup();
}
#endif

全局队列任务调度

全局队列通过 _dispatch_queue_wakeup_global 函数, 将调度任务 入队到 workqueue , 然后workqueue 根据内核状况 创建worker thread 调用 _dispatch_worker_thread2 处理对应 dispatch queue 中的 dc. (不讨论使用 thread pool 的情况)

_dispatch_worker_thread2 的 主要包括 线程上下文设置和还原,垃圾回收gc实现的stub, autoreleasepool的创建和销毁, worker_threads 数量的计数, 任务出队以及执行

_dispatch_worker_thread2 define

点击显示源代码


static void
_dispatch_worker_thread2(void *context)
{
	struct dispatch_object_s *item;
	dispatch_queue_t dq = context;
	struct dispatch_root_queue_context_s *qc = dq->do_ctxt;


	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	qc->dgq_pending = 0;
///region 内存处理
#if DISPATCH_COCOA_COMPAT /// cocoa 框架 兼容
	(void)dispatch_atomic_inc(&_dispatch_worker_threads);
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_begin_NSAutoReleasePool();
#endif
///endregion

#if DISPATCH_PERF_MON
	uint64_t start = _dispatch_absolute_time();
#endif
	
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		/// 调用对应处理, 并出队列
		_dispatch_continuation_pop(item);
	}
	
#if DISPATCH_PERF_MON
	_dispatch_queue_merge_stats(start);
#endif

///region 内存处理
#if DISPATCH_COCOA_COMPAT
	_dispatch_end_NSAutoReleasePool(pool);
	dispatch_end_thread_4GC();
	if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
			dispatch_no_worker_threads_4GC) {
		dispatch_no_worker_threads_4GC();
	}
#endif
///endregion 内存处理
	
	/// 清除 线程关联数据中的 queue_key
	_dispatch_thread_setspecific(dispatch_queue_key, NULL);

	/// 清除 线程关联的 dc
	_dispatch_force_cache_cleanup();

}

任务出队以及执行

主要讨论下 _dispatch_worker_thread2 中的 任务出队以及执行 实现

	// 任务dc 出队
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		/// 调用对应dc
		_dispatch_continuation_pop(item);
	}

_dispatch_queue_concurrent_drain_one define

_dispatch_queue_concurrent_drain_one 主要包含的实现有 多线程竞争下的边界处理 (空队列, 已被其他worker线程处理, 有其他线程正在执行入队或者出队操作), 出队dc, 再次唤醒global queue

讨论下 再次唤醒global queue

由于再次唤醒global queue, 由于global queueconcurrency queue, 所以dc执行是并发的, 这个并发是通过, 每一次出队dc, 后检查一下 global queue, 是否还有dc在队列中, 如果有就 再次通知 workqueue, 需要再创建一个 work queue处理队列中剩余的 dc, 然后重复上面的步骤, 是一个类似于递归或者线程fork的过程. 如下图所示.

                                        new branch
                      |--------------------------------------------------|
                      |                                                  |
                                                                        |
enqueue dc   wake up global queue  create work thread  dequeue dc ---------- execute dc

然后就会产生多个 work queue 同时处理多个dc

             |------------ work thread1 -----  dc1   
             |
 workqueue --|------------ work thread2 ------ dc2
             |
             |------------ work thread3 ------ dc3

dequeue 的行为是在不同的work thread 中执行的, enqueue的行为也是可以在不同的thread 中执行的, 所以存在许多多线程竞争下的边界处理 (空队列, 已被其他worker线程处理, 有其他线程正在执行入队或者出队操作) 在 _dispatch_queue_concurrent_drain_one 函数中

_dispatch_continuation_pop define

_dispatch_continuation_pop 函数实现了 对于 同步dc, 栅栏dc, 组dc, 以及异步dc的处理, 各种dc的派发和执行的区别后续再进行分析.

点击显示源代码


DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;
	dispatch_group_t dg;
	
	/// 这部分功能未启用
	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	//判断传入的内容是不是队列,如果是的话执行_dispatch_queue_invoke函数,
	//dispatch_barrier_async的dc任务 会进入以下分支, 以保证 barrier任务和其他任务隔离, 并通过dispath_semaphore_t 实现通知 barrier任务执行
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) { /// ?自定义队列
		return _dispatch_queue_invoke(dou._dq);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	
	/// 判断是否有async bit
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		_dispatch_continuation_free(dc);
	}
	/// 判断是否有group bit
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_group;
	} else {
		dg = NULL;
	}
	/// 调用任务
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
	    //如果是群组执行dispatch_group_leave
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
}

信息说明

libdispatch 版本号: 187.10

相关内容

相关内容

libdispatch (GCD源代码解析) 【queue 结构】1

libdispatch (GCD源代码解析) 【任务入队,队列唤起】2

libdispatch (GCD源代码解析) 【任务调度】3

libdispatch (GCD源代码解析) 【同步】4

可执行的libdispatch源代码

https://github.com/AwayQu/libxdispatch