Multiprocess FATE Revisited

I thought I had brainstormed a simple, elegant, multithreaded, deadlock-free refactoring for FATE in a previous post. However, I sort of glossed over the test ordering logic which I had not yet prototyped. The grim, possibly deadlock-afflicted reality is that the main thread needs to be notified as tests are completed. So, the main thread sends test specs through a queue to be executed by n tester threads and those threads send results to a results aggregator thread. Additionally, the results aggregator will need to send completed test IDs back to the main thread.

But when I step back and look at the graph, I can’t rationalize why there should be a separate results aggregator thread. That was added to cut down on deadlock possibilities, since the main thread and the tester threads would not be waiting for data from each other. Now that I’ve come to terms with the fact that the main thread and the testers need to exchange data in realtime, I think I can safely eliminate the results thread. Adding more threads is not the best way to guard against race conditions and deadlocks. Ask xine.

I’m still hung up on the deadlock issue. I have these queues through which the threads communicate. At issue is the fact that they can cause a thread to block when inserting an item if the queue is “full”. How full is full? Immaterial; seeking to answer such a question is not how you guard against race conditions. Rather, it seems to me that one side should be doing non-blocking queue operations.
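
To make that concrete, here’s the shape of non-blocking puts and gets with Python’s standard queue module (multiprocessing.Queue raises the same queue.Full and queue.Empty exceptions); the names here are placeholders, not FATE’s actual code:

import queue

tests_queue = queue.Queue(maxsize=10)   # bounded, so puts can fail when full
results_queue = queue.Queue()
tests_blocked = set()
test_spec = 'fate-example-test'         # placeholder for a real test spec

# non-blocking insert: if the queue is full, find out immediately
# instead of stalling the main thread
try:
    tests_queue.put(test_spec, block=False)
except queue.Full:
    tests_blocked.add(test_spec)

# non-blocking drain: collect whatever results are ready right now
while True:
    try:
        result = results_queue.get(block=False)
    except queue.Empty:
        break                           # nothing ready; carry on
    # ...remove the corresponding test from tests_pending...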

This is how I’m planning to revise the logic in the main thread:

test_set = set of all tests to execute
tests_pending = copy of test_set
tests_blocked = empty set
tests_queue = multi-consumer queue to send test specs to tester threads
results_queue = multi-producer queue through which tester threads send results

while there are tests in test_set:
  pop a test from test_set
  if test depends on any tests that appear in tests_pending:
    add test to tests_blocked
  else:
    add test to tests_queue in a non-blocking manner
    if tests_queue is full, add test to tests_blocked instead

  while there are results in results_queue:
    get a result from results_queue in a non-blocking manner
    remove the corresponding test from tests_pending

  if test_set is empty and tests_blocked is non-empty:
    sleep for 1 second
    test_set = tests_blocked
    tests_blocked = empty set

insert n shutdown signals into tests_queue, one for each tester thread

while there are results in the results_queue:
  get a result from results_queue in a blocking manner

Not mentioned in the pseudocode (so it doesn’t get too verbose) is the logic to check whether a retrieved test result is actually an end-of-thread signal. These are counted, and the whole test process is done when one has been received from each thread.

On the tester thread side, it’s safe for them to do blocking test queue retrievals and blocking result queue insertions. The reason for the 1-second delay before resetting tests_blocked and looping again is that I want to guard against the situation where tests A and B are to be run, A depends on B running first, and while B is running (and happens to be a long encoding test), the main thread is spinning about, obsessively testing whether it’s time to insert A into the tests queue.
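
As a sketch, a tester thread then boils down to a simple blocking loop (run_test and the SHUTDOWN sentinel are stand-ins, not FATE’s actual names):

SHUTDOWN = None   # sentinel: the main thread queues one per tester thread

def run_test(test):
    # stand-in for actually executing a FATE test spec
    return (test, 'pass')

def tester_thread(tests_queue, results_queue):
    while True:
        test = tests_queue.get()                # blocking retrieval is safe here
        if test is SHUTDOWN:
            results_queue.put('end-of-thread')  # lets the main thread account for us
            break
        results_queue.put(run_test(test))       # blocking insertion, also safe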

It all sounds just crazy enough to work. In fact, I coded it up and it does work, sort of. The tests queue fills up pretty quickly. Instead of sleeping, I decided it’s better to perform the put operation using a 1-second timeout.
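
In code, that revision is a one-liner; reusing the placeholder names from the sketches above:

try:
    # let the put itself wait up to 1 second for space to open up,
    # instead of a non-blocking put followed by a separate sleep
    tests_queue.put(test_spec, timeout=1)
except queue.Full:
    tests_blocked.add(test_spec)   # still full after 1 second; defer the test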

Still, I’m paranoid about the precise operation of the IPC queue mechanism at work here. What happens if I try to stuff in a test spec that’s a bit too large? Will the module take whatever I give it and serialize it through the queue as soon as it can? I think an impromptu science project is in order.

big-queue.py:
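
A minimal version of the experiment, assuming Python’s multiprocessing module (the details here are a sketch consistent with the output below):

#!/usr/bin/env python
# Stuff one very large string through a multiprocessing queue and
# see whether the reader receives all of it intact.

import multiprocessing

def reader(q):
    s = q.get()   # block until the payload arrives
    print("reader function got a string of %d characters" % len(s))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=reader, args=(q,))
    p.start()
    q.put('a' * 100000000)   # 100,000,000 characters, roughly 100 MB
    p.join()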

$ ./big-queue.py 
reader function got a string of 100000000 characters

Since 100 MB doesn’t even make it choke, FATE’s little test specs shouldn’t pose any difficulty.

4 thoughts on “Multiprocess FATE Revisited”

  1. Jakub Piotr Cłapa

    If you know how many threads you are running, maybe you could base the throttling on test results. For N threads you send N tests to the queue. You block on the result queue, and when you get a result you know you can send another job to the queue. Wouldn’t that work?

  2. Multimedia Mike (post author)

    @Jakub: Thanks for reading through all that. :-)

    One problem I see is that the test queue seems to get full pretty quickly. So I probably wouldn’t get the chance to block on the result queue.

  3. someone

    you may want to consider “getting a result from results_queue in a BLOCKING manner” if “tests_queue is full” … this avoids cycling in 1-second intervals and should speed up the queuing of new tasks

  4. Jakub Piotr Cłapa

    As long as there are idle worker threads the queue should not get full. If there are no idle worker threads then you should be able to wait for a result until at least one finishes. Then you should be able to put one more test (which should be consumed right away) into the queue and wait for another result.
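
(As a sketch, the trade-one-result-for-one-test scheme Jakub describes might look like this, reusing the hypothetical names from the sketches above and ignoring dependency handling:)

def run_throttled(runnable_tests, tests_queue, results_queue, num_threads):
    in_flight = 0
    # prime the pipeline: one test per tester thread
    for _ in range(num_threads):
        if runnable_tests:
            tests_queue.put(runnable_tests.pop())
            in_flight += 1
    # trade one result for one new test until everything has run
    while in_flight:
        result = results_queue.get()   # blocking: some thread just went idle
        in_flight -= 1
        if runnable_tests:
            tests_queue.put(runnable_tests.pop())
            in_flight += 1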
