I recently had to refactor some code that uses Celery to store results from a scraping process in a MongoDB collection. It involved a number of whack-a-mole performance problems due to the distributed nature of the system, and these were leading to the Linux out of memory (OOM) killer being triggered against some of the workers. I wanted to write up some of the approaches I took, as they may be helpful to others, and there may well be better ways to handle the same situation (so any feedback would be much appreciated!).
The inspiration for writing this up was another blog post on handling memory-intensive Celery workers. It highlighted yet another Celery setting, the worker prefetch limit, that I wasn’t using or even aware of. That post used Celery’s worker prefetch limiting to avoid running out of memory on the workers. The approach below is what I used to avoid the same problem. It acts one step earlier: rather than throttling tasks once they are on the workers, the Python programme distributing the work holds back from sending tasks to the Celery worker. I think either approach may work, and I’m glad to have found that post, both for prompting future thinking and for prompting this write-up.
The backoff function implements simple limit-and-wait logic in the application that calls the Celery Workers; the trivialised case here has a single Worker named ‘celery1’. The calling Python programme queries the Celery task inspection function, celery.task.control.inspect(), and the count of running tasks reported for that worker (which serves only this one queue) determines whether the function returns. If there are too many tasks on the worker, the function sleeps until the number of tasks is less than the value of the variable message_queue_max.
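The limit-and-wait logic described above could be sketched roughly as follows. This is a minimal reconstruction from the description, not the original function: the names count_active_tasks and wait_for_capacity and the poll_seconds parameter are assumptions, and the sketch uses app.control.inspect() from current Celery releases in place of the older celery.task.control.inspect() import path.

```python
import time


def count_active_tasks(app, worker_name):
    """Return how many tasks are currently executing on one worker.

    `app` is a Celery application. inspect().active() returns a dict keyed
    by worker node name, or None when no worker replies in time; this sketch
    treats a missing reply as zero running tasks (an assumption).
    """
    active = app.control.inspect(destination=[worker_name]).active() or {}
    return len(active.get(worker_name, []))


def wait_for_capacity(app, worker_name, message_queue_max, poll_seconds=5.0):
    """Block until the worker is running fewer than message_queue_max tasks."""
    while count_active_tasks(app, worker_name) >= message_queue_max:
        # Too many tasks in flight: pause before asking the worker again.
        time.sleep(poll_seconds)
```

The calling programme would run wait_for_capacity() before dispatching each batch of tasks, passing the full node name of the worker (typically of the form name@hostname). Note that active() counts tasks being executed, so prefetched tasks waiting on the worker are not included.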
To better manage this type of task, I set three of Celery’s task settings. The task’s results are ignored (ignore_result), the task is acknowledged late (acks_late), meaning on completion rather than the default of on receipt, and the soft time limit for the task is removed (soft_time_limit). The task is also configured to make two retry attempts if it fails (max_retries).
max_retries=2, ignore_result=True, acks_late=True, soft_time_limit=None
Changing the write pattern actually removed the need for the backoff in my particular usage. A large number of upsert operations (updating a document if it exists, otherwise inserting it) was the root cause of the Celery Worker having such a long queue. Switching to an insert-only approach, which was feasible for my application as it only required each document to be present once, gave close to two orders of magnitude improvement in the database operations and significantly reduced the Celery Worker queue, as tasks were no longer backing up.
bulk_result = MongoClient(mongo_uri).DATABASE.COLLECTION.bulk_write(array_of_insert_one_operations, ordered=False)
While reading around how to stop my queue growing too large and triggering the Linux out of memory (OOM) killer, I found two useful Celery settings that I had not been aware of. Firstly, you can start Celery Workers with events enabled using the “-E” option; combined with the “worker_pool_restarts” setting, this allows worker pools to be restarted directly through Celery Flower (the management UI I use for Celery).
The second relates to our internal tooling’s scraper approach: a number of the workers run tasks that are primarily asynchronous HTTP requests, so these use the Eventlet execution pool. The deployment uses a standard Celery/RabbitMQ configuration. By default, Celery keeps at most 10 connections open in its broker connection pool. The “broker_pool_limit” setting allows this to be raised; in the example below it is set to 100, which is more suitable for this type of Celery Worker/Eventlet execution pool combination.
worker_pool_restarts = True
broker_pool_limit = 100
The tooling currently uses celery.bin.multi with bash scripting, as each node hosts multiple workers. However, it is being containerized, so I think my next blog post will cover moving to supervisord with a Celery setup of multiple workers per node serving multiple queues.