Feature #5787
Closed: [Crunch] Add Crunch support to GATK Queue
100%
Updated by Pjotr Prins over 9 years ago
Pjotr Prins wrote:
DRMAA spec for SGE
Specification for writing a QueueRunner
http://gatkforums.broadinstitute.org/discussion/1347/queue-custom-job-schedulers
needs a start method, a status method and a tryStop method
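For orientation, here is a minimal sketch of the shape such a runner takes. It is written in Python purely for illustration (Queue itself is Scala), and only the three method roles come from the note above. The class name, the status strings, and the submit/poll/cancel callables are hypothetical placeholders, not part of Queue or Arvados.

```python
from typing import Callable, Optional


class SketchJobRunner:
    """Illustrative runner with the three required operations.

    submit/poll/cancel are hypothetical callables standing in for
    whatever scheduler backend is used.
    """

    def __init__(self, submit: Callable[[], str],
                 poll: Callable[[str], str],
                 cancel: Callable[[str], None]) -> None:
        self._submit = submit
        self._poll = poll
        self._cancel = cancel
        self.job_id: Optional[str] = None

    def start(self) -> None:
        # Hand the job to the scheduler and remember its identifier.
        self.job_id = self._submit()

    def status(self) -> str:
        # Report PENDING until start() has been called.
        if self.job_id is None:
            return "PENDING"
        return self._poll(self.job_id)

    def try_stop(self) -> bool:
        # Best effort: never raise, just report whether cancel succeeded.
        if self.job_id is None:
            return False
        try:
            self._cancel(self.job_id)
            return True
        except Exception:
            return False
```

The point of try_stop being best-effort is that it may be called during shutdown, when raising would only obscure the original failure.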
Updated by Tom Clegg over 9 years ago
- Category set to Sample Pipelines
- Target version set to 2015-04-29 sprint
starting Arvados job_tasks from Queue
A crunch job can't do "add a job_task, and wait for it to finish" because new tasks don't start until the task that queued them has completed. This avoids wasting resources and mangling output when a parent task fails and gets re-attempted: if the child tasks had started, they might have started their own child tasks, and so on, and the job output would end up containing multiple copies of each child task's output. I think we could change this such that new tasks start right away (assuming new task level == current level) but get recursively terminated and deleted if a parent task fails.
crunch-job makes a "job output" by concatenating all task outputs at the end of the job. In the case of Queue, this is incorrect. This could be addressed either by changing crunch-job ("if job output is not null, assume one of the tasks set it correctly, and skip create_output_collection()") or by mangling the task records ("set output to null on intermediate tasks so create_output_collection() doesn't use them").
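The two options for the job output can be sketched together as follows. This is only an illustration of the decision logic, not crunch-job code (which is not Python): records are modelled as plain dicts, and the concatenate callable is a hypothetical stand-in for create_output_collection().

```python
from typing import Callable, Dict, List, Optional


def final_job_output(job: Dict[str, Optional[str]],
                     tasks: List[Dict[str, Optional[str]]],
                     concatenate: Callable[[List[str]], str]) -> str:
    """Pick the job output.

    Option 1: if the job record already has an output, trust it.
    Option 2: otherwise combine only tasks whose output is not None,
    so intermediate tasks can opt out by leaving output unset.
    """
    if job.get("output") is not None:
        return job["output"]
    task_outputs = [t["output"] for t in tasks if t.get("output") is not None]
    return concatenate(task_outputs)
```

Either option alone would be enough for Queue; the sketch shows that they do not conflict if both are implemented.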
starting Arvados jobs from Queue
Meanwhile, a job can queue more jobs, and they will start right away. Nothing is in place to clean up abandoned jobs when the parent fails in this setup, so this has potential to waste a lot of resources. However, this is the model we're moving to with Crunch2 (and is needed by more generalized workflow tools like CWL -- e.g., different child jobs can use different docker images) and doesn't require waiting on any Crunch functionality, so it might be a good way to start. This might also make it easier to run Queue on a shell VM or workstation without the crunch-job wrapper.
In order to be efficient this requires that each job started by Queue makes reasonably good use of an entire node (because nodes aren't yet shared across jobs) and isn't trivially short (because job warmup time can be a few minutes).
The efficiency advantage, of course, is that the node allocation is dynamic in this scenario.
Updated by Tom Clegg over 9 years ago
Here's an illustrative line from a Queue job.
2015-04-23_00:47:40 qr1hi-8i9sb-9xvdw8bg5n8eunm 12178 0 stderr INFO 00:47:39,949 FunctionEdge - Starting: 'java' '-Xmx98304m' '-XX:+UseParallelOldGC' '-XX:ParallelGCThreads=4' '-XX:GCTimeLimit=50' '-XX:GCHeapFreeLimit=10' '-Djava.io.tmpdir=/tmp/crunch-job-task-work/compute22.1/output/.queue/tmp' '-cp' '/keep/37f14b3ba952acb558fb9b942a431f47+398/Queue.jar' 'org.broadinstitute.gatk.engine.CommandLineGATK' '-T' 'HaplotypeCaller' '-I' '/keep/5abf6ca500449c6361a958d901123d85+5221/HWI-ST1027_129_D0THKACXX.dedup.mate.realign.recal.bam' '-L' '/tmp/crunch-job-task-work/compute22.1/output/.queue/scatterGather/HaplotypeCaller-1-sg/temp_0001_of_1000/scatter.intervals' '-R' '/keep/3514b8e5da0e8d109946bc809b20a78a+5698/human_g1k_v37.fasta' '-nct' '16' '-o' '/tmp/crunch-job-task-work/compute22.1/output/.queue/scatterGather/HaplotypeCaller-1-sg/temp_0001_of_1000/HWI-ST1027_129_D0THKACXX.vcf.raw_variants.vcf' '-stand_call_conf' '30.0' '-stand_emit_conf' '15.0' '-ploidy' '2'
Presumably this would translate to an Arvados job something like this:
- Propagate the script_parameters, script_version, repository, docker image, and arvados_sdk_version from the current job to the child job
- Replace /keep/{collection_pdh}/ with $(dir $(input_name))/ (should work fine without this since /keep/ has the same path in all jobs/tasks but it would be ideal to maintain the abstraction -- even $(dir collection_pdh) might be better than /keep/collection_pdh)
- Replace /tmp/crunch-job-task-work/compute22.1 (i.e., current task's task_tmpdir) with $(task.tmpdir)
- Use the parent job's output directory as a task.vwd for each child job -- replace input file paths (like -L .../scatter.intervals) with paths relative to the vwd.
- Alternatively, assume any given file is either read or written but not both, and
  - Write the parent job's output directory to Keep as "tmpdir_snapshot"
  - Munge each filename that does exist in tmpdir_snapshot so it's given to the child relative to $(dir $(tmpdir_snapshot))
  - mkdir -p $(task.tmpdir)/{dirname(X)} for each file X mentioned on the command line that doesn't exist in the parent's tmp/output dir
"script": "run-command",
"script_parameters": {
  "command": {
    "value": [
      "java",
      "-Xmx98304m",
      "-XX:+UseParallelOldGC",
      "-XX:ParallelGCThreads=4",
      "-XX:GCTimeLimit=50",
      "-XX:GCHeapFreeLimit=10",
      "-Djava.io.tmpdir=$(task.tmpdir)/output/.queue/tmp",
      "-cp", "/keep/37f14b3ba952acb558fb9b942a431f47+398/Queue.jar",
      "org.broadinstitute.gatk.engine.CommandLineGATK",
      "-T", "HaplotypeCaller",
      "-I", "/keep/5abf6ca500449c6361a958d901123d85+5221/HWI-ST1027_129_D0THKACXX.dedup.mate.realign.recal.bam",
      "-L", "$(task.vwd)/.queue/scatterGather/HaplotypeCaller-1-sg/temp_0001_of_1000/scatter.intervals",
      "-R", "/keep/3514b8e5da0e8d109946bc809b20a78a+5698/human_g1k_v37.fasta",
      "-nct", "16",
      "-o", "$(task.tmpdir)/output/.queue/scatterGather/HaplotypeCaller-1-sg/temp_0001_of_1000/HWI-ST1027_129_D0THKACXX.vcf.raw_variants.vcf",
      "-stand_call_conf", "30.0",
      "-stand_emit_conf", "15.0",
      "-ploidy", "2"
    ]
  },
  "queue": { "value": "37f14b3ba952acb558fb9b942a431f47+398" },
  "scala_script": { "value": "d89a7e6e248dad3df60c71cd92321681+69" },
  "input_bam": { "value": "5abf6ca500449c6361a958d901123d85+5221" },
  "reference": { "value": "3514b8e5da0e8d109946bc809b20a78a+5698" }
},
"max_tasks_per_node": 1
When a child job finishes, the parent job can copy the child job's output into its own {tmp}/output/ dir. Presumably the child processes aren't supposed to stomp on one another's files so this should be at least as safe as running everything in a shared /tmp like most Queue setups.
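The tmpdir and vwd substitutions described above can be sketched roughly as follows. This is an assumption-laden illustration, not an existing Arvados or Queue function: rewrite_args and its parameters are hypothetical names, /keep/ paths are left untouched (which, as noted, should work since the mount path is the same everywhere), and the caller is assumed to know which files under the output dir are inputs to the child.

```python
from typing import List


def rewrite_args(args: List[str], task_tmpdir: str, vwd_inputs: List[str]) -> List[str]:
    """Rewrite absolute paths in a Queue command line for a child job.

    task_tmpdir: the parent task's temp dir, e.g. /tmp/crunch-job-task-work/compute22.1
    vwd_inputs:  paths (relative to {task_tmpdir}/output) that the child
                 reads rather than writes, e.g. the scatter.intervals file
    """
    outdir = task_tmpdir + "/output/"
    rewritten = []
    for arg in args:
        # Handle -Dname=/path style flags as well as bare paths.
        prefix, sep, value = arg.rpartition("=")
        if not (sep and value.startswith("/")):
            prefix, sep, value = "", "", arg
        if value.startswith(outdir) and value[len(outdir):] in vwd_inputs:
            # Input produced by the parent: read it from the child's vwd.
            value = "$(task.vwd)/" + value[len(outdir):]
        elif value == task_tmpdir or value.startswith(task_tmpdir + "/"):
            # Anything else under the parent's temp dir: use the child's own.
            value = "$(task.tmpdir)" + value[len(task_tmpdir):]
        rewritten.append(prefix + sep + value)
    return rewritten
```

Arguments that are not paths under the parent's temp dir (heap flags, tool names, /keep/ paths) pass through unchanged.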
I'm guessing the following part is just "nohup.out" (or a job log, in Arvados) and is meant for humans to inspect when troubleshooting, i.e., we don't need to copy the child job logs into the parent job's temp dir for Queue to look at:
2015-04-23_00:47:40 qr1hi-8i9sb-9xvdw8bg5n8eunm 12178 0 stderr INFO 00:47:39,949 FunctionEdge - Output written to /tmp/crunch-job-task-work/compute22.1/output/.queue/scatterGather/HaplotypeCaller-1-sg/temp_0001_of_1000/HWI-ST1027_129_D0THKACXX.vcf.raw_variants.vcf.out
...
Some restrictions/assumptions that may hold for GATK and help us get an initial implementation working even though it's not fully generic:
- Each filename is its own command line argument. There's no -foo=/bar/baz. (No, there's -Djava.io.tmpdir=/...)
- All filenames are absolute. There's no .queue/foo/bar
- Each child job will have just one task per node, so -Xmx98304m (presumably copied by Queue from its own process) should also work for each child job
Updated by Tom Clegg over 9 years ago
- Reduce the scatter level from 1000. Perhaps 100? 50? (In our experience reducing too far is often undesirable: one task often ends up taking way longer than everything else, and this affects turnaround time much more with a small number of biggish tasks.)
- Give the Arvados Queue plugin its own concurrency limit ("don't have more than X jobs at a time running/queued in Arvados"). Hopefully Queue's architecture won't make this difficult. If it does, we might reevaluate other options.
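A plugin-side concurrency limit of the kind suggested in the second point could look roughly like this. It is a sketch under assumptions: JobThrottle and its submit callable are hypothetical, and how Queue would notify the plugin that a job has finished is not something this thread establishes.

```python
from collections import deque
from typing import Callable, Deque, Set


class JobThrottle:
    """Keep at most `limit` jobs submitted at once; hold the rest locally."""

    def __init__(self, limit: int, submit: Callable[[str], None]) -> None:
        self.limit = limit
        self._submit = submit          # hypothetical: sends one job to Arvados
        self._waiting: Deque[str] = deque()
        self._active: Set[str] = set()

    def add(self, job_name: str) -> None:
        # Queue the job locally, then submit it if a slot is free.
        self._waiting.append(job_name)
        self._fill()

    def finished(self, job_name: str) -> None:
        # Called when a job ends (success or failure); frees one slot.
        self._active.discard(job_name)
        self._fill()

    def _fill(self) -> None:
        # Submit waiting jobs in arrival order until the limit is reached.
        while self._waiting and len(self._active) < self.limit:
            name = self._waiting.popleft()
            self._active.add(name)
            self._submit(name)
```

With a limit in place, the scatter level and the number of nodes in use become independent knobs, which is the appeal of this option over simply reducing the scatter count.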
Updated by Ward Vandewege over 9 years ago
- Subject changed from Add Crunch support to GATK Queue to [Crunch] Add Crunch support to GATK Queue
- Status changed from New to In Progress
Updated by Peter Amstutz over 9 years ago
- Target version changed from 2015-04-29 sprint to 2015-05-20 sprint
Updated by Tom Clegg over 9 years ago
Looking at f6fcf9f on 5787-run-command-checkin
The checkin docstring says "Symlinks into the keep mount in the output dir are efficiently added to the collection with no data copying." For the benefit of those who don't already know that collections don't support symlinks, I think it would be helpful to mention the behavior (i.e., checkin follows symlinks and copies the data from target files) before the optimization (if the symlink resolves to a file in a Keep FUSE mount, the data is copied by reference, which is efficient).
AFAICT this branch still silently ignores (but no longer deletes) symlinks that resolve to something outside a keep mount, and symlinks to nonexistent files. Was this intentional? It seems sketchy to me. The run-command diagnostic message claims to do what I would expect ("the following output files will be saved...") with the minor complaint that the "find" output now includes directory names, which won't in fact be saved if they happen to be empty. (Perhaps "not type d" or "type f or l" is easy to do?)
Either way the new behavior looks incompatible with the old behavior (which is still advertised in the run-command docs, incidentally). The documented/old behavior is to ignore whatever the task does with symlinks in the vwd: the output is "original starting collection + whatever regular files are in the vwd at the end". With the new behavior, the output is "whatever regular files and symlinks-to-fuse-mount are in the vwd at the end". It seems the new behavior has to be optional in order to avoid breaking existing jobs that delete, rename, or add symlinks in the vwd. I can think of three reasonable modes:
- Write all regular files and symlinks (including the new efficient symlinks-to-Keep feature)
- Ignore all symlinks; merge regular files + original collection (used to be merge=True, i.e., in a run-command subtask)
- Ignore all symlinks; only write regular files (used to be merge=False) -- although the caller can always delete symlinks ahead of time like run-command does now, so maybe this isn't important.
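To make the symlink cases in this review concrete, here is a rough sketch of how the entries in a vwd could be sorted into the categories being discussed. It is not the vwd.py code on the branch; the function name, the bucket names, and the keep_mount parameter are made up for illustration. Each of the three modes above then amounts to choosing which buckets get written.

```python
import os
from typing import Dict, List


def classify_outputs(vwd: str, keep_mount: str) -> Dict[str, List[str]]:
    """Sort every non-directory entry under vwd into one of four buckets.

    Anything that is not a symlink is counted as "regular".
    """
    keep_root = os.path.realpath(keep_mount)
    result: Dict[str, List[str]] = {
        "regular": [], "keep_symlink": [], "other_symlink": [], "dangling": []}
    for dirpath, dirnames, filenames in os.walk(vwd):
        # os.walk lists symlinks to directories under dirnames
        # and does not descend into them by default.
        linked_dirs = [d for d in dirnames
                       if os.path.islink(os.path.join(dirpath, d))]
        for name in sorted(filenames + linked_dirs):
            path = os.path.join(dirpath, name)
            rel = os.path.relpath(path, vwd)
            if not os.path.islink(path):
                result["regular"].append(rel)
                continue
            target = os.path.realpath(path)
            if not os.path.exists(target):
                result["dangling"].append(rel)
            elif target == keep_root or target.startswith(keep_root + os.sep):
                result["keep_symlink"].append(rel)
            else:
                result["other_symlink"].append(rel)
    return result
```

Real directories never appear in the result, which also matches the suggestion that the diagnostic listing should leave them out.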
What happens if a symlink target is in a (writable) Keep mount in a collection that hasn't been committed? Does it cause the target collection to be committed? This would be nice on the vwd side, but might be surprising on the uncommitted-collection side ("why did this get committed??"). Should we defer this question to #3198?
The comment "delete symlinks,..." should be deleted now that checkin doesn't delete symlinks (not sure why this was needed before, but whatever).
robust_put is no longer used and should not be imported.
Updated by Peter Amstutz over 9 years ago
Tom Clegg wrote:
Looking at f6fcf9f on 5787-run-command-checkin
The checkin docstring says "Symlinks into the keep mount in the output dir are efficiently added to the collection with no data copying." For the benefit of those who don't already know that collections don't support symlinks, I think it would be helpful to mention the behavior (i.e., checkin follows symlinks and copies the data from target files) before the optimization (if the symlink resolves to a file in a Keep FUSE mount, the data is copied by reference, which is efficient).
Done.
AFAICT this branch still silently ignores (but no longer deletes) symlinks that resolve to something outside a keep mount, and symlinks to nonexistent files. Was this intentional? It seems sketchy to me.
Fixed. Also catches exceptions now.
The run-command diagnostic message claims to do what I would expect ("the following output files will be saved...") with the minor complaint that the "find" output now includes directory names, which won't in fact be saved if they happen to be empty. (Perhaps "not type d" or "type f or l" is easy to do?)
Fixed.
Either way the new behavior looks incompatible with the old behavior (which is still advertised in the run-command docs, incidentally). The documented/old behavior is to ignore whatever the task does with symlinks in the vwd: the output is "original starting collection + whatever regular files are in the vwd at the end". With the new behavior, the output is "whatever regular files and symlinks-to-fuse-mount are in the vwd at the end". It seems the new behavior has to be optional in order to avoid breaking existing jobs that delete, rename, or add symlinks in the vwd.
I think it's extremely unlikely that there are any existing jobs that delete, rename or add symlinks in the vwd, and I don't think the risk justifies the complexity of maintaining a separate code path and adding a new flag.
I have updated the run-command documentation.
I can think of three reasonable modes:
- Write all regular files and symlinks (including the new efficient symlinks-to-Keep feature)
That's now the behavior (as you noted, it was ignoring non-keep symlinks; that's fixed).
What happens if a symlink target is in a (writable) Keep mount in a collection that hasn't been committed? Does it cause the target collection to be committed? This would be nice on the vwd side, but might be surprising on the uncommitted-collection side ("why did this get committed??"). Should we defer this question to #3198?
Once we have writable FUSE, run-command's VWD capability will be functionally obsolete, and we should be doing #5778 instead.
The comment "delete symlinks,..." should be deleted now that checkin doesn't delete symlinks (not sure why this was needed before, but whatever).
Fixed.
robust_put is no longer used and should not be imported.
Fixed.
Updated by Tom Clegg over 9 years ago
Now at 0a87aad
typo "necessay" in vwd.py
Also catches exceptions now.
Hm. This looks like "if can't store output, print error message and proceed as if that's OK." Why is it ever OK to ignore errors here?
I think it's extremely unlikely that there are any existing jobs that delete, rename or add symlinks in the vwd
(...and rely on run-command to write the missing files anyway)
Fair enough. The old behavior seems nearly completely useless apart from compatibility concerns, so dropping it seems fine.
I have updated the run-command documentation.
You changed "merge" to "write both", but this is still false, isn't it?
"When the command completes, the output collection will write both the output of your command and the contents of the starting collection."
What gets written is the content of the VWD at the end of the job -- which can include the symlinks to the starting collection -- right? The above sentence still describes the old behavior, where the content of the starting collection was always included.
The word "Get" at the beginning of the descriptions of $(task.tmpdir) et al. seems odd to me. Why not just this?
| $(task.tmpdir) | Designated temporary directory. This directory will be discarded when the job completes. |
"path to your git checkout" should use something closer to git's own terminology, like "git tree" or "git working directory".
Generally these Crunch environment variables (other than task.outdir) should be documented at doc/api/crunch-scripts.html, since they're not particular to run-command -- the run-command doc just needs to map task.uuid to TASK_UUID etc. (Would have been even better to use exactly the same variable names so the translation table wouldn't be necessary at all, but I guess we missed that one...)
Once we have writable FUSE, run-command's VWD capability will be functionally obsolete, and we should be doing #5778 instead.
True, but saying "we should do something else" on this issue page doesn't help the poor confused users who try it. Documenting "don't put symlinks to writable FUSE dirs in your VWD" would help. Throwing an error when someone does it would also help.
Updated by Peter Amstutz over 9 years ago
Tom Clegg wrote:
Now at 0a87aad
typo "necessay" in vwd.py
Fixed.
Also catches exceptions now.
Hm. This looks like "if can't store output, print error message and proceed as if that's OK." Why is it ever OK to ignore errors here?
You're right. Now the task will fail on IOError or OSError (but it will still try to write all the output).
What gets written is the content of the VWD at the end of the job -- which can include the symlinks to the starting collection -- right? The above sentence still describes the old behavior, where the content of the starting collection was always included.
Fixed.
The word "Get" at the beginning of the descriptions of $(task.tmpdir) et al. seems odd to me. Why not just this?
| $(task.tmpdir) | Designated temporary directory. This directory will be discarded when the job completes. |
"path to your git checkout" should use something closer to git's own terminology, like "git tree" or "git working directory".
Fixed.
Once we have writable FUSE, run-command's VWD capability will be functionally obsolete, and we should be doing #5778 instead.
True, but saying "we should do something else" on this issue page doesn't help the poor confused users who try it. Documenting "don't put symlinks to writable FUSE dirs in your VWD" would help. Throwing an error when someone does it would also help.
Given that writable FUSE isn't done yet, and integrating writable FUSE is even further down, I'm not sure we should be spending a lot of time worrying about this right now.
Updated by Tom Clegg over 9 years ago
Peter Amstutz wrote:
You're right. Now the task will fail on IOError or OSError (but it will still try to write all the output).
Phew. Suggest last_error = e instead of caught_error = True but doesn't have to block.
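For readers following along, the suggestion amounts to something like the sketch below: keep attempting every write, but remember the exception itself rather than a boolean, so the task can fail with a meaningful error at the end. The write_all and write_one names are hypothetical; this is not the code on the branch.

```python
from typing import Callable, Iterable, Optional


def write_all(paths: Iterable[str], write_one: Callable[[str], None]) -> None:
    """Try every path; if any write failed, re-raise the most recent error."""
    last_error: Optional[Exception] = None
    for path in paths:
        try:
            write_one(path)
        except (IOError, OSError) as e:
            # Keep going so the remaining outputs are still attempted.
            last_error = e
    if last_error is not None:
        raise last_error
```

Compared with a caught_error flag, this preserves the original exception type and message for whoever reads the task log.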
What gets written is the content of the VWD at the end of the job -- which can include the symlinks to the starting collection -- right? The above sentence still describes the old behavior, where the content of the starting collection was always included.
Fixed.
Thanks. Just typo "which will include the out of your command" → ...output of...
("files from the starting collection remain read-only and cannot be altered" is a bit awkward but no more so than it was when we started here so I can keep living with it)
Given that writable FUSE isn't done yet, and integrating writable FUSE is even further down, I'm not sure we should be spending a lot of time worrying about this right now.
OK, I'll take that as a "yes" to my original question "Should we defer this question to #3198?" ;)
The rest lgtm, thanks.
Updated by Peter Amstutz over 9 years ago
- Status changed from In Progress to Resolved