python - Celery: Generating group tasks from a chain task


I am trying to chain the following tasks with Celery (v4.0):

task = group([tasks1.s(), task2.s()]) | generate_job_requests.s() | execute_job.map() | aggregate_result.s()
result = task.get()

The above works fine up to generate_job_requests, which runs as a chord. The problem starts with execute_job: it receives a list of jobs from generate_job_requests, and I need to create a parallel task for each job and later aggregate the results of all the jobs.

I am trying to validate whether this kind of task graph is possible with Celery. Is there an alternate workflow that solves a problem with this kind of dependency? Is there something I am missing in the documentation?
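To make the dependency concrete, here is a minimal plain-Python sketch of the shape being asked for: a step that produces a list whose length is only known at runtime, a parallel fan-out over that list, and a final aggregation. It uses `concurrent.futures` as a stand-in for Celery workers, and the function bodies are hypothetical placeholders that only borrow the task names from the question. It is not Celery code.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the Celery tasks named in the question.
def generate_job_requests(header_results):
    # Produce one job per input; the list length is only known at runtime.
    return [value * 10 for value in header_results]


def execute_job(job):
    # Placeholder for the real per-job work.
    return job + 1


def aggregate_result(results):
    # Placeholder aggregation over all job results.
    return sum(results)


def run_workflow(header_results):
    jobs = generate_job_requests(header_results)
    # Fan out: one execute_job call per generated job, run in parallel.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(execute_job, jobs))
    # Fan in: aggregate only after every job has finished.
    return aggregate_result(results)


final = run_workflow([1, 2])
```

The difficulty in Celery is that the fan-out size depends on the output of an earlier task, so the group cannot be written down when the chain is first built.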

I used map-like functionality with an intermediate task creator that acts like a chord:

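from celery import group, shared_task, subtask

@shared_task(ignore_result=False)
def dmap(it, callback, end_task):
    # Rebuild the callback signature from its serialized form.
    callback = subtask(callback)
    # One clone of the callback per item produced by the previous task.
    grp = group(callback.clone([arg, ]) for arg in it)
    # Group followed by end_task behaves like a chord.
    c = (grp | end_task)
    return c()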

So the task flow is reduced to this:

task = (group([tasks1.s(), task2.s()]) | generate_job_requests.s() | dmap.s(
        execute_job.s(), aggregate_result.s())).apply_async()

To get the final output of the task, I made a few tweaks:

# dmap task id here
dmap_task = celery_app.AsyncResult(task.id)
dmap_result = dmap_task.get()
# actual aggregate_result task id
aggr_res_task_id = dmap_result[0][0]
result = celery_app.AsyncResult(aggr_res_task_id)
# here we receive the actual output of the overall task
result.get()
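The `[0][0]` indexing above reads the id of the outermost task out of the value that dmap returned. As a rough illustration, assuming the serialized result has the nested layout `[[task_id, parent], children]` (an assumption inferred from that indexing, not something the post states), the lookup can be written with explicit unpacking. The helper name and the sample payload are hypothetical.

```python
def final_task_id(serialized_result):
    """Return the id of the outermost task in a serialized result.

    Assumes the nested layout [[task_id, parent], children].
    """
    (task_id, _parent), _children = serialized_result
    return task_id


# Hypothetical payload; the ids are made up for illustration.
sample = [["aggregate-task-id", [["group-id", None], []]], None]
aggr_res_task_id = final_task_id(sample)
```

Unpacking by name fails loudly if the payload does not have the expected shape, which is easier to debug than a bare `[0][0]`.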

I referred to this answer.

