Fixed DirectRunner PubSub subscriber client lifecycle#39079
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request optimizes the DirectRunner Pub/Sub read evaluator by replacing class-level caches with weak-keyed dictionaries. This change prevents completed pipeline objects from being retained in memory while maintaining the performance benefits of reusing Pub/Sub clients across transforms. It also improves the robustness of client lifecycle management and adds necessary thread safety. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request refactors the DirectRunner's Pub/Sub reader to cache and reuse SubscriberClient instances using weak-keyed dictionaries, preventing completed pipelines from being kept alive. It also replaces atexit registration with weakref.finalize for cleaner, idempotent resource disposal. The review feedback suggests three key improvements: adding a per-client lock to serialize subscription creation, checking if _LOGGER is not None during garbage collection to prevent AttributeError at interpreter shutdown, and releasing the global class-level lock during synchronous network calls to avoid blocking other threads.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def __init__(self, client): | ||
| self.client = client | ||
| self._temporary_subscription = None | ||
| self._closed = False |
There was a problem hiding this comment.
Add a per-client lock self._lock to serialize subscription creation for each transform without holding the global class-level lock during network calls.
| def __init__(self, client): | |
| self.client = client | |
| self._temporary_subscription = None | |
| self._closed = False | |
| def __init__(self, client): | |
| self.client = client | |
| self._temporary_subscription = None | |
| self._closed = False | |
| self._lock = threading.Lock() |
| try: | ||
| if self._temporary_subscription: | ||
| self.client.delete_subscription( | ||
| subscription=self._temporary_subscription) | ||
| except Exception: | ||
| _LOGGER.warning( | ||
| 'Failed to delete temporary Pub/Sub subscription %s', | ||
| self._temporary_subscription, | ||
| exc_info=True) | ||
|
|
||
| try: | ||
| self.client.close() | ||
| except Exception: | ||
| _LOGGER.warning( | ||
| 'Failed to close Pub/Sub subscriber client', exc_info=True) |
There was a problem hiding this comment.
During interpreter shutdown, module-level globals like _LOGGER can be cleared and set to None. Since close() is called from a finalizer during garbage collection (which can happen during shutdown), check if _LOGGER is not None before calling _LOGGER.warning to avoid raising an AttributeError.
| try: | |
| if self._temporary_subscription: | |
| self.client.delete_subscription( | |
| subscription=self._temporary_subscription) | |
| except Exception: | |
| _LOGGER.warning( | |
| 'Failed to delete temporary Pub/Sub subscription %s', | |
| self._temporary_subscription, | |
| exc_info=True) | |
| try: | |
| self.client.close() | |
| except Exception: | |
| _LOGGER.warning( | |
| 'Failed to close Pub/Sub subscriber client', exc_info=True) | |
| try: | |
| if self._temporary_subscription: | |
| self.client.delete_subscription( | |
| subscription=self._temporary_subscription) | |
| except Exception: | |
| if _LOGGER is not None: | |
| _LOGGER.warning( | |
| 'Failed to delete temporary Pub/Sub subscription %s', | |
| self._temporary_subscription, | |
| exc_info=True) | |
| try: | |
| self.client.close() | |
| except Exception: | |
| if _LOGGER is not None: | |
| _LOGGER.warning( | |
| 'Failed to close Pub/Sub subscriber client', exc_info=True) |
| with cls._subscriber_client_cache_lock: | ||
| if transform in cls._subscription_cache: | ||
| return cls._subscription_cache[transform] | ||
|
|
||
| subscriber_client = cls._get_subscriber_client_state_unlocked(transform) | ||
| sub_client = subscriber_client.client | ||
| sub_name = sub_client.subscription_path( | ||
| sub_project, | ||
| 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32))) | ||
| topic_name = sub_client.topic_path(project, short_topic_name) | ||
| sub_client.create_subscription(name=sub_name, topic=topic_name) | ||
| subscriber_client.set_temporary_subscription(sub_name) | ||
| cls._subscription_cache[transform] = sub_name | ||
| return cls._subscription_cache[transform] |
There was a problem hiding this comment.
Holding the global class-level lock _subscriber_client_cache_lock during the synchronous network call sub_client.create_subscription(...) can block all other threads trying to read from or initialize other Pub/Sub sources.
We can optimize this by releasing the global lock before making the network call, and using a per-transform lock (subscriber_client._lock) to serialize subscription creation for the same transform.
| with cls._subscriber_client_cache_lock: | |
| if transform in cls._subscription_cache: | |
| return cls._subscription_cache[transform] | |
| subscriber_client = cls._get_subscriber_client_state_unlocked(transform) | |
| sub_client = subscriber_client.client | |
| sub_name = sub_client.subscription_path( | |
| sub_project, | |
| 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32))) | |
| topic_name = sub_client.topic_path(project, short_topic_name) | |
| sub_client.create_subscription(name=sub_name, topic=topic_name) | |
| subscriber_client.set_temporary_subscription(sub_name) | |
| cls._subscription_cache[transform] = sub_name | |
| return cls._subscription_cache[transform] | |
| with cls._subscriber_client_cache_lock: | |
| subscriber_client = cls._get_subscriber_client_state_unlocked(transform) | |
| with subscriber_client._lock: | |
| with cls._subscriber_client_cache_lock: | |
| if transform in cls._subscription_cache: | |
| return cls._subscription_cache[transform] | |
| sub_client = subscriber_client.client | |
| sub_name = sub_client.subscription_path( | |
| sub_project, | |
| 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32))) | |
| topic_name = sub_client.topic_path(project, short_topic_name) | |
| sub_client.create_subscription(name=sub_name, topic=topic_name) | |
| subscriber_client.set_temporary_subscription(sub_name) | |
| with cls._subscriber_client_cache_lock: | |
| cls._subscription_cache[transform] = sub_name | |
| return sub_name |
|
Assigning reviewers: R: @jrmccluskey for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
This PR updates the DirectRunner Pub/Sub read evaluator to cache subscriber clients per applied transform using weak-keyed caches.
This avoids retaining completed pipeline objects through class-level caches while still reusing the Pub/Sub client for repeated reads from the same transform.
addresses #19712