栅栏同步

栅栏同步入队

栅栏同步 的实现中 主要通过 _dispatch_thread_semaphore_t等待(wait)以及信号发送(signal)来实现栅栏. (1.保证已入队正在执行的任务完成后 再执行栅栏任务 2.栅栏任务执行完成后 发信号, 通知后续任务执行 )

dispatch_barrier_sync_f define

同步栅栏任务入队 的 入口方法

点击显示源代码

void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	/**
	 * 1) 有任务已入队
	 * 2) 队列挂起
	 */
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	// 队列已运行
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
		// global queues and main queue bound to main thread always falls into
		// the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	// 多层依赖队列
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	
	// 无任务进行中
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

_dispatch_barrier_sync_f2 define

实现的功能: 1. 栅栏任务执行完成后 发信号, 通知后续任务执行.

点击显示源代码

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		// 获取sema
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running);
			// 通知后续任务, 继续执行
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
		_dispatch_wakeup(dq);
	}
}

_dispatch_barrier_sync_f_slow define

实现的功能: 1.保证已入队正在执行的任务完成后 再执行栅栏任务 2. 执行栅栏任务 3.栅栏任务执行完成后 发信号, 通知后续任务执行

点击显示源代码


DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	struct dispatch_barrier_sync_slow2_s dbss2 = {
		.dbss2_dq = dq,
#if DISPATCH_COCOA_COMPAT
		.dbss2_func = func,
		.dbss2_ctxt = ctxt,
#endif
		.dbss2_sema = _dispatch_get_thread_semaphore(),
	};
	struct dispatch_barrier_sync_slow_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dbss2,
	};
	_dispatch_queue_push(dq, (void *)&dbss);

	_dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
	_dispatch_put_thread_semaphore(dbss2.dbss2_sema);

#if DISPATCH_COCOA_COMPAT
	// Main queue bound to main thread
	if (dbss2.dbss2_func == NULL) {
		return;
	}
#endif
	dispatch_atomic_acquire_barrier();
	//region 执行栅栏任务
	if (slowpath(dq->do_targetq) && slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	//endregion
	
	dispatch_atomic_release_barrier();
	
	
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL)) {
		// rdar://problem/8290662 "lock transfer"
		// ensure drain of current barrier sync has finished
		while (slowpath(dq->dq_running > 2)) {
			_dispatch_hardware_pause();
		}
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			// 通知后续任务执行
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
		_dispatch_wakeup(dq);
	}
}

碎碎念

在我个人的偏好上, 这部分实现是很恶心的, 因为为了处理多线程的队列的不同情况, 栅栏任务完成后的发信号(唤醒), 有很多判断处理.

整体流程

               	
enqueue dc --- execute barrier dc ------- signal next dc
    |              
    |              |     
    |     create semaphore & wait
    |              |
    |--------------|

如果 (dq suspended ||	                       
    has enqueued dc || 
    dq running)          
    

同步

同步的实现比较简单, 1.串行下使用栅栏同步 2.root queue, 直接执行 3._dispatch_sync_f2 处理等待的同步

dispatch_sync_f define

点击显示源代码

DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		/// 串行情况下使用, 栅栏同步
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global root queues do not need strict ordering
		/// 直接执行 func
		(void)dispatch_atomic_add2o(dq, dq_running, 2);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}

_dispatch_sync_f2 define

处理等待的同步: 1._dispatch_sync_f_slow 等待已入队任务执行完 的 signal 后再执行同步任务 2. 处理队列依赖递归 3. 非边界情况, 直接执行同步任务

点击显示源代码

DISPATCH_NOINLINE
static void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	/**
     * 1. 如果没有任务, 需要在这个任务之前执行
	 * 2. 这个队列没有被挂起
	 */
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_sync_f_slow(dq, ctxt, func);
	}
	/**
	 * 当前的任务数, 已有任务在执行
	 */
	if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
		return _dispatch_sync_f_slow2(dq, ctxt, func);
	}
	// 根据targetq重新排序任务顺序
	if (slowpath(dq->do_targetq->do_targetq)) {
		/**
		 * 同步递归
		 */
		return _dispatch_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_sync_f_invoke(dq, ctxt, func);
}

信息说明

libdispatch 版本号: 187.10

相关内容

相关内容

libdispatch (GCD源代码解析) 【queue 结构】1

libdispatch (GCD源代码解析) 【任务入队,队列唤起】2

libdispatch (GCD源代码解析) 【任务调度】3

libdispatch (GCD源代码解析) 【同步】4

可执行的libdispatch源代码

https://github.com/AwayQu/libxdispatch