Python SDK » History » Version 1
Tom Clegg, 08/16/2014 01:03 AM
1 | 1 | Tom Clegg | h1. Python SDK |
---|---|---|---|
2 | |||
3 | (design draft) |
||
4 | |||
5 | <pre><code class="python"> |
||
6 | #!/usr/bin/env python |
||
7 | |||
8 | from arvados import CrunchJob |
||
9 | |||
10 | import examplelib |
||
11 | import re |
||
12 | |||
class NormalizeMatchingFiles(CrunchJob):
    """Normalize every file of the 'input' collection that matches 'pattern'.

    The job is organized as three task methods: ``grep_files`` walks the
    input collection, ``grep_file`` searches one file for the pattern, and
    ``normalize`` runs ``examplelib.frob`` on a matching file and writes the
    result to the task output.
    """

    @CrunchJob.task()
    def grep_files(self):
        """Call ``grep_file`` once for each filename in the input."""
        # CrunchJob instantiates input parameters based on the dataclass
        # attribute. Because 'input' is a Collection, asking for it
        # returns a CollectionReader object.
        input_collection = self.job_param('input')
        for filename in input_collection.filenames():
            self.grep_file(filename)

    @CrunchJob.task()
    def grep_file(self, filename):
        """Call ``normalize`` on *filename* if any of its lines matches.

        The search stops at the first matching line, so ``normalize`` is
        invoked at most once per file.
        """
        pattern = re.compile(self.job_param('pattern'))
        with self.job_param('input').open(filename) as in_file:
            # any() short-circuits on the first hit, like search-then-break.
            if any(pattern.search(line) for line in in_file):
                self.normalize(filename)

    # examplelib is already multi-threaded and will peg the whole
    # compute node, so these tasks should run sequentially.
    @CrunchJob.task(parallel_with=[])
    def normalize(self, filename):
        """Run ``examplelib.frob`` on *filename* and write out the result."""
        mounted_path = self.job_param('input').mount_path(filename)
        frobbed = examplelib.frob(mounted_path)
        # self.output is a CollectionWriter. When this task method
        # finishes, CrunchJob checks whether anything was written to it.
        # If so, it takes care of finishing the upload process and sets
        # this task's output to the Collection UUID.
        with self.output.open(filename) as out_file:
            out_file.write(frobbed)
||
43 | |||
44 | |||
if __name__ == '__main__':
    # Run the job, using grep_files as the initial task (task0).
    job = NormalizeMatchingFiles(task0='grep_files')
    job.main()
||
47 | </code></pre> |