Feature #11015
Closed
Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
100%
Description
To improve throughput of crunch-run job output uploading, add support for multi-threaded asynchronous transfers to hide the latency inherent in cloud environments.
Refactoring to support public APIs in the Go SDK is a separate task.
Updated by Tom Morris almost 8 years ago
- Target version set to Arvados Future Sprints
Updated by Tom Morris almost 8 years ago
- Target version changed from Arvados Future Sprints to 2017-03-01 sprint
Updated by Tom Morris almost 8 years ago
- Subject changed from Improve throughput of crunch-run output-uploading stage using multithreaded transfers to Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
- Description updated
- Assigned To set to Radhika Chippada
- Story points set to 2.0
Updated by Tom Clegg almost 8 years ago
Possible implementation
In (*CollectionFileWriter).goUpload(), currently we have:
    goUpload() {
        for block := range uploader {
            compute md5
            write to keep
            append signed locator to m.ManifestStream.Blocks (or append err to errors)
        }
    }
We can add a buffered channel to CollectionWriter and pass it to (CFW)goUpload() and use it to limit the total number of blocks being written by all CFWs of a given CollectionWriter.
    func (m *CollectionWriter) Open(path string) io.WriteCloser {
        ...
        m.mtx.Lock()
        defer m.mtx.Unlock()
        if m.workers == nil {
            if m.MaxWriters < 1 {
                m.MaxWriters = 1
            }
            m.workers = make(chan struct{}, m.MaxWriters)
        }
        go fw.goUpload(m.workers)
        m.Streams = append(m.Streams, fw)
        ...
    }

    goUpload(workers chan struct{}) {
        var blocks []string
        var mtx sync.Mutex
        for block := range uploader {
            mtx.Lock()
            blockIndex := len(blocks)
            blocks = append(blocks, "")
            mtx.Unlock()
            workers <- struct{}{} // wait for an available worker slot
            wg.Add(1)
            go func(blockIndex int) {
                compute md5
                write to keep
                <-workers // release worker slot
                mtx.Lock()
                m.ManifestStream.Blocks[blockIndex] = signedLocator (or append err to errors)
                mtx.Unlock()
                wg.Done()
            }(blockIndex)
        }
        wg.Wait()
        finish <- errors
    }
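For readers who want to try the pattern outside crunch-run, here is a minimal, self-contained sketch of the same idea: a buffered channel used as a counting semaphore to cap concurrent block writes, with each block's position reserved up front so the locators stay in order. This is an illustration under stated assumptions, not the code that was committed. The names putBlock and uploadBlocks are hypothetical, and putBlock only computes an MD5-based locator string instead of writing to Keep.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sync"
)

// putBlock is a hypothetical stand-in for a Keep write. It only builds a
// locator-like string from the block's MD5 and size; no network I/O happens.
func putBlock(block []byte) (string, error) {
	return fmt.Sprintf("%x+%d", md5.Sum(block), len(block)), nil
}

// uploadBlocks (hypothetical name) writes every block received from uploader,
// running at most cap(workers) writes at once. Locators are returned in the
// order the blocks arrived, regardless of the order the writes finish.
func uploadBlocks(uploader <-chan []byte, workers chan struct{}) ([]string, []error) {
	var (
		mtx      sync.Mutex
		wg       sync.WaitGroup
		locators []string
		errs     []error
	)
	for block := range uploader {
		mtx.Lock()
		blockIndex := len(locators)
		locators = append(locators, "") // reserve this block's position
		mtx.Unlock()

		workers <- struct{}{} // wait for an available worker slot
		wg.Add(1)
		go func(blockIndex int, block []byte) {
			defer wg.Done()
			locator, err := putBlock(block)
			<-workers // release the worker slot
			mtx.Lock()
			defer mtx.Unlock()
			if err != nil {
				errs = append(errs, err)
				return
			}
			locators[blockIndex] = locator
		}(blockIndex, block)
	}
	wg.Wait()
	return locators, errs
}

func main() {
	uploader := make(chan []byte)
	go func() {
		defer close(uploader)
		for _, s := range []string{"foo", "bar", "baz"} {
			uploader <- []byte(s)
		}
	}()
	workers := make(chan struct{}, 2) // at most 2 concurrent writes
	locators, errs := uploadBlocks(uploader, workers)
	fmt.Println(locators, errs)
}
```

Because the workers channel is created by the caller, several uploadBlocks calls can share one channel, which is how the proposal above limits the total number of in-flight writes across all file writers of a collection.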
Updated by Radhika Chippada almost 8 years ago
- Status changed from New to In Progress
Updated by Lucas Di Pentima almost 8 years ago
Sorry for taking so long to send comments; I was trying to understand the code as best I could.
One question: Is the number of upload workers configurable? I can't see where it is set; it seems to be hardcoded to 2.
I've run the services/crunch-run tests locally without issues.
Updated by Radhika Chippada almost 8 years ago
Is the number of upload workers configurable?
Any number of writers greater than 1 helps, as this lets us use multiple threads to upload.
Thanks.
Updated by Radhika Chippada almost 8 years ago
- Status changed from In Progress to Resolved
- % Done changed from 0 to 100
Applied in changeset arvados|commit:0b125c52cd816e6a4120c414d3817f354cad1055.