python - Celery Generating group tasks from chain task -
i trying chain following tasks celery(v4.0),
task = group([tasks1.s(), task2.s()) | generate_job_requests.s() | execute_job.map() | aggregate_result.s() result = task.get()
above part working fine upto generate_job_requests
chord. problem starts execute_job
gets list of jobs generate_job_requests
, need create parallel tasks , later on aggregate result of jobs.
i trying validate whether such kind of taskgraph possible celery ? there possible alternate workflow solve problem such dependency ? missing in documentation.
i used map functionality intermediate task creator acts chord,
@shared_task(ignore_result=false) def dmap(it, callback, end_task): callback = subtask(callback) grp = group(callback.clone([arg, ]) arg in it) c = (grp | end_task) return c()
so task flow reduced this,
task = (group([tasks1.s(), task2.s()) | generate_job_requests.s() | dmap.s( execute_job.s(), aggregate_result.s())).apply_async()
for getting ultimate output of task, did few tweaks,
# dmap task id here dmap_task = celery_app.asyncresult(task.id) dmap_result = dmap_task.get() # actual aggregate_result task id aggr_res_task_id = dmap_result[0][0] result = celery_app.asyncresult(aggr_res_task_id) # here receive actual output of overall task result.get()
i referred answer
Comments
Post a Comment