h1. Python SDK

(design draft)

h1. Hypothetical future Crunch scripts

We're writing these out with the goal of designing a new SDK for Crunch script authors.

{{toc}}

h2. Example scripts

h3. Example: "Normalize" files matching a regexp

<pre><code class="python">
#!/usr/bin/env python

from arvados import CrunchJob

import examplelib
import re

class NormalizeMatchingFiles(CrunchJob):
    @CrunchJob.task()
    def grep_files(self):
        # CrunchJob instantiates input parameters based on the
        # dataclass attribute.  When we ask for the input parameter,
        # CrunchJob sees that it's a Collection, and returns a
        # CollectionReader object.
        input_collection = self.job_param('input')
        for filename in input_collection.filenames():
            self.grep_file(self.job_param('pattern'), input_collection, filename)

    @CrunchJob.task()
    def grep_file(self, pattern, collection, filename):
        regexp = re.compile(pattern)
        with collection.open(filename) as in_file:
            for line in in_file:
                if regexp.search(line):
                    self.normalize(in_file)
                    break

    # examplelib is already multi-threaded and will peg the whole
    # compute node.  These tasks should run sequentially.
    # Tasks are normally parallel with themselves; here we override that
    # default to say that the task is parallel with nothing, not even itself.
    # When tasks are created, Arvados-specific objects like Collection file
    # objects are serialized as task parameters.  CrunchJob instantiates
    # these parameters as real objects when it runs the task.
    @CrunchJob.task(parallel_with=[])
    def normalize(self, collection_file):
        output = examplelib.frob(collection_file.mount_path())
        # self.output is a CollectionWriter.  When this task method finishes,
        # CrunchJob checks if we wrote anything to it.  If so, it takes care
        # of finishing the upload process, and sets this task's output to the
        # Collection UUID.
        with self.output.open(collection_file.name) as out_file:
            out_file.write(output)


if __name__ == '__main__':
    NormalizeMatchingFiles(task0='grep_files').run()
</code></pre>

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import glob
import os
import pprint

from arvados import CrunchJob
from subprocess import check_call

class TumorAnalysis(CrunchJob):
    OUT_EXT = '.analysis'

    @CrunchJob.task()
    def check_fastjs(self):
        in_coll = self.job_param('input')
        for name in in_coll.filenames():
            if name.endswith('.fastj'):
                self.classify(in_coll, name)
        # analyze_tumors gets scheduled to run after all the classify tasks,
        # since it isn't parallel with them (and it was invoked later).
        self.analyze_tumors()

    @CrunchJob.task()
    def classify(self, collection, filename):
        # Methods that refer to directories, like mount_path and job_dir,
        # work like os.path.join when you pass them arguments.
        check_call(['normal_or_tumor', collection.mount_path(filename)])
        outpath = filename + self.OUT_EXT
        with open(outpath) as result:
            is_tumor = 'tumor' in result.read(4096)
        if is_tumor:
            os.rename(outpath, self.job_dir(outpath))

    @CrunchJob.task()
    def analyze_tumors(self):
        compiled = {}
        results = glob.glob(self.job_dir('*' + self.OUT_EXT))
        for outpath in results:
            with open(outpath) as outfile:
                compiled[thing] = ...  # Imagine this is a reduce-type step.
        # job_output is a CollectionWriter.  Writing to it overrides the
        # default behavior where job output is collated task output.
        with self.job_output.open('compiled_numbers.log') as resultfile:
            pprint.pprint(compiled, resultfile)


if __name__ == '__main__':
    TumorAnalysis(task0='check_fastjs').run()
</code></pre>

h3. Example from #3603

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from arvados import Collection, CrunchJob
from subprocess import check_call

class Example3603(CrunchJob):
    @CrunchJob.task()
    def parse_human_map(self):
        refpath = self.job_param('REFPATH').name
        for line in self.job_param('HUMAN_COLLECTION_LIST'):
            fastj_id, human_id = line.strip().split(',')
            self.run_ruler(refpath, fastj_id, human_id)

    @CrunchJob.task()
    def run_ruler(self, refpath, fastj_id, human_id):
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", Collection(fastj_id).mount_path(),
                    "-lib-path", refpath])
        self.output.add('.')  # Or the path where tileruler writes output.


if __name__ == '__main__':
    Example3603(task0='parse_human_map').run()
</code></pre>

h2. Notes/TODO

* Important concurrency limits that job scripts must be able to express:
** Task Z cannot start until all outputs/side effects of tasks W, X, and Y are known/complete (e.g., because Z uses the outputs of W, X, and Y as its inputs).
*** I'm considering having task methods immediately return some kind of future object. You could pass that in to other task methods, and the SDK would introspect it and schedule the second task to run after the first. This would also make the code look more like normal Python: a higher-level function can call one task method, grab its output, and pass it in to another. Probably the only catch is that we wouldn't be using the original method's real return value, but instead the task's output collection. When the second task runs, what was represented by a future can be replaced with a CollectionReader object for the first task's output. (See the first sketch at the end of this section.)
** Tasks Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node. (The @parallel_with=[]@ example above is stronger than we actually want for a multithreaded task. It would be okay to run several of these tasks in parallel, as long as there's only one per node. This could be expressed as a special case of the previous bullet: "Task T cannot run on the same worker node as task T without interference." But that feels less natural. See the second sketch at the end of this section.)
*** This is an SDK for Crunch, and currently Crunch gives the user no control over task scheduling beyond the sequence, such as whether one compute node can or can't be shared. I don't know how to implement these features when there's no way to express these constraints in a way Crunch will recognize and respect (beyond the brute-force "don't parallelize this").
* In general, the output name is not known until the task is nearly finished. Frequently, though, it is clearer to specify it when the task is queued. We should provide a convenient way to do this without any boilerplate in the queued task.
** This seems pretty easily doable by passing the desired filename into the task method as a parameter. Then the code can work with @self.output.open(passed_filename)@. (See the third sketch at the end of this section.)
* A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.
* We should get clearer about how the output of the job (as opposed to the output of the last task) is to be set. The obvious way (concatenate all task outputs) should be a one-liner, if not implicit. Either way, it should run in a task rather than being left up to @crunch-job@.
** This also sounds like something that needs to be settled at the Crunch level. But it seems like we could be pretty accommodating by having crunch-job just say, "If the job set an output, great; if not, collate the task outputs." Then the SDK can provide pre-defined task methods for common output setups: collate the output of all tasks or a subset of them, use the output of a specific task, etc.
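
Below are a few sketches of these ideas. First, the future-returning idea. Everything here is hypothetical, including the class and task names; the key assumption is the proposed behavior itself, where calling a task method queues the task and returns a future that the SDK later swaps for a CollectionReader of that task's output.

<pre><code class="python">
#!/usr/bin/env python

from arvados import CrunchJob

class FutureSketch(CrunchJob):
    @CrunchJob.task()
    def main(self):
        # Hypothetical: calling a task method queues the task and
        # immediately returns a future standing in for its output.
        index = self.build_index(self.job_param('input'))
        # Passing the future to another task method tells the SDK to
        # schedule summarize after build_index completes.
        self.summarize(index)

    @CrunchJob.task()
    def build_index(self, collection):
        with self.output.open('index.txt') as out_file:
            for filename in collection.filenames():
                out_file.write(filename + '\n')

    @CrunchJob.task()
    def summarize(self, index):
        # By the time this runs, the future has been replaced with a
        # CollectionReader for build_index's output collection.
        with index.open('index.txt') as in_file:
            with self.output.open('count.txt') as out_file:
                out_file.write('%d\n' % sum(1 for _ in in_file))


if __name__ == '__main__':
    FutureSketch(task0='main').run()
</code></pre>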
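
Second, for the at-most-N-per-node constraint, one possible spelling is an extra argument to the task decorator. @max_per_node@ is a made-up name, and Crunch would need new scheduling support to honor it:

<pre><code class="python">
from arvados import CrunchJob

import examplelib

class MaxPerNodeSketch(CrunchJob):
    # Hypothetical: any number of normalize tasks may run in parallel
    # across the cluster, but at most one per compute node.
    @CrunchJob.task(max_per_node=1)
    def normalize(self, collection_file):
        output = examplelib.frob(collection_file.mount_path())
        with self.output.open(collection_file.name) as out_file:
            out_file.write(output)
</code></pre>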
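
Third, a sketch of naming a task's output when it's queued: the desired filename is threaded through as an ordinary task parameter, so the queued task itself has no naming boilerplate. The names and the uppercasing "work" are placeholders:

<pre><code class="python">
from arvados import CrunchJob

class OutputNameSketch(CrunchJob):
    @CrunchJob.task()
    def main(self):
        in_coll = self.job_param('input')
        for filename in in_coll.filenames():
            # The queueing task decides the output name up front.
            self.transform(in_coll, filename, filename + '.out')

    @CrunchJob.task()
    def transform(self, collection, in_name, out_name):
        # The queued task just opens whatever name it was handed.
        with collection.open(in_name) as in_file:
            with self.output.open(out_name) as out_file:
                out_file.write(in_file.read().upper())


if __name__ == '__main__':
    OutputNameSketch(task0='main').run()
</code></pre>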