1#define EXTENDED_DEBUG 0
5extern "C" int syscall(
int);
18#define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
24#define log_message(stuff) do { } while (0)
77 }
else if (threads < 1) {
150 const char *bytes = ((
const char *)&this->zero_marker);
152 while (bytes <
limit && *bytes == 0) {
162 char *bytes = ((
char *)&this->zero_marker);
176 const char *name =
job->task.name ?
job->task.name :
"<no name>";
177 const char *
parent_name =
job->parent_job ? (
job->parent_job->task.name ?
job->parent_job->task.name :
"<no name>") :
"<no parent job>";
178 log_message(
prefix << name <<
"[" <<
job <<
"] serial: " <<
job->task.serial <<
" active_workers: " <<
job->active_workers <<
" min: " <<
job->task.
min <<
" extent: " <<
job->task.extent <<
" siblings: " <<
job->siblings <<
" sibling count: " <<
job->sibling_count <<
" min_threads " <<
job->task.min_threads <<
" next_sempaphore: " <<
job->next_semaphore <<
" threads_reserved: " <<
job->threads_reserved <<
" parent_job: " <<
parent_name <<
"[" <<
job->parent_job <<
"]");
179 for (
int i = 0;
i <
job->task.num_semaphores;
i++) {
180 log_message(indent <<
" semaphore " << (
void *)
job->task.semaphores[
i].semaphore <<
" count " <<
job->task.semaphores[
i].count <<
" val " << *(
int *)
job->task.semaphores[
i].semaphore);
187 while (
job !=
nullptr) {
197#define print_job(job, indent, prefix) do { } while (0)
198#define dump_job_state() do { } while (0)
217 prev_ptr = &
job->next_job;
220 *prev_ptr =
job->next_job;
221 job->task.extent = 0;
245 work *parent_job =
job->parent_job;
248 if (parent_job ==
nullptr) {
265 log_message(
"Cannot run job " <<
job->task.name <<
" on this thread.");
273 if (
job->make_runnable()) {
279 prev_ptr = &(
job->next_job);
325 job->active_workers++;
327 if (
job->parent_job ==
nullptr) {
331 job->parent_job->threads_reserved +=
job->task.min_threads;
332 log_message(
"Reserved " <<
job->task.min_threads <<
" on " <<
job->parent_job->task.name <<
" for " <<
job->task.name <<
" giving " <<
job->parent_job->threads_reserved <<
" of " <<
job->parent_job->task.min_threads);
337 if (
job->task.serial) {
339 *prev_ptr =
job->next_job;
348 job->make_runnable()) {
369 job->task.extent = 0;
370 }
else if (
job->task.extent > 0) {
382 if (
job->task.extent == 0) {
383 *prev_ptr =
job->next_job;
400 log_message(
"Saw thread pool saw error from task: " << (
int)result);
403 bool wake_owners =
false;
407 job->exit_status = result;
409 for (
int i = 0;
i <
job->sibling_count;
i++) {
412 job->siblings[
i].exit_status = result;
413 wake_owners |= (
job->active_workers == 0 &&
job->siblings[
i].owner_is_sleeping);
419 if (
job->parent_job ==
nullptr) {
423 job->parent_job->threads_reserved -=
job->task.min_threads;
424 log_message(
"Returned " <<
job->task.min_threads <<
" to " <<
job->parent_job->task.name <<
" for " <<
job->task.name <<
" giving " <<
job->parent_job->threads_reserved <<
" of " <<
job->parent_job->task.min_threads);
428 job->active_workers--;
477 if (jobs[
i].task.min_threads == 0) {
483 if (jobs[
i].task.num_semaphores != 0) {
487 if (jobs[
i].task.serial) {
505 log_message(
"enqueue_work_already_locked adding one to min_threads.");
524 log_message(
"enqueue_work_already_locked job " << jobs[0].task.name <<
" with min_threads " << min_threads <<
" task_parent " <<
task_parent->task.name <<
" task_parent->task.min_threads " <<
task_parent->task.min_threads <<
" task_parent->threads_reserved " <<
task_parent->threads_reserved);
527 "Logic error: thread over commit.\n");
591WEAK __attribute__((destructor))
void halide_thread_pool_cleanup() {
598 return f(user_context, idx, closure);
602 int min,
int extent,
uint8_t *closure,
604 return f(user_context, min, extent, closure, task_parent);
608 int min,
int size,
uint8_t *closure) {
642 work *jobs = (
work *)__builtin_alloca(
sizeof(
work) * num_tasks);
644 for (
int i = 0; i < num_tasks; i++) {
650 jobs[i].
task = *tasks++;
660 if (num_tasks == 0) {
667 for (
int i = 0; i < num_tasks; i++) {
681 halide_error(
nullptr,
"halide_set_num_threads: must be >= 0.");
724 Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->
value, &n);
730 int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->
value, n);
732 if (old_val == 0 && n != 0) {
750 Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->
value, &expected);
752 desired = expected - n;
753 }
while (desired >= 0 &&
754 !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->
value, &expected, &desired));
800 int min,
int size,
uint8_t *closure) {
805 int min,
int size,
uint8_t *closure,
void *task_parent) {
int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int(* halide_semaphore_release_t)(struct halide_semaphore_t *, int)
int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
bool halide_default_semaphore_try_acquire(struct halide_semaphore_t *, int n)
void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex)
int(* halide_do_par_for_t)(void *, halide_task_t, int, int, uint8_t *)
Set a custom method for performing a parallel for loop.
int halide_default_do_par_for(void *user_context, halide_task_t task, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
int(* halide_task_t)(void *user_context, int task_number, uint8_t *closure)
Define halide_do_par_for to replace the default thread pool implementation.
void halide_mutex_lock(struct halide_mutex *mutex)
A basic set of mutex and condition variable functions, which call platform specific code for mutual e...
int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
int halide_default_semaphore_init(struct halide_semaphore_t *, int n)
void halide_mutex_unlock(struct halide_mutex *mutex)
struct halide_thread * halide_spawn_thread(void(*f)(void *), void *closure)
Spawn a thread.
int(* halide_do_loop_task_t)(void *, halide_loop_task_t, int, int, uint8_t *, void *)
The version of do_task called for loop tasks.
bool(* halide_semaphore_try_acquire_t)(struct halide_semaphore_t *, int)
int(* halide_loop_task_t)(void *user_context, int min, int extent, uint8_t *closure, void *task_parent)
A task representing a serial for loop evaluated over some range.
int halide_default_semaphore_release(struct halide_semaphore_t *, int n)
void halide_join_thread(struct halide_thread *)
Join a thread.
@ halide_error_code_success
There was no error.
void halide_cond_broadcast(struct halide_cond *cond)
int(* halide_do_task_t)(void *, halide_task_t, int, uint8_t *)
If you use the default do_par_for, you can still set a custom handler to perform each individual task...
int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
int(* halide_semaphore_init_t)(struct halide_semaphore_t *, int)
void halide_error(void *user_context, const char *)
Halide calls this function on runtime errors (for example bounds checking failures).
int(* halide_do_parallel_tasks_t)(void *, int, struct halide_parallel_task_t *, void *task_parent)
Provide an entire custom tasking runtime via function pointers.
WEAK halide_semaphore_release_t custom_semaphore_release
WEAK halide_semaphore_init_t custom_semaphore_init
WEAK int default_desired_num_threads()
WEAK halide_do_task_t custom_do_task
WEAK halide_do_par_for_t custom_do_par_for
ALWAYS_INLINE int clamp_num_threads(int threads)
WEAK void worker_thread(void *)
WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent)
WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks
WEAK void worker_thread_already_locked(work *owned_job)
WEAK halide_do_loop_task_t custom_do_loop_task
WEAK work_queue_t work_queue
WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire
This file defines the class FunctionDAG, which is our representation of a Halide pipeline,...
Internal::ConstantInterval cast(Type t, const Internal::ConstantInterval &a)
Cast operators for ConstantIntervals.
WEAK int halide_host_cpu_count()
unsigned __INT8_TYPE__ uint8_t
void halide_thread_yield()
void * memset(void *s, int val, size_t n)
#define halide_abort_if_false(user_context, cond)
char * getenv(const char *)
int64_t min
The lower and upper bound of the interval.
int desired_threads_working
ALWAYS_INLINE bool running() const
halide_thread * threads[MAX_THREADS]
ALWAYS_INLINE void reset()
ALWAYS_INLINE void assert_zeroed() const
ALWAYS_INLINE bool running() const
ALWAYS_INLINE bool make_runnable()
halide_parallel_task_t task
Cross platform condition variable.
A parallel task to be passed to halide_do_parallel_tasks.
struct halide_semaphore_acquire_t * semaphores
struct halide_semaphore_t * semaphore
An opaque struct representing a semaphore.
WEAK void halide_set_custom_parallel_runtime(halide_do_par_for_t do_par_for, halide_do_task_t do_task, halide_do_loop_task_t do_loop_task, halide_do_parallel_tasks_t do_parallel_tasks, halide_semaphore_init_t semaphore_init, halide_semaphore_try_acquire_t semaphore_try_acquire, halide_semaphore_release_t semaphore_release)
WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n)
WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f)
WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n)
WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f)
WEAK int halide_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count)
WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f, int min, int extent, uint8_t *closure, void *task_parent)
#define log_message(stuff)
WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f)
#define print_job(job, indent, prefix)
WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f, int min, int size, uint8_t *closure, void *task_parent)
WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks, struct halide_parallel_task_t *tasks, void *task_parent)
Enqueue some number of the tasks described above and wait for them to complete.
WEAK void halide_shutdown_thread_pool()
WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n)
WEAK int halide_default_do_par_for(void *user_context, halide_task_t f, int min, int size, uint8_t *closure)
The default versions of the parallel runtime functions.
WEAK int halide_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx, uint8_t *closure)
WEAK int halide_set_num_threads(int n)
Set the number of threads used by Halide's thread pool.
WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count)