X-Git-Url: http://pileus.org/git/?a=blobdiff_plain;f=kernel%2Fworkqueue.c;h=ec7e4f62aaff4e71051eefb7d684e8ff4868a5ad;hb=d9ecdb282c91952796b7542c4f57fd6de6948d7b;hp=ce7799540c91d28d3109e0e31fc53e623859ed1e;hpb=49997d75152b3d23c53b0fa730599f2f74c92c65;p=~andy%2Flinux diff --git a/kernel/workqueue.c b/kernel/workqueue.c index ce7799540c9..ec7e4f62aaf 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -125,7 +125,7 @@ struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) } static void insert_work(struct cpu_workqueue_struct *cwq, - struct work_struct *work, int tail) + struct work_struct *work, struct list_head *head) { set_wq_data(work, cwq); /* @@ -133,21 +133,17 @@ static void insert_work(struct cpu_workqueue_struct *cwq, * result of list_add() below, see try_to_grab_pending(). */ smp_wmb(); - if (tail) - list_add_tail(&work->entry, &cwq->worklist); - else - list_add(&work->entry, &cwq->worklist); + list_add_tail(&work->entry, head); wake_up(&cwq->more_work); } -/* Preempt must be disabled. */ static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) { unsigned long flags; spin_lock_irqsave(&cwq->lock, flags); - insert_work(cwq, work, 1); + insert_work(cwq, work, &cwq->worklist); spin_unlock_irqrestore(&cwq->lock, flags); } @@ -162,18 +158,40 @@ static void __queue_work(struct cpu_workqueue_struct *cwq, * it can be processed by another CPU. */ int queue_work(struct workqueue_struct *wq, struct work_struct *work) +{ + int ret; + + ret = queue_work_on(get_cpu(), wq, work); + put_cpu(); + + return ret; +} +EXPORT_SYMBOL_GPL(queue_work); + +/** + * queue_work_on - queue work on specific cpu + * @cpu: CPU number to execute work on + * @wq: workqueue to use + * @work: work to queue + * + * Returns 0 if @work was already on a queue, non-zero otherwise. + * + * We queue the work to a specific CPU, the caller must ensure it + * can't go away. + */ +int +queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) { int ret = 0; if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { BUG_ON(!list_empty(&work->entry)); - __queue_work(wq_per_cpu(wq, get_cpu()), work); - put_cpu(); + __queue_work(wq_per_cpu(wq, cpu), work); ret = 1; } return ret; } -EXPORT_SYMBOL_GPL(queue_work); +EXPORT_SYMBOL_GPL(queue_work_on); static void delayed_work_timer_fn(unsigned long __data) { @@ -337,14 +355,14 @@ static void wq_barrier_func(struct work_struct *work) } static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, - struct wq_barrier *barr, int tail) + struct wq_barrier *barr, struct list_head *head) { INIT_WORK(&barr->work, wq_barrier_func); __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); init_completion(&barr->done); - insert_work(cwq, &barr->work, tail); + insert_work(cwq, &barr->work, head); } static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) @@ -364,7 +382,7 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) active = 0; spin_lock_irq(&cwq->lock); if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { - insert_wq_barrier(cwq, &barr, 1); + insert_wq_barrier(cwq, &barr, &cwq->worklist); active = 1; } spin_unlock_irq(&cwq->lock); @@ -397,11 +415,62 @@ void flush_workqueue(struct workqueue_struct *wq) might_sleep(); lock_acquire(&wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_); lock_release(&wq->lockdep_map, 1, _THIS_IP_); - for_each_cpu_mask(cpu, *cpu_map) + for_each_cpu_mask_nr(cpu, *cpu_map) flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); } EXPORT_SYMBOL_GPL(flush_workqueue); +/** + * flush_work - block until a work_struct's callback has terminated + * @work: the work which is to be flushed + * + * Returns false if @work has already terminated. + * + * It is expected that, prior to calling flush_work(), the caller has + * arranged for the work to not be requeued, otherwise it doesn't make + * sense to use this function. + */ +int flush_work(struct work_struct *work) +{ + struct cpu_workqueue_struct *cwq; + struct list_head *prev; + struct wq_barrier barr; + + might_sleep(); + cwq = get_wq_data(work); + if (!cwq) + return 0; + + lock_acquire(&cwq->wq->lockdep_map, 0, 0, 0, 2, _THIS_IP_); + lock_release(&cwq->wq->lockdep_map, 1, _THIS_IP_); + + prev = NULL; + spin_lock_irq(&cwq->lock); + if (!list_empty(&work->entry)) { + /* + * See the comment near try_to_grab_pending()->smp_rmb(). + * If it was re-queued under us we are not going to wait. + */ + smp_rmb(); + if (unlikely(cwq != get_wq_data(work))) + goto out; + prev = &work->entry; + } else { + if (cwq->current_work != work) + goto out; + prev = &cwq->worklist; + } + insert_wq_barrier(cwq, &barr, prev->next); +out: + spin_unlock_irq(&cwq->lock); + if (!prev) + return 0; + + wait_for_completion(&barr.done); + return 1; +} +EXPORT_SYMBOL_GPL(flush_work); + /* * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit, * so this work can't be re-armed in any way. @@ -449,7 +518,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, spin_lock_irq(&cwq->lock); if (unlikely(cwq->current_work == work)) { - insert_wq_barrier(cwq, &barr, 0); + insert_wq_barrier(cwq, &barr, cwq->worklist.next); running = 1; } spin_unlock_irq(&cwq->lock); @@ -477,7 +546,7 @@ static void wait_on_work(struct work_struct *work) wq = cwq->wq; cpu_map = wq_cpu_map(wq); - for_each_cpu_mask(cpu, *cpu_map) + for_each_cpu_mask_nr(cpu, *cpu_map) wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work); } @@ -553,6 +622,19 @@ int schedule_work(struct work_struct *work) } EXPORT_SYMBOL(schedule_work); +/* + * schedule_work_on - put work task on a specific cpu + * @cpu: cpu to put the work task on + * @work: job to be done + * + * This puts a job on a specific cpu + */ +int schedule_work_on(int cpu, struct work_struct *work) +{ + return queue_work_on(cpu, keventd_wq, work); +} +EXPORT_SYMBOL(schedule_work_on); + /** * schedule_delayed_work - put work task in global workqueue after delay * @dwork: job to be done @@ -607,10 +689,10 @@ int schedule_on_each_cpu(work_func_t func) struct work_struct *work = per_cpu_ptr(works, cpu); INIT_WORK(work, func); - set_bit(WORK_STRUCT_PENDING, work_data_bits(work)); - __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work); + schedule_work_on(cpu, work); } - flush_workqueue(keventd_wq); + for_each_online_cpu(cpu) + flush_work(per_cpu_ptr(works, cpu)); put_online_cpus(); free_percpu(works); return 0; @@ -747,7 +829,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name, err = create_workqueue_thread(cwq, singlethread_cpu); start_workqueue_thread(cwq, -1); } else { - get_online_cpus(); + cpu_maps_update_begin(); spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); spin_unlock(&workqueue_lock); @@ -759,7 +841,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name, err = create_workqueue_thread(cwq, cpu); start_workqueue_thread(cwq, cpu); } - put_online_cpus(); + cpu_maps_update_done(); } if (err) { @@ -773,8 +855,8 @@ EXPORT_SYMBOL_GPL(__create_workqueue_key); static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) { /* - * Our caller is either destroy_workqueue() or CPU_DEAD, - * get_online_cpus() protects cwq->thread. + * Our caller is either destroy_workqueue() or CPU_POST_DEAD, + * cpu_add_remove_lock protects cwq->thread. */ if (cwq->thread == NULL) return; @@ -784,7 +866,7 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) flush_cpu_workqueue(cwq); /* - * If the caller is CPU_DEAD and cwq->worklist was not empty, + * If the caller is CPU_POST_DEAD and cwq->worklist was not empty, * a concurrent flush_workqueue() can insert a barrier after us. * However, in that case run_workqueue() won't return and check * kthread_should_stop() until it flushes all work_struct's. @@ -808,14 +890,14 @@ void destroy_workqueue(struct workqueue_struct *wq) const cpumask_t *cpu_map = wq_cpu_map(wq); int cpu; - get_online_cpus(); + cpu_maps_update_begin(); spin_lock(&workqueue_lock); list_del(&wq->list); spin_unlock(&workqueue_lock); - for_each_cpu_mask(cpu, *cpu_map) + for_each_cpu_mask_nr(cpu, *cpu_map) cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); - put_online_cpus(); + cpu_maps_update_done(); free_percpu(wq->cpu_wq); kfree(wq); @@ -829,6 +911,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, unsigned int cpu = (unsigned long)hcpu; struct cpu_workqueue_struct *cwq; struct workqueue_struct *wq; + int ret = NOTIFY_OK; action &= ~CPU_TASKS_FROZEN; @@ -836,7 +919,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, case CPU_UP_PREPARE: cpu_set(cpu, cpu_populated_map); } - +undo: list_for_each_entry(wq, &workqueues, list) { cwq = per_cpu_ptr(wq->cpu_wq, cpu); @@ -846,7 +929,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, break; printk(KERN_ERR "workqueue [%s] for %i failed\n", wq->name, cpu); - return NOTIFY_BAD; + action = CPU_UP_CANCELED; + ret = NOTIFY_BAD; + goto undo; case CPU_ONLINE: start_workqueue_thread(cwq, cpu); @@ -854,7 +939,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, case CPU_UP_CANCELED: start_workqueue_thread(cwq, -1); - case CPU_DEAD: + case CPU_POST_DEAD: cleanup_workqueue_thread(cwq); break; } @@ -862,11 +947,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, switch (action) { case CPU_UP_CANCELED: - case CPU_DEAD: + case CPU_POST_DEAD: cpu_clear(cpu, cpu_populated_map); } - return NOTIFY_OK; + return ret; } void __init init_workqueues(void)