Story #3761
[Keep] Process entries on the current pull list.
Status: closed
% Done: 100%
Description
Currently, when receiving its first pull list, keepstore sets up a WorkQueue instance called pullq. At the same time it should also start a pull worker goroutine:
go RunPullWorker(pullq.NextItem)
The resulting goroutine will run forever, processing pull requests on the WorkQueue one at a time.
"RunPullWorker" will:
- Get the next pull request.
- For each server, try Pull(). Stop when one succeeds.
- Repeat.
"Pull" will:
- Generate a random API token (see footnote 1).
- Generate a permission signature using the random token, a timestamp ~60 seconds in the future, and the desired block hash.
- Using this token & signature, retrieve the given block from the given keepstore server.
- Verify the checksum and write to storage, just as if it had been provided by a client in a PUT transaction. I.e., PutBlock().
RunPullWorker() and Pull() will look something like this:
func RunPullWorker(nextItem <-chan interface{}) {
	for item := range nextItem {
		pullReq := item.(PullRequest)
		// Try each server in turn; stop at the first successful pull.
		for _, addr := range pullReq.Servers {
			if err := Pull(addr, pullReq.Locator); err == nil {
				break
			}
		}
	}
}

func Pull(addr string, locator string) (err error) {
	log.Printf("Pull %s/%s starting", addr, locator)
	defer func() {
		if err == nil {
			log.Printf("Pull %s/%s success", addr, locator)
		} else {
			log.Printf("Pull %s/%s error: %s", addr, locator, err)
		}
	}()
	// (will also need to set auth headers and add a signature token to the locator here)
	resp, err := http.Get(fmt.Sprintf("http://%s/%s", addr, locator))
	if err != nil {
		return
	}
	defer resp.Body.Close()
	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}
	err = PutBlock(data, locator)
	return
}
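The token and signature steps listed for Pull() could be sketched roughly as follows. This is only an illustration: randomToken and signLocator are hypothetical helpers, and the "+A<signature>@<expiry>" layout and the exact HMAC inputs are assumptions made for the sketch rather than a statement of keepstore's real signing format.

```go
package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"time"
)

// randomToken returns n random bytes, hex-encoded. (Hypothetical helper.)
func randomToken(n int) (string, error) {
	buf := make([]byte, n)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// signLocator appends an illustrative permission hint to a locator.
// The hint layout and the HMAC inputs are assumptions for this sketch.
// expiry is a Unix timestamp in seconds.
func signLocator(secret []byte, locator, token string, expiry int64) string {
	expiryHex := fmt.Sprintf("%08x", expiry)
	mac := hmac.New(sha1.New, secret)
	mac.Write([]byte(locator + "@" + token + "@" + expiryHex))
	return locator + "+A" + hex.EncodeToString(mac.Sum(nil)) + "@" + expiryHex
}

func main() {
	token, err := randomToken(20)
	if err != nil {
		panic(err)
	}
	// Placeholder secret and block hash, used only for the demonstration.
	secret := []byte("example permission secret")
	locator := "acbd18db4cc2f85cedef654fccc4a4d8"
	// The signature expires roughly 60 seconds from now.
	expiry := time.Now().Add(60 * time.Second).Unix()
	fmt.Println(signLocator(secret, locator, token, expiry))
}
```

Because the token is random and the expiry is short, each pull request gets a signature that is only useful for that one retrieval.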
PullWorker doesn't need to worry about:
- Retrying (Data Manager will tell us to do the pull again, if it's still needed)
- Concurrency (we can add concurrency safely & easily by starting multiple PullWorkers)
- Noticing when the pull list changes, or is empty (WorkQueue already does all this: we just read from the channel, and something will arrive when there's something for this thread to do)
- Detecting whether a given pull request is useless, e.g., data already present, before pulling (instead, trust Data Manager to give us useful pull lists, and be OK with an occasional superfluous GET)
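As a rough illustration of the concurrency point above, several workers can read from one channel, and each item is delivered to exactly one of them. The names processAll and handle are hypothetical; handle stands in for the per-request pull logic.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll starts `workers` goroutines that all read from the same
// channel. It returns once the channel has been closed and drained.
// handle is a hypothetical stand-in for the per-request pull logic.
func processAll(items <-chan interface{}, workers int, handle func(interface{})) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range items {
				handle(item)
			}
		}()
	}
	wg.Wait()
}

func main() {
	items := make(chan interface{})
	go func() {
		for i := 0; i < 10; i++ {
			items <- i
		}
		close(items)
	}()

	var mu sync.Mutex
	count := 0
	processAll(items, 3, func(interface{}) {
		mu.Lock()
		count++
		mu.Unlock()
	})
	fmt.Println("handled", count)
}
```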
1 Currently, Keep doesn't actually verify API tokens, just the permission signature, so a random token is just as effective as a real one.
Updated by Tom Clegg over 10 years ago
- Subject changed from [Keep] Process entries on the current pull list, logging each failure and success. to [Keep] Process entries on the current pull list.
- Description updated (diff)
Updated by Tim Pierce about 10 years ago
- Category set to Keep
- Assigned To set to Tim Pierce
- Target version changed from Arvados Future Sprints to 2015-01-07 sprint
Updated by Tom Clegg about 10 years ago
- Target version changed from 2015-01-07 sprint to Arvados Future Sprints
Updated by Tom Clegg almost 10 years ago
- Target version changed from Arvados Future Sprints to 2015-03-11 sprint
Updated by Radhika Chippada almost 10 years ago
- Assigned To set to Radhika Chippada
Updated by Radhika Chippada almost 10 years ago
- Status changed from New to In Progress
Updated by Peter Amstutz almost 10 years ago
It would be clearer if RunPullWorker took the WorkQueue object instead of a channel.
In pull_worker.go:27 this seems unnecessary, since arvadosclient.MakeArvadosClient() is already responsible for reading ARVADOS_API_TOKEN from the environment:
arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
Even better would be to get rid of allocating ArvadosClient and KeepClient here, and instead have RunPullWorker take a KeepClient struct as a parameter. The caller will set up the ArvadosClient and KeepClient.
This loop in pull_worker.go:42 is unnecessary; you can use the Servers list to populate service_roots (see below):
for _, addr := range pullReq.Servers { ... }
The key in the service_roots map is supposed to be the server's UUID in the "keep services" table, but that is only used to shuffle the service list when determining the order of precedence for reads and writes using the full service list. In this case, the shuffle order doesn't matter because each server in the list is known to have the block, so we can use the same service address string for both the key and the value.
Change pull_worker.go:70 to something like this:
for _, addr := range pullReq.Servers { service_roots[addr] = addr }
Instead of generating a random API token each time in GetContent, why don't you pass in the token to use?
The code in GetContent is not actually creating a valid signing token. You need to use MakePermSignature.
You don't need to use MakeLocator2() or AuthorizedGet(). You can just concatenate the base locator with the return value of MakePermSignature and use keepclient.Get().
I've learned the hard way: don't do defer reader.Close() without first checking whether reader is nil.
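A minimal sketch of that pattern, using a hypothetical fetch function in place of the real call that returns the reader:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// fetch is a hypothetical stand-in for a call that returns a reader and
// an error. On failure the reader is nil.
func fetch(ok bool) (io.ReadCloser, error) {
	if !ok {
		return nil, errors.New("block not found")
	}
	return ioutil.NopCloser(strings.NewReader("block data")), nil
}

// readBlock checks the error and the reader before deferring Close, so a
// failed fetch never leads to calling Close on a nil reader.
func readBlock(ok bool) ([]byte, error) {
	reader, err := fetch(ok)
	if err != nil {
		return nil, err
	}
	if reader == nil {
		return nil, errors.New("no reader returned")
	}
	defer reader.Close()
	return ioutil.ReadAll(reader)
}

func main() {
	data, err := readBlock(true)
	fmt.Println(string(data), err)
	data, err = readBlock(false)
	fmt.Println(string(data), err)
}
```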
Why are you doing log.Print(read_content, err)? Is this for debugging? Am I misreading this, or isn't this going to potentially result in 64 MiB of binary data being sent to the log?
I'm concerned that stubbing out GetContent() in the test means a key piece of the code is not being tested. You need to add integration tests where the real GetContent implementation talks to an actual keepstore service. In particular, you need to do that to ensure that you're creating and using the signing token successfully.
To correctly test the case where a new pull list preempts an existing list, you will need an alternate GetContent() which waits on a channel after processing some (but not all) items on the first list, to give the testing code a chance to issue a new pull list to replace the first list.
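One way such an alternate GetContent() might look is sketched below. The gate type and its fields are hypothetical, and the sketch assumes a single worker goroutine calls GetContent. The stub lets a set number of calls through, then signals the test and blocks until released.

```go
package main

import "fmt"

// gate is a hypothetical test double. On its `after`-th call, GetContent
// closes reached and then blocks until release is closed.
type gate struct {
	calls   int
	after   int
	reached chan struct{}
	release chan struct{}
}

func newGate(after int) *gate {
	return &gate{
		after:   after,
		reached: make(chan struct{}),
		release: make(chan struct{}),
	}
}

// GetContent is meant to be called from a single worker goroutine.
func (g *gate) GetContent(locator string) ([]byte, error) {
	g.calls++
	if g.calls == g.after {
		close(g.reached) // tell the test that the worker is paused
		<-g.release      // wait until the test lets the worker continue
	}
	return []byte(locator), nil
}

func main() {
	g := newGate(2)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for _, loc := range []string{"a", "b", "c"} {
			g.GetContent(loc)
		}
	}()

	<-g.reached
	fmt.Println("worker paused; a test could replace the pull list here")
	close(g.release)
	<-done
	fmt.Println("calls:", g.calls)
}
```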
Updated by Radhika Chippada almost 10 years ago
Peter,
Addressed all the review comments. Some details / exceptions:
- I used "SignLocator" instead of "MakePermSignature" as SignLocator seems to be doing better / more such as time formatting etc.
- Tom's description says generate a random token that expires in one minute. Hence, I am generating a token for each request. I left it as such for now and can change it if need be.
- Enhanced the tests to use a delay and verify that some of the pull lists are ignored.
- I minimized the amount of work the GetContent function does. If we want to do an integration test that talks to a real remote keep server, let's talk about it as to how I can do this. I am not sure how to do it yet. Thanks :).
Updated by Peter Amstutz almost 10 years ago
Radhika Chippada wrote:
> Peter,
> Addressed all the review comments. Some details / exceptions:
> - I used "SignLocator" instead of "MakePermSignature" as SignLocator seems to be doing better / more such as time formatting etc.
Ah yes, that looks like the right function to use.
> - Tom's description says generate a random token that expires in one minute. Hence, I am generating a token for each request. I left it as such for now and can change it if need be.
That makes sense. OK.
> - Enhanced the tests to use a delay and verify that some of the pull lists are ignored.
I don't understand how it checks which pull list items are processed and which are not?
> - I minimized the amount of work the GetContent function does. If we want to do an integration test that talks to a real remote keep server, let's talk about it as to how I can do this. I am not sure how to do it yet. Thanks :).
Much better.
Notes
pull_worker.go:15 should not use a global variable. RunPullWorker() should pass the KeepClient struct to Pull(). When it passes KeepClient to Pull(), it should pass by value so that the field updates in Pull() don't interfere with the values of the original KeepClient struct. You will also need to pass KeepClient into GetContent().
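A small sketch of what passing by value does and does not isolate. The client and auth types here are hypothetical, not the real KeepClient. If the real struct reaches some state through a pointer field (an assumption to check against the SDK), updates made through that pointer would still be visible to the caller.

```go
package main

import "fmt"

// auth and client are hypothetical stand-ins for a client struct that
// has both a plain field and a pointer field.
type auth struct {
	Token string
}

type client struct {
	WantReplicas int
	Auth         *auth
}

// pull receives its own copy of the struct. Changes to plain fields stay
// local, but changes made through pointer fields reach the caller's data.
func pull(c client, token string) {
	c.WantReplicas = 1
	c.Auth.Token = token
}

func main() {
	orig := client{WantReplicas: 2, Auth: &auth{Token: "original"}}
	pull(orig, "random")
	// prints: 2 random
	fmt.Println(orig.WantReplicas, orig.Auth.Token)
}
```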
The PermissionSecret is not the API token. It is the contents of the file specified by the "-permission-key-file" parameter. Pull() should not be altering PermissionSecret. (Just delete that line.)
You're returning an 'err' status from Pull() but RunPullWorker() doesn't log it.
GetContent() should only take signedLocator.
Testing
In pull_worker_test.go, give the "Test()" function a more descriptive name (e.g. TestPullWorker).
Could you update the PullWorkerTestData{...} initializations to include the field names? It would make them much more readable.
I don't understand why performTest() needs to wait 25 milliseconds at the start of the test, before anything has happened yet.
You might want to use SetUpTest() and TearDownTest() to start/stop RunPullWorker() on every test instead of SetUpSuite() and TearDownSuite(). Waiting until TearDownSuite() to check len(processesPullLists) seems problematic.
There don't seem to be any assertions actually checking that the mock GetContent() is being called. The only assertions are for checking the results of the initial pull request.
It might make sense to have tests that just test RunPullWorker() by adding work to the WorkQueue directly.
In order to write your integration tests, take a look at arvadostest.StartAPI() and arvadostest.StartKeep() in the arvados Go SDK.
Updated by Radhika Chippada almost 10 years ago
Peter, addressed all your feedback and the tests are much more readable now. I also made the GetContent and PutContent functions the smallest possible so that the test mockups do not result in overlooking too much implementation.
The only item that I did not address is adding actual integration tests. I think I will address it separately as time permits (since we do not yet have any integration tests for keepstore at this time). Thanks.
Updated by Peter Amstutz almost 10 years ago
In Pull(), KeepClient.Arvados.ApiToken needs to be set to match the value from GenerateRandomApiToken().
pull_worker_test does not appear to be using the read_content field any more.
Since you are creating a new work queue and a new RunPullWorker() goroutine each time you call performTest, you should call pullq.Close() at the end of performTest so the WorkQueue.listen() and RunPullWorker() threads will terminate. Then you can take out expectWorkerChannelEmpty.
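The shutdown behaviour described here can be sketched with a plain channel standing in for the WorkQueue. The performTest below is a hypothetical simplification, not the real test helper: closing the channel plays the role of pullq.Close() and ends the worker's range loop.

```go
package main

import "fmt"

// performTest starts a worker, feeds it the given requests, closes the
// queue, and waits for the worker to finish. It returns the number of
// requests the worker processed.
func performTest(reqs []interface{}) int {
	items := make(chan interface{})
	done := make(chan int)

	go func() {
		n := 0
		for range items {
			n++
		}
		// The range loop only ends once items has been closed.
		done <- n
	}()

	for _, r := range reqs {
		items <- r
	}
	close(items) // stands in for pullq.Close()
	return <-done
}

func main() {
	fmt.Println(performTest([]interface{}{"a", "b", "c"}))
}
```

Because every call waits for its own worker to finish, no goroutines are left behind for a later test to trip over.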
Updated by Radhika Chippada almost 10 years ago
- Added integration tests for pull worker. These are starting the api server and keep servers using arvadostest.run_servers.go.
- The tests currently do not use signed locators as the testing framework does not support signed locators. Added #5428 for future support of this.
Updated by Peter Amstutz almost 10 years ago
GetKeepServices() should use keepclient.DiscoverKeepServers() (or at least arvadosclient.Call()) instead of reimplementing all the HTTP client logic.
Please add a pull worker test that checks that a pull request with an invalid data manager token is rejected.
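A test of this kind might be shaped roughly like the sketch below, which uses net/http/httptest. The pullHandler function, the "/pull" path, and the "OAuth2 <token>" header format are assumptions made for illustration; the real keepstore handler and its authorization check should be used in the actual test.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// pullHandler is a hypothetical handler that accepts a pull list only
// when the request carries the expected token. The header format is an
// assumption for this sketch.
func pullHandler(expected string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Authorization") != "OAuth2 "+expected {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// putPullList sends an empty pull list with the given token and returns
// the HTTP status code of the response.
func putPullList(h http.Handler, token string) int {
	req := httptest.NewRequest("PUT", "/pull", strings.NewReader("[]"))
	req.Header.Set("Authorization", "OAuth2 "+token)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := pullHandler("data-manager-token")
	fmt.Println(putPullList(h, "data-manager-token"))
	fmt.Println(putPullList(h, "wrong-token"))
}
```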
Updated by Radhika Chippada almost 10 years ago
Peter, I updated DiscoverKeepServers to return service_roots and used it. Also added the test. Thanks.
Updated by Radhika Chippada almost 10 years ago
- Status changed from In Progress to Resolved
- % Done changed from 0 to 100
Applied in changeset arvados|commit:ed0ed73d8bdd94dc04c93a1a2bc9f82ad45f6dcf.