Revision 14/17
Brett Smith, 01/15/2015 03:39 PM
setting job output is Crunch-specific; move that behavior to CrunchDispatcher


Python SDK

This is a design draft for a next-generation Python SDK, making it easier to write Crunch scripts today, with room to grow for future workflow changes in Arvados.

Goals

As I understand them, in order of importance:

  • The SDK should provide a high-level, Pythonic interface to dispatch work through Arvados. It should eliminate any need to manipulate jobs or tasks directly via API calls, or manually instantiate objects from parameter strings. There should be a bare minimum of boilerplate code in a user's Crunch script—just the tiny bit necessary to use the SDK.
  • The SDK should be usable with today's Crunch, so users can start working with it and give us feedback to inform future development.
  • The SDK should have clear room to grow to accommodate future workflow improvements, whether that's new Crunch features (including possibly reusable tasks and/or elimination of the job/task distinction), hooks with the Common Workflow Language, or whatever.
    The first version of the SDK will not support features we expect these systems to have—that would conflict with the previous goal. The idea here is that the API should not rely on details of today's Crunch implementation, and should have clear spaces where users will be able to use the features of more powerful work dispatch systems in the future.

Components

Dispatcher

Dispatcher classes provide a consistent interface to talk to different scheduling systems: they know how to run the right code for this instantiation (e.g., the current task); how to dispatch new work to the scheduling system; how to de/serialize parameters; etc.

Early on, we'll have a traditional Crunch dispatcher, and a "running locally for debugging" dispatcher. If we want to support entirely different backends in the future, the Dispatcher interface should provide the right level of abstraction to do that. While I'm not sketching out an API here (since it's not user-facing), we do need to be careful to make sure that Dispatcher methods hit that design target.

Users can instantiate Dispatcher classes to customize their behavior. For example, one thing we know we want to support early on is telling Crunch how to set the job output from given tasks. Since the job/task distinction is specific to Crunch v1, that customization belongs in the Crunch Dispatcher class. We'll let users pass in task names to specify what output should become the job output:

dispatcher = CrunchDispatcher(job_output_from=['taskB', 'taskD'])

However, all Dispatchers should be able to run without any customization, and users shouldn't have to deal with them at all for the easy cases where they're fine with the scheduler's default behavior.

CrunchScript

The bulk of a user's Crunch script will define a subclass of the SDK's CrunchScript class. The user extends it with WorkMethods that represent work units, and a start() method that implements code to start the work. That done, the user calls subclass.run() to get the appropriate code running.

The run() class method accepts the Dispatcher to use in the dispatcher keyword argument. If none is provided, it introspects the environment to find the appropriate Dispatcher class to use. It instantiates that class, passing along an instance of the CrunchScript subclass. To better illustrate, I'll describe how the current Crunch dispatcher would take over from here.
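To make the selection step concrete, here is a minimal sketch of how run() might pick a Dispatcher. Everything in it is an assumption for illustration: the LocalDispatcher name, the find_dispatcher_class() helper, the environ parameter, and the idea of keying off a TASK_UUID environment variable are not settled parts of this design.

```python
import os


class LocalDispatcher(object):
    """Hypothetical 'running locally for debugging' dispatcher."""
    def __init__(self, script=None):
        self.script = script


class CrunchDispatcher(object):
    """Hypothetical stand-in for the traditional Crunch dispatcher."""
    def __init__(self, script=None, job_output_from=None):
        self.script = script
        self.job_output_from = list(job_output_from or [])


def find_dispatcher_class(environ):
    # Assumption: a process started by Crunch sees TASK_UUID in its
    # environment; anything else is treated as a local debugging run.
    if environ.get('TASK_UUID'):
        return CrunchDispatcher
    return LocalDispatcher


class CrunchScript(object):
    @classmethod
    def run(cls, dispatcher=None, environ=None):
        environ = os.environ if environ is None else environ
        script = cls()
        if dispatcher is None:
            # No Dispatcher given: choose one and hand it the script.
            dispatcher = find_dispatcher_class(environ)(script=script)
        else:
            # A user-customized Dispatcher still needs to know the script.
            dispatcher.script = script
        # A real SDK would now ask the dispatcher to run the right code;
        # this sketch just returns it for inspection.
        return dispatcher
```

Taking the environment as a parameter is only a convenience for testing the sketch; a real implementation could read os.environ directly.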

The Crunch Dispatcher gets the current task. If that's task 0, it gets the current job, deserializes the job parameters, and calls subclass.start() with those arguments. For any other task, it looks up the task's corresponding WorkMethod, and runs its code with deserialized parameters.

Either way, the method's return value is captured. If it's a Collection object, the Dispatcher makes sure it's saved, then records the portable data hash as the task's output. For any other return value except None, it serializes the return value to JSON, saves that in a Collection, and uses that as the task's output.
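The return value rules above could be sketched roughly as follows. FakeCollection is a hypothetical stand-in for the SDK's Collection object, its save() and portable_data_hash() methods only imitate the idea, and the output.json filename is my own placeholder, not a decided convention.

```python
import json


class FakeCollection(object):
    """Hypothetical stand-in for an SDK Collection object."""
    def __init__(self, files=None):
        self.files = dict(files or {})
        self.saved = False

    def save(self):
        self.saved = True

    def portable_data_hash(self):
        # Placeholder identifier, not a real Keep hash.
        return 'pdh-of-%s' % '-'.join(sorted(self.files))


def task_output_for(return_value):
    """Return what would be recorded as the task's output, or None."""
    if return_value is None:
        # None means the work method produced no output to record.
        return None
    if isinstance(return_value, FakeCollection):
        coll = return_value
    else:
        # Any other value is serialized to JSON and stored in a Collection.
        coll = FakeCollection({'output.json': json.dumps(return_value)})
    if not coll.saved:
        coll.save()
    return coll.portable_data_hash()
```

Later work that receives this output would apply the same rules in reverse: load the Collection, and decode the JSON if that is how the value was stored.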

WorkMethod

WorkMethod is a class that should decorate instance methods defined on the CrunchScript subclass. It demarcates work that can be scheduled through a Dispatcher. Use looks like:

@WorkMethod()
def task_code(self, param1, param2):
    # implementation

When a WorkMethod is called—from start() or another WorkMethod's code—the user's code isn't run. Instead, we give the Dispatcher all the information about the call. The Dispatcher serializes the parameters and schedules the real work method to be run later. We wrap information about the work's scheduling in a FutureOutput object, which I'll get to next.

Note that WorkMethod accepts arguments, and we're passing it none. We expect the constructor to grow optional keyword arguments over time that let the user specify scheduling constraints.
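As an illustration of the interception described above, a decorator class along these lines would do it. This is a sketch under assumptions: the RecordingDispatcher class, its schedule() method, the script.dispatcher attribute, and the real_code attribute are all names I made up here, and FutureOutput is reduced to a bare placeholder.

```python
import functools


class FutureOutput(object):
    """Bare placeholder for the object returned by a scheduled call."""
    def __init__(self, method_name):
        self.method_name = method_name


class RecordingDispatcher(object):
    """Hypothetical dispatcher that only records what it is asked to do."""
    def __init__(self):
        self.scheduled = []

    def schedule(self, method_name, args, kwargs):
        self.scheduled.append((method_name, args, kwargs))
        return FutureOutput(method_name)


class WorkMethod(object):
    def __init__(self, **constraints):
        # No keyword arguments are defined yet; this is where scheduling
        # constraints could be accepted later.
        self.constraints = constraints

    def __call__(self, func):
        @functools.wraps(func)
        def schedule_instead(script, *args, **kwargs):
            # Calling the method hands the call to the Dispatcher rather
            # than running the user's code.
            return script.dispatcher.schedule(func.__name__, args, kwargs)
        # Keep the user's code reachable so the Dispatcher can run it later.
        schedule_instead.real_code = func
        return schedule_instead


class Demo(object):
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher

    @WorkMethod()
    def task_code(self, param1, param2):
        return param1 + param2
```

With this in place, Demo(dispatcher).task_code(1, 2) returns a FutureOutput and leaves a record with the dispatcher, while the original function stays available as Demo.task_code.real_code.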

FutureOutput

FutureOutput objects represent the output of work that has been scheduled but not yet run. Whenever a WorkMethod is called, it returns a FutureOutput object, and the user can pass that back in as an argument to later WorkMethod calls. When that happens, the Dispatcher does two things. First, it introspects all the FutureOutput objects passed in to figure out how the new work should be scheduled. Second, when it's time to run the later work, it finds the real output generated by the earlier work, and passes that in when running the user code. To be clear: each Dispatcher will have its own paired FutureOutput implementation.

Let me talk about a Crunch example to make things concrete. Roughly, the FutureOutput will wrap the task we create to represent the work. Imagine a simple linear Crunch job:

step1out = task1(arg1, arg2)
task2(step1out, arg)

When task2 is called, the Crunch Dispatcher introspects the arguments, and finds the step1out FutureOutput among them. The task2 code can't be run until task1 has finished. The Crunch Dispatcher gets task1's sequence number from step1out, and creates a task to run task2 with a greater sequence number.

When Crunch actually starts the compute work for task2, the Crunch Dispatcher will see that this argument should have the value of task1's output. It will deserialize that from the task output mirroring the rules described earlier, then pass that object in as the value when it calls the user's code.
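The sequencing half of this could look something like the sketch below. The find_futures() and sequence_for() helpers and the sequence attribute are hypothetical names, and the sketch only walks lists, tuples, sets, and dicts; a generator argument would have to be unrolled into a list before this step.

```python
class FutureOutput(object):
    """Placeholder that remembers the sequence number of its task."""
    def __init__(self, sequence):
        self.sequence = sequence


def find_futures(value):
    """Yield every FutureOutput found anywhere inside value."""
    if isinstance(value, FutureOutput):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            for found in find_futures(item):
                yield found
    elif isinstance(value, (list, tuple, set, frozenset)):
        for item in value:
            for found in find_futures(item):
                yield found


def sequence_for(args, kwargs, current_sequence):
    """Pick a sequence number greater than every task this call waits on."""
    waits_on = [future.sequence for future in find_futures([args, kwargs])]
    return max(waits_on + [current_sequence]) + 1
```

For the linear job above, if start() runs at sequence 0, task1 is scheduled at sequence 1, and task2 lands at sequence 2 because step1out is among its arguments.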

Review

Here are the interfaces that script authors will use:

  • They will subclass CrunchScript to define their own start() method and WorkMethods.
  • They will decorate their work methods with WorkMethod(). This decorator will not currently accept any arguments, but it is still called as if it did, so it can grow to accept new keyword arguments in the future.
  • They will receive FutureOutput objects when they call WorkMethods, and pass those to other WorkMethod calls. The user doesn't need to know any of the semantics of FutureOutput at all; only the Dispatcher does.
  • The main body of the script will call the run() class method on their CrunchScript subclass.

This is a very small interface. There's nothing Crunch-specific in it. When the library takes responsibility for some Arvados interface, it takes responsibility for all interactions with it: e.g., it both serializes and deserializes parameters; it controls all aspects of work scheduling. Users shouldn't have to encode any Crunch specifics in their scripts. I think that bodes well for future development of the SDK.

Hypothetical future Crunch scripts

These are completely untested, since they can't actually run. Please forgive cosmetic mistakes.

Example: Crunch statistics on a Collection of fastj files indicating a tumor

This demonstrates scheduling tasks in order and explicit job output.

#!/usr/bin/env python

import subprocess
import tempfile

from arvados import CollectionWriter
from arvados.crunch import CrunchScript, WorkMethod, CrunchDispatcher

class TumorAnalysis(CrunchScript):
    # input is a Collection named in a job parameter.
    # The SDK instantiates the corresponding Collection object and passes it in.
    def start(self, input):
        # analyze_tumors is being called with a generator of
        # FutureOutput objects.  The Dispatcher will unroll this (it
        # inevitably needs to traverse recursive data structures),
        # schedule all the classify tasks at sequence N+1, then
        # schedule analyze_tumors after at sequence N+2.
        self.analyze_tumors(self.classify(in_file)
                            for in_file in input.all_files()
                            if in_file.name.endswith('.fastj'))

    @WorkMethod()
    def classify(self, in_file):
        out_file = tempfile.NamedTemporaryFile()
        proc = subprocess.Popen(['normal_or_tumor'],
                                stdin=subprocess.PIPE, stdout=out_file)
        for data in in_file.readall():
            proc.stdin.write(data)
        proc.stdin.close()
        proc.wait()
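        # The child process wrote through the same file descriptor, so
        # rewind before inspecting what it produced.
        out_file.seek(0)
        if b'tumor' in out_file.read(4096):
            out_file.seek(0)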
            out_coll = CollectionWriter()
            with out_coll.open(in_file.name) as output:
                output.write(out_file.read())
            return out_coll
        return None

    @WorkMethod()
    def analyze_tumors(self, results):
        compiled = {}
        for result in results:
            if result is None:
                continue
            # Imagine a reduce-type step operating on the file in the
            # Collection.
            compiled[thing] = ...
        return compiled

if __name__ == '__main__':
    dispatcher = CrunchDispatcher(job_output_from=['analyze_tumors'])
    TumorAnalysis.run(dispatcher=dispatcher)

Example from #3603

This is the script that Abram used to illustrate #3603.

#!/usr/bin/env python

from arvados import CollectionReader, CollectionWriter
from arvados.crunch import CrunchScript, WorkMethod, CrunchDispatcher
from subprocess import check_call

class Example3603(CrunchScript):
    def start(self, human_collection_list, refpath):
        for line in human_collection_list:
            fastj_id, human_id = line.strip().split(',')
            self.run_ruler(refpath, fastj_id, human_id)

    @WorkMethod()
    def run_ruler(self, refpath, fastj_id, human_id):
        # CollectionReader and StreamFileReader objects should grow a
        # mount_path() method that gives the object's location under
        # $TASK_KEEPMOUNT.
        check_call(["tileruler", "--crunch", "--noterm", "abv",
                    "-human", human_id,
                    "-fastj-path", CollectionReader(fastj_id).mount_path(),
                    "-lib-path", refpath])
        out_coll = CollectionWriter()
        # The argument is the path where tileruler writes output.
        out_coll.write_directory_tree('.')
        return out_coll

if __name__ == '__main__':
    # Note that there will be many run_ruler tasks.  Their output will be
    # concatenated.  The default behavior, as usual, is to concatenate
    # everything.
    dispatcher = CrunchDispatcher(job_output_from=['run_ruler'])
    Example3603.run(dispatcher=dispatcher)

Notes/TODO

(Some of these may be out-of-date, but I'll leave that for the suggesters to decide.)

  • Important concurrency limits that job scripts must be able to express:
    • Task Z cannot start until all outputs/side effects of tasks W, X, Y are known/complete (e.g., because Z uses WXY's outputs as its inputs).
      • I'm considering having task methods immediately return some kind of future object. You could pass that in to other task methods, and the SDK would introspect that and schedule the second task to come after the first. This would also make it look more like normal Python: a higher-level function can call one task method, grab its output, and pass it in to another. Probably the only catch is that we wouldn't be using the original method's real return value, but instead the task's output collection. When we run the second task, what was represented by a future can be replaced with a CollectionReader object for the task's output.
    • Task Y and Z cannot run on the same worker node without interfering with each other (e.g., due to RAM requirements).
    • Schedule at most N instances of task T on the same node. (The parallel_with=[] example above is stronger than we actually want, for a multithreaded task. It would be okay to run several of these tasks in parallel, as long as there's only one per node. This could be expressed as a particular case of the above bullet: "Task T cannot run on the same worker node as task T without interference." But that feels less natural.)
      • This is an SDK for Crunch, and currently Crunch gives the user no control of task scheduling beyond the sequence, like how one compute node can or can't be shared. I don't know how to write these features given that there's no way to express these constraints in a way Crunch will recognize and expect (beyond the brute-force "don't parallelize this").
  • In general, the output name is not known until the task is nearly finished. Frequently it is clearer to specify it when the task is queued, though. We should provide a convenient way to do this without any boilerplate in the queued task.
    • This seems pretty easily doable by passing the desired filename into the task method as a parameter. Then the code can work with self.output.open(passed_filename).
  • A second example that uses a "case" and "control" input (e.g., "tumor" and "normal") might help reveal features.
  • We should be clearer about how the output of the job (as opposed to the output of the last task) is to be set. The obvious way (concatenate all task outputs) should be a one-liner, if not implicit. Either way, it should run in a task rather than being left up to crunch-job.
    • This also sounds like something that needs to be settled at the Crunch level. But it seems like we could be pretty accommodating by having crunch-job just say, "If the job set an output, great; if not, collate the task outputs." Then the SDK can provide pre-defined task methods for common output setups: collate the output of all tasks or a subset of them; use the output of a specific task; etc.
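To illustrate the last suggestion, the fallback rule could be as small as the sketch below. It is only a guess at the shape: the job_output() function is hypothetical, and treating each task output as a chunk of text that can be concatenated is a simplifying assumption rather than a description of how Crunch collates outputs.

```python
def job_output(explicit_output, task_outputs, wanted_tasks=None):
    """Pick the job output from an explicit value or from task outputs.

    task_outputs is a list of (task_name, output_text_or_None) pairs in
    the order the tasks ran.
    """
    if explicit_output is not None:
        # "If the job set an output, great."
        return explicit_output
    # Otherwise collate the outputs of all tasks, or of a named subset.
    selected = [output for name, output in task_outputs
                if output is not None
                and (wanted_tasks is None or name in wanted_tasks)]
    return ''.join(selected)
```

Passing wanted_tasks=['taskB', 'taskD'] would correspond to the job_output_from=['taskB', 'taskD'] customization shown earlier.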

Updated by Brett Smith almost 10 years ago · 17 revisions