Python SDK » History » Version 11

Brett Smith, 01/15/2015 02:47 PM
more about CrunchScript.run()

h1. Python SDK

This is a design draft for a next-generation Python SDK, making it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados.

{{toc}}

h2. Goals

As I understand them, in order of importance:

* The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados.  It should eliminate any need to manipulate jobs or tasks directly via API calls, or to manually instantiate objects from parameter strings.  A user's Crunch script should contain a bare minimum of boilerplate: just the small amount of code necessary to use the SDK.

* The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.

* The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (possibly including reusable tasks and/or elimination of the job/task distinction), integration with the Common Workflow Language, or something else.

  The first version of the SDK will not support features we expect these systems to have; that would conflict with the previous goal.  The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear places where users will be able to use the features of more powerful work dispatch systems in the future.

h2. Components

h3. Dispatcher

Dispatcher classes are internal to the SDK; users do not interact with them at all.  They provide a consistent interface to the scheduling system: they know how to run the right code for this instantiation (e.g., the current task), how to dispatch new work to the scheduling system, how to serialize and deserialize parameters, etc.

Early on, we'll have a traditional Crunch dispatcher and a "running locally for debugging" dispatcher.  If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that.  While I'm not sketching out an API here (since it's not user-facing), we do need to make sure that Dispatcher methods hit that design target.

h3. CrunchScript

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchScript class.  The user extends it with WorkMethods that represent units of work, and a start() method that implements the code to start the work.  That done, the user calls the subclass's run() class method to get the appropriate code running.

The run() class method accepts the Dispatcher class to use in the @dispatcher@ keyword argument.  If none is provided, it introspects the environment to find the appropriate Dispatcher class.  It instantiates that Dispatcher, passing it an instance of the CrunchScript subclass.  To better illustrate, I'll describe how the current Crunch dispatcher would take over from here.
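
A minimal sketch of how run() might pick a Dispatcher.  It assumes that crunch-job exports a @TASK_UUID@ environment variable to every task it starts, and that its absence means the script is running locally.  @CrunchDispatcher@, @LocalDispatcher@, @find_dispatcher_class@, and the @environ@ parameter are hypothetical stand-ins, not a settled API:

<pre><code class="python">
import os


class CrunchDispatcher(object):
    """Hypothetical stand-in for the Crunch Dispatcher."""
    def __init__(self, script):
        self.script = script


class LocalDispatcher(object):
    """Hypothetical stand-in for the local debugging Dispatcher."""
    def __init__(self, script):
        self.script = script


def find_dispatcher_class(environ):
    # Assumption: crunch-job sets TASK_UUID for every task it starts.
    if 'TASK_UUID' in environ:
        return CrunchDispatcher
    return LocalDispatcher


class CrunchScript(object):
    @classmethod
    def run(cls, dispatcher=None, environ=None):
        if environ is None:
            environ = os.environ
        if dispatcher is None:
            dispatcher = find_dispatcher_class(environ)
        # Hand the Dispatcher an instance of the user's subclass.  A real
        # run() would then tell the Dispatcher to start working.
        return dispatcher(cls())
</code></pre>

The @environ@ parameter only exists so the choice can be exercised outside Crunch; an explicit @dispatcher@ argument always wins over introspection.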

The Crunch Dispatcher gets the current task.  If that's task 0, it gets the current job, deserializes the job parameters, and calls subclass.start() with those arguments.  For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized parameters.
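
The branch above might look roughly like this.  It is a sketch under several assumptions: job and task records are plain dicts using the @script_parameters@, @parameters@, and @sequence@ fields, every parameter value is stored JSON-encoded, and a hypothetical @work_method@ key names the WorkMethod a task should run.  It also calls plain methods, where the real Dispatcher would call the undecorated code behind each WorkMethod:

<pre><code class="python">
import json


class CrunchDispatcherSketch(object):
    """Hypothetical sketch of the Crunch Dispatcher's entry point."""

    def __init__(self, script, job, task):
        self.script = script  # instance of the user's CrunchScript subclass
        self.job = job        # current job record (assumed to be a dict)
        self.task = task      # current task record (assumed to be a dict)

    def deserialize(self, params):
        # Assumption: every parameter value is stored JSON-encoded.
        return dict((name, json.loads(value)) for name, value in params.items())

    def run_current_task(self):
        if self.task['sequence'] == 0:
            # Task 0: start the whole script with the job's parameters.
            params = self.deserialize(self.job['script_parameters'])
            return self.script.start(**params)
        # Any other task: find its WorkMethod and run the real code.
        params = self.deserialize(self.task['parameters'])
        method_name = params.pop('work_method')  # hypothetical bookkeeping key
        return getattr(self.script, method_name)(**params)
</code></pre>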

Either way, the method's return value is captured.  If it's a Collection object, the Dispatcher makes sure it's saved, then records its portable data hash as the task's output.  For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that Collection as the task's output.
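
A rough sketch of those rules.  @ToyCollection@ is a hypothetical stand-in for an Arvados Collection: its "manifest" is sorted JSON rather than real manifest text, although the identifier it returns mimics the shape of a portable data hash (an MD5 hex digest of the manifest, a @+@, and the manifest's size in bytes).  Treating None as "no output" and the @output.json@ file name are also assumptions:

<pre><code class="python">
import hashlib
import json


class ToyCollection(object):
    """Hypothetical stand-in for an Arvados Collection."""

    def __init__(self):
        self.files = {}
        self.saved = False

    def write_file(self, name, data):
        self.files[name] = data

    def manifest_text(self):
        # Not a real Arvados manifest; just a deterministic serialization.
        return json.dumps(self.files, sort_keys=True)

    def save(self):
        self.saved = True
        data = self.manifest_text().encode('utf-8')
        return '%s+%d' % (hashlib.md5(data).hexdigest(), len(data))


def task_output_for(value):
    """Return the identifier to record as the task's output, or None."""
    if value is None:
        return None  # Assumption: None means the task records no output.
    if isinstance(value, ToyCollection):
        return value.save()
    # Any other value is serialized to JSON and stored in a new Collection.
    coll = ToyCollection()
    coll.write_file('output.json', json.dumps(value))
    return coll.save()
</code></pre>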

The run() class method also takes a @job_output@ keyword argument with a magic string that says how to set the job's output.  Initially we'll support @concatenate_all_tasks@ and @concatenate_last_sequence@.  @concatenate_all_tasks@ is Crunch's current behavior; @concatenate_last_sequence@ concatenates only the outputs of the tasks with the highest sequence number.
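
A sketch of the two modes, assuming each task's output is available as manifest text (or None) alongside its sequence number, and that job output is built by concatenating those texts, roughly the way crunch-job collates task outputs today.  The @job_output@ function and its input shape are hypothetical:

<pre><code class="python">
def job_output(task_outputs, mode):
    """Combine (sequence, manifest_text) pairs into the job's output.

    Tasks that recorded no output carry None and are skipped.
    """
    if mode == 'concatenate_all_tasks':
        chosen = list(task_outputs)
    elif mode == 'concatenate_last_sequence':
        last = max(seq for seq, _ in task_outputs)
        chosen = [pair for pair in task_outputs if pair[0] == last]
    else:
        raise ValueError('unknown job_output mode: %r' % (mode,))
    # sorted() is stable, so tasks within one sequence keep their order.
    chosen = sorted(chosen, key=lambda pair: pair[0])
    return ''.join(text for _, text in chosen if text is not None)
</code></pre>

Picking "last" over all tasks, not just those with output, means a final sequence that produced nothing yields an empty job output rather than silently falling back to an earlier sequence.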

h3. WorkMethod

WorkMethod is a class used to decorate instance methods defined on the CrunchScript subclass.  It marks work that can be scheduled through a Dispatcher.  Usage looks like:

<pre><code class="python">@WorkMethod()
def task_code(self, param1, param2):
    # implementation
    pass
</code></pre>

When a WorkMethod is called (from start() or from another WorkMethod's code), the user's code isn't run.  Instead, we give the Dispatcher all the information about the call.  The Dispatcher serializes the parameters and schedules the real work method to be run later.  We wrap information about the work's scheduling in a FutureOutput object, which I'll get to next.
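
One way the deferral could work, as a sketch: the decorator wraps the method so that calling it hands the call to the Dispatcher instead of running it.  This assumes the script instance has a @dispatcher@ attribute and that Dispatchers expose a @schedule()@ method; both names are hypothetical, as are @RecordingDispatcher@ and the @real_code@ attribute.  Keyword arguments given to @WorkMethod()@ are kept as scheduling constraints:

<pre><code class="python">
import functools


class FutureOutput(object):
    """Toy record of a call that has been scheduled but not run."""

    def __init__(self, func, args, kwargs, constraints):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.constraints = constraints


class WorkMethod(object):
    def __init__(self, **constraints):
        # Future scheduling options would arrive here as keyword arguments.
        self.constraints = constraints

    def __call__(self, func):
        @functools.wraps(func)
        def schedule(script, *args, **kwargs):
            # Don't run the user's code; give the Dispatcher the whole call.
            return script.dispatcher.schedule(func, args, kwargs, self.constraints)
        schedule.real_code = func  # what the Dispatcher runs later
        return schedule


class RecordingDispatcher(object):
    """Hypothetical Dispatcher that only records what would be scheduled."""

    def __init__(self):
        self.scheduled = []

    def schedule(self, func, args, kwargs, constraints):
        future = FutureOutput(func, args, kwargs, constraints)
        self.scheduled.append(future)
        return future
</code></pre>

Because the constraints travel with every FutureOutput, adding a new scheduling option later would only touch WorkMethod's constructor and the Dispatchers, not user scripts.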

Note that we call WorkMethod() with parentheses even though we're passing it no arguments.  We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.

h3. FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run.  Whenever a WorkMethod is called, it returns a FutureOutput object, and the user can pass that back in as an argument to later WorkMethod calls.  When that happens, the Dispatcher does two things.  First, it introspects all the FutureOutputs passed in to figure out how the new work should be scheduled.  Second, when it's time to run the later work, it finds the real output generated by the earlier work and passes that in when running the user code.  To be clear: each Dispatcher will have its own paired FutureOutput implementation.

Let me walk through a Crunch example to make things concrete.  Roughly, the Crunch FutureOutput will wrap the task we create to represent the work.  Imagine a simple linear Crunch job:

<pre><code class="python">step1out = task1(arg1, arg2)
task2(step1out, arg)</code></pre>

When task2 is called, the Crunch Dispatcher introspects the arguments and finds the step1out FutureOutput among them.  The task2 code can't be run until task1 has finished.  The Crunch Dispatcher gets task1's sequence number from step1out, and creates a task to run task2 with a greater sequence number.
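
The scheduling half can be sketched as a recursive walk over the call's arguments.  This toy assumes each FutureOutput simply carries its task's sequence number, and that new work goes one past the latest sequence it depends on (never earlier than just after the calling task).  Generators are turned into lists first, because a generator can only be consumed once and the Dispatcher needs the values again when the work runs; the tumor example on this page relies on that.  All names are hypothetical:

<pre><code class="python">
import types


class FutureOutput(object):
    """Toy Crunch FutureOutput: remembers the sequence of its task."""

    def __init__(self, sequence):
        self.sequence = sequence


def unroll(value):
    """Copy value, turning generators into lists, at any depth."""
    if isinstance(value, types.GeneratorType):
        value = list(value)
    if isinstance(value, list):
        return [unroll(item) for item in value]
    if isinstance(value, tuple):
        return tuple(unroll(item) for item in value)
    if isinstance(value, dict):
        return dict((key, unroll(item)) for key, item in value.items())
    return value


def find_futures(value):
    """Return every FutureOutput inside an already-unrolled value."""
    if isinstance(value, FutureOutput):
        return [value]
    if isinstance(value, dict):
        value = list(value.values())
    if isinstance(value, (list, tuple)):
        found = []
        for item in value:
            found.extend(find_futures(item))
        return found
    return []


def sequence_for(args, kwargs, caller_sequence):
    """Pick a sequence number after the caller and every input's task."""
    deps = find_futures([args, kwargs])
    return max([caller_sequence] + [f.sequence for f in deps]) + 1
</code></pre>

With the caller at sequence N, calls with no FutureOutput arguments land at N+1, and a call that consumes their FutureOutputs lands at N+2.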

When Crunch actually starts the compute work for task2, the Crunch Dispatcher will see that this argument should have the value of task1's output.  It will deserialize that from the task output, mirroring the rules described earlier, and pass that object in as the argument's value when it calls the user's code.
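
The run-time half is the mirror image: walk the stored arguments again and swap each FutureOutput for its task's deserialized output.  In this sketch, @outputs@ is an assumed mapping from a hypothetical task ID to an already-deserialized value:

<pre><code class="python">
class FutureOutput(object):
    """Toy FutureOutput that remembers which task it stands for."""

    def __init__(self, task_id):
        self.task_id = task_id


def resolve(value, outputs):
    """Replace FutureOutputs, at any depth, with the real outputs."""
    if isinstance(value, FutureOutput):
        return outputs[value.task_id]
    if isinstance(value, list):
        return [resolve(item, outputs) for item in value]
    if isinstance(value, tuple):
        return tuple(resolve(item, outputs) for item in value)
    if isinstance(value, dict):
        return dict((key, resolve(item, outputs)) for key, item in value.items())
    return value
</code></pre>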

h3. Review

Here are the interfaces that script authors will use:

* They will subclass CrunchScript to define their own start() method and WorkMethods.
* They will decorate their work methods with WorkMethod().  This decorator will not currently accept any arguments, but they will still call it as if it did, so it can grow to accept new keyword arguments in the future.
* They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls.  The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does.
* The main body of the script will call the run() class method on their CrunchScript subclass.

This is a very small interface, and there's nothing Crunch-specific in it.  When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters, and it controls all aspects of work scheduling.  Users shouldn't have to encode any Crunch specifics in their scripts.  I think that bodes well for future development of the SDK.
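
To make the size of that interface concrete, here is a toy, self-contained take on the "running locally for debugging" idea: a Dispatcher that runs each WorkMethod call immediately and wraps the result in a FutureOutput.  Everything here is hypothetical, including passing job parameters to run() through a @params@ argument; the script at the bottom touches only the user-facing pieces listed above:

<pre><code class="python">
import types


class FutureOutput(object):
    """Local FutureOutput: the work has already run, so keep its result."""

    def __init__(self, value):
        self.value = value


class LocalDispatcher(object):
    """Hypothetical debugging Dispatcher that runs work immediately."""

    def __init__(self, script):
        self.script = script
        script.dispatcher = self

    def resolve(self, value):
        # Swap FutureOutputs for their results inside containers
        # (tuples and generators come back as lists).
        if isinstance(value, FutureOutput):
            return value.value
        if isinstance(value, (list, tuple, types.GeneratorType)):
            return [self.resolve(item) for item in value]
        if isinstance(value, dict):
            return dict((k, self.resolve(v)) for k, v in value.items())
        return value

    def schedule(self, func, args, kwargs):
        args = [self.resolve(arg) for arg in args]
        kwargs = self.resolve(kwargs)
        return FutureOutput(func(self.script, *args, **kwargs))

    def start(self, params):
        return self.script.start(**params)


class WorkMethod(object):
    def __init__(self, **constraints):
        self.constraints = constraints  # unused by this local sketch

    def __call__(self, func):
        def schedule(script, *args, **kwargs):
            return script.dispatcher.schedule(func, args, kwargs)
        return schedule


class CrunchScript(object):
    @classmethod
    def run(cls, dispatcher=LocalDispatcher, params=None):
        return dispatcher(cls()).start(params or {})


# A user's script: only CrunchScript, WorkMethod, start(), and run().
class WordLengths(CrunchScript):
    def start(self, words):
        lengths = [self.measure(word) for word in words]
        return self.total(lengths)

    @WorkMethod()
    def measure(self, word):
        return len(word)

    @WorkMethod()
    def total(self, lengths):
        return sum(lengths)
</code></pre>

Here @WordLengths.run(params={'words': ['crunch', 'sdk']}).value@ is 9.  Under a Crunch Dispatcher, the same class would instead produce one task per WorkMethod call, which is the point of keeping Crunch details out of user code.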

h2. Hypothetical future Crunch scripts

These are completely untested, since they can't actually run.  Please forgive cosmetic mistakes.

h3. Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

<pre><code class="python">
#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter, CrunchScript, WorkMethod

class TumorAnalysis(CrunchScript):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def start(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N+1, then
        # schedule analyze_tumors after them at sequence N+2.
        self.analyze_tumors(self.classify(in_file)
                            for in_file in input.all_files()
                            if in_file.name.endswith('.fastj'))

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
        # The child wrote through a shared file descriptor, so rewind
        # before reading its output.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            if result is None:
                continue
            # Imagine a reduce-type step operating on each file in the
            # Collection; None is a placeholder for its result.
            for out_file in result.all_files():
                compiled[out_file.name] = None
        return compiled


if __name__ == '__main__':
    TumorAnalysis.run()
</code></pre>

h3. Example from #3603 (FIXME: UPDATE THIS)

This is the script that Abram used to illustrate #3603.

<pre><code class="python">
#!/usr/bin/env python

from arvados import Collection, CrunchJob
from subprocess import check_call

class Example3603(CrunchJob):
    @CrunchJob.task()
    def parse_human_map(self):
        refpath = self.job_param('REFPATH').name
        for line in self.job_param('HUMAN_COLLECTION_LIST'):
            fastj_id, human_id = line.strip().split(',')
            self.run_ruler(refpath, fastj_id, human_id)

    @CrunchJob.task()
    def run_ruler(self, refpath, fastj_id, human_id):
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", Collection(fastj_id).mount_path(),
                    "-lib-path", refpath])
        self.output.add('.')  # Or the path where tileruler writes output.


if __name__ == '__main__':
    Example3603(task0='parse_human_map').run()
</code></pre>

h2. Notes/TODO

* Important concurrency limits that job scripts must be able to express:
** Task Z cannot start until all outputs/side effects of tasks W, X, Y are known/complete (e.g., because Z uses WXY's outputs as its inputs).
*** I'm considering having task methods immediately return some kind of future object.  You could pass that in to other task methods, and the SDK would introspect that and schedule the second task to come after the first.  This would also make it look more like normal Python: a higher-level function can call one task method, grab its output, and pass it in to another.  Probably the only catch is that we wouldn't be using the original method's real return value, but instead the task's output collection.  When we run the second task, what was represented by a future can be replaced with a CollectionReader object for the task's output.
** Tasks Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
** Schedule at most N instances of task T on the same node.  (The @parallel_with=[]@ example above is stronger than we actually want for a multithreaded task.  It would be okay to run several of these tasks in parallel, as long as there's only one per node.  This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference."  But that feels less natural.)
*** This is an SDK for Crunch, and currently Crunch gives the user no control over task scheduling beyond the sequence number (for example, whether a compute node can or can't be shared).  I don't know how to write these features given that there's no way to express these constraints in a way Crunch will recognize and expect (beyond the brute-force "don't parallelize this").
* In general, the output name is not known until the task is nearly finished.  Frequently it is clearer to specify it when the task is queued, though.  We should provide a convenient way to do this without any boilerplate in the queued task.
** This seems pretty easily doable by passing the desired filename into the task method as a parameter.  Then the code can work with @self.output.open(passed_filename)@.
* A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.
* We should be clearer about how the output of the job (as opposed to the output of the last task) is set.  The obvious way (concatenate all task outputs) should be a one-liner, if not implicit.  Either way, it should run in a task rather than being left up to @crunch-job@.
** This also sounds like something that needs to be settled at the Crunch level.  But it seems like we could be pretty accommodating by having crunch-job just say, "If the job set an output, great; if not, collate the task outputs."  Then the SDK can provide predefined task methods for common output setups: collate the output of all tasks or a subset of them; use the output of a specific task; etc.