serverless: sanitize concurrency_modifier output in JobScaler #476
StanByriukov02 wants to merge 3 commits into
Pull request overview
This PR hardens the serverless worker’s JobScaler against invalid values returned by a user-provided concurrency_modifier, preventing runtime crashes during scheduling/queue sizing (as described in issue #458).
Changes:
- Add defensive handling in JobScaler.set_scale() to catch concurrency_modifier exceptions and sanitize the resulting concurrency value.
- Introduce _sanitize_concurrency() to coerce/validate concurrency to an integer >= 1.
- Add unit tests covering several invalid concurrency_modifier return values.
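The coercion described in the list above could look roughly like the sketch below. This is not the code from the PR: the names sanitize_concurrency and next_concurrency, the fallback of 1, the rejection of bool values, and keeping the previous value when the modifier raises are all assumptions made for illustration.

```python
import logging

log = logging.getLogger(__name__)


def sanitize_concurrency(value, fallback=1):
    """Coerce value to an int >= 1; return fallback when that is impossible.

    Hypothetical helper, not the PR's _sanitize_concurrency().
    """
    # bool is a subclass of int; treat it as invalid rather than as 0 or 1.
    if isinstance(value, bool):
        return fallback
    try:
        coerced = int(value)  # None -> TypeError, "abc" -> ValueError, inf -> OverflowError
    except (TypeError, ValueError, OverflowError):
        return fallback
    return coerced if coerced >= 1 else fallback


def next_concurrency(modifier, current):
    """Call a user-supplied modifier defensively and sanitize its result."""
    try:
        proposed = modifier(current)
    except Exception:
        # Assumption: keep the previous value if the callback itself fails.
        log.exception("concurrency_modifier raised; keeping %d", current)
        return current
    return sanitize_concurrency(proposed)
```

With this sketch, a modifier that returns None, 0, a negative number, or a non-numeric string yields 1, while a modifier that returns 4 yields 4.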
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| runpod/serverless/modules/rp_scale.py | Wraps concurrency_modifier calls with exception handling and sanitizes the updated concurrency value. |
| tests/test_serverless/test_modules/test_rp_scale_concurrency_validation.py | Adds tests for invalid concurrency modifier outputs and a valid-value baseline. |
class TestJobScalerConcurrencyValidation(TestCase):
    def test_concurrency_modifier_none_defaults_to_one(self):
        scaler = JobScaler({"concurrency_modifier": lambda _: None})
        asyncio.run(scaler.set_scale())
        self.assertEqual(scaler.current_concurrency, 1)
    ...
        scaler = JobScaler({"concurrency_modifier": lambda _: 4})
        asyncio.run(scaler.set_scale())
        self.assertEqual(scaler.current_concurrency, 4)
[🟡 Medium] [🔵 Bug]
The new tests instantiate JobScaler and call set_scale() without clearing JobsProgress, but set_scale() waits in a loop while current_occupancy() > 0. Because JobsProgress is a singleton with file-backed persistence, stale in-flight jobs from prior tests or runs can make these tests hang indefinitely and make CI behavior non-deterministic. Add per-test setup that clears or resets JobsProgress (or monkeypatch JobScaler.job_progress) before invoking set_scale().
# tests/test_serverless/test_modules/test_rp_scale_concurrency_validation.py
scaler = JobScaler({"concurrency_modifier": lambda _: None})
asyncio.run(scaler.set_scale())
self.assertEqual(scaler.current_concurrency, 1)
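One way to apply the suggested per-test reset is sketched below. To keep it self-contained it does not import runpod: FakeJobsProgress and FakeScaler are hypothetical stand-ins that only model the shared state and the waiting loop described in the comment, and the clear() method and the one-second timeout are assumptions rather than the library's API.

```python
import asyncio
import unittest


class FakeJobsProgress:
    """Hypothetical stand-in for JobsProgress: state shared by every test."""

    _jobs = set()  # class-level, so it leaks between tests unless cleared

    @classmethod
    def add(cls, job_id):
        cls._jobs.add(job_id)

    @classmethod
    def clear(cls):
        cls._jobs.clear()

    @classmethod
    def current_occupancy(cls):
        return len(cls._jobs)


class FakeScaler:
    """Hypothetical stand-in for JobScaler; models only the waiting loop."""

    def __init__(self, modifier):
        self.modifier = modifier
        self.current_concurrency = 1

    async def set_scale(self):
        # Wait until no jobs are in flight, as the review describes.
        while FakeJobsProgress.current_occupancy() > 0:
            await asyncio.sleep(0.01)
        value = self.modifier(self.current_concurrency)
        valid = isinstance(value, int) and value >= 1
        self.current_concurrency = value if valid else 1


class TestConcurrencyValidation(unittest.TestCase):
    def setUp(self):
        # Reset shared state so stale jobs cannot block set_scale().
        FakeJobsProgress.clear()
        # Also clean up afterwards in case a test adds jobs of its own.
        self.addCleanup(FakeJobsProgress.clear)

    def test_none_defaults_to_one(self):
        scaler = FakeScaler(lambda _: None)
        # wait_for bounds the run so a regression fails instead of hanging.
        asyncio.run(asyncio.wait_for(scaler.set_scale(), timeout=1))
        self.assertEqual(scaler.current_concurrency, 1)
```

Wrapping the call in asyncio.wait_for is an extra safeguard: if the reset is ever missed, the test fails with a timeout instead of stalling CI.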
Hi,
This PR hardens JobScaler scaling when a user-provided concurrency_modifier returns an invalid value (None / 0 / negative / non-int), preventing crashes like:
TypeError: '<' not supported between instances of 'int' and 'NoneType'
It validates/coerces the value to a safe int >= 1 and adds unit tests for the invalid-value cases.
Related: #458
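The quoted TypeError can be reproduced in isolation. The snippet below is a sketch: jobs_needed is a hypothetical capacity check, not the actual comparison in rp_scale.py. It only shows how an unsanitized None reaching an int comparison produces that message.

```python
def jobs_needed(current_concurrency, in_flight):
    """Hypothetical capacity check: how many more jobs may be fetched."""
    if in_flight < current_concurrency:  # int < None raises TypeError
        return current_concurrency - in_flight
    return 0


def modifier(current):
    """A user-supplied modifier that forgets to return a value."""
    if current < 4:
        current += 1  # result is never returned, so the function yields None


try:
    jobs_needed(modifier(1), in_flight=0)
except TypeError as exc:
    message = str(exc)
    print(message)
    # prints: '<' not supported between instances of 'int' and 'NoneType'
```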