Fixed DirectRunner PubSub subscriber client lifecycle #39079

Open
lalitx17 wants to merge 1 commit into apache:master from lalitx17:fix-directrunner-pubsub-client-reuse

@lalitx17

This PR updates the DirectRunner Pub/Sub read evaluator to cache subscriber clients per applied transform using weak-keyed caches.

This avoids retaining completed pipeline objects through class-level caches while still reusing the Pub/Sub client for repeated reads from the same transform.
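The weak-keyed caching idea described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `FakeSubscriberClient`, `AppliedTransform`, and `ClientCache` are hypothetical stand-ins, and the real evaluator would build an actual Pub/Sub subscriber client instead.

```python
import threading
import weakref


class FakeSubscriberClient:
  """Hypothetical stand-in for a Pub/Sub subscriber client."""


class AppliedTransform:
  """Hypothetical stand-in; any hashable, weak-referenceable object works."""


class ClientCache:
  # Class-level cache, but weak-keyed: an entry disappears once its
  # transform is no longer referenced anywhere else, so a finished
  # pipeline is not kept alive by the cache.
  _clients = weakref.WeakKeyDictionary()
  _lock = threading.Lock()

  @classmethod
  def get_client(cls, transform):
    with cls._lock:
      client = cls._clients.get(transform)
      if client is None:
        client = FakeSubscriberClient()
        cls._clients[transform] = client
      return client
```

Repeated calls with the same transform return the same client, and the entry is dropped once the transform is garbage collected. Note that this only works if the cached value does not hold a strong reference back to its key.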

Addresses #19712.

@lalitx17 lalitx17 marked this pull request as ready for review June 24, 2026 03:46
@gemini-code-assist

Summary of Changes

This pull request optimizes the DirectRunner Pub/Sub read evaluator by replacing class-level caches with weak-keyed dictionaries. This change prevents completed pipeline objects from being retained in memory while maintaining the performance benefits of reusing Pub/Sub clients across transforms. It also improves the robustness of client lifecycle management and adds necessary thread safety.

Highlights

  • Subscriber Client Caching: Implemented weak-keyed caches for Pub/Sub subscriber clients in the DirectRunner to prevent memory leaks while allowing client reuse.
  • Lifecycle Management: Introduced a _PubSubSubscriberClient wrapper to manage client state and lifecycle, ensuring proper cleanup of temporary subscriptions.
  • Thread Safety: Added a threading lock to the subscriber client cache to ensure safe concurrent access during pipeline execution.
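A wrapper of the kind named in the highlights might look like the sketch below. It assumes the wrapper tracks one temporary subscription and a closed flag (both visible in the reviewed `__init__`); `FakeClient` and the class name `SubscriberClientState` are hypothetical, and the real `_PubSubSubscriberClient` may differ.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class FakeClient:
  """Hypothetical stand-in that records calls instead of using the network."""
  def __init__(self):
    self.deleted = []
    self.close_calls = 0

  def delete_subscription(self, subscription):
    self.deleted.append(subscription)

  def close(self):
    self.close_calls += 1


class SubscriberClientState:
  """Sketch of a wrapper that owns a client and its temporary subscription."""
  def __init__(self, client):
    self.client = client
    self._temporary_subscription = None
    self._closed = False

  def set_temporary_subscription(self, name):
    self._temporary_subscription = name

  def close(self):
    # Idempotent: a second call does nothing.
    if self._closed:
      return
    self._closed = True
    try:
      if self._temporary_subscription:
        self.client.delete_subscription(
            subscription=self._temporary_subscription)
    except Exception:
      _LOGGER.warning(
          'Failed to delete temporary subscription %s',
          self._temporary_subscription,
          exc_info=True)
    try:
      self.client.close()
    except Exception:
      _LOGGER.warning('Failed to close subscriber client', exc_info=True)
```

Cleanup errors are logged rather than raised so that a failed subscription delete does not prevent the client itself from being closed.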
@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the DirectRunner's Pub/Sub reader to cache and reuse SubscriberClient instances using weak-keyed dictionaries, preventing completed pipelines from being kept alive. It also replaces atexit registration with weakref.finalize for cleaner, idempotent resource disposal. The review feedback suggests three key improvements: adding a per-client lock to serialize subscription creation, checking if _LOGGER is not None during garbage collection to prevent AttributeError at interpreter shutdown, and releasing the global class-level lock during synchronous network calls to avoid blocking other threads.
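For readers unfamiliar with `weakref.finalize`, the sketch below shows the general pattern of tying cleanup to an owner object's lifetime instead of registering it with `atexit`. `Resource`, `Owner`, and `attach` are hypothetical names; how the PR actually wires the finalizer is not shown in this thread.

```python
import weakref


class Resource:
  """Hypothetical resource that counts how often it is closed."""
  def __init__(self):
    self.close_calls = 0

  def close(self):
    self.close_calls += 1


class Owner:
  """Hypothetical object whose lifetime bounds the resource."""


def attach(owner, resource):
  # The callback must not reference `owner`; otherwise the owner would
  # stay reachable and the finalizer would only run at interpreter exit.
  return weakref.finalize(owner, resource.close)
```

The finalizer runs at most once: when the owner is collected, when it is called explicitly, or at interpreter exit, whichever comes first. Calling it again afterwards is a no-op.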

Comment on lines +583 to +586
    def __init__(self, client):
      self.client = client
      self._temporary_subscription = None
      self._closed = False

medium

Add a per-client lock self._lock to serialize subscription creation for each transform without holding the global class-level lock during network calls.

Suggested change
    - def __init__(self, client):
    -   self.client = client
    -   self._temporary_subscription = None
    -   self._closed = False
    + def __init__(self, client):
    +   self.client = client
    +   self._temporary_subscription = None
    +   self._closed = False
    +   self._lock = threading.Lock()

Comment on lines +596 to +610
    try:
      if self._temporary_subscription:
        self.client.delete_subscription(
            subscription=self._temporary_subscription)
    except Exception:
      _LOGGER.warning(
          'Failed to delete temporary Pub/Sub subscription %s',
          self._temporary_subscription,
          exc_info=True)

    try:
      self.client.close()
    except Exception:
      _LOGGER.warning(
          'Failed to close Pub/Sub subscriber client', exc_info=True)

medium

During interpreter shutdown, module-level globals like _LOGGER can be cleared and set to None. Since close() is called from a finalizer during garbage collection (which can happen during shutdown), check if _LOGGER is not None before calling _LOGGER.warning to avoid raising an AttributeError.

Suggested change
    - try:
    -   if self._temporary_subscription:
    -     self.client.delete_subscription(
    -         subscription=self._temporary_subscription)
    - except Exception:
    -   _LOGGER.warning(
    -       'Failed to delete temporary Pub/Sub subscription %s',
    -       self._temporary_subscription,
    -       exc_info=True)
    - try:
    -   self.client.close()
    - except Exception:
    -   _LOGGER.warning(
    -       'Failed to close Pub/Sub subscriber client', exc_info=True)
    + try:
    +   if self._temporary_subscription:
    +     self.client.delete_subscription(
    +         subscription=self._temporary_subscription)
    + except Exception:
    +   if _LOGGER is not None:
    +     _LOGGER.warning(
    +         'Failed to delete temporary Pub/Sub subscription %s',
    +         self._temporary_subscription,
    +         exc_info=True)
    + try:
    +   self.client.close()
    + except Exception:
    +   if _LOGGER is not None:
    +     _LOGGER.warning(
    +         'Failed to close Pub/Sub subscriber client', exc_info=True)

Comment on lines +663 to 676
    with cls._subscriber_client_cache_lock:
      if transform in cls._subscription_cache:
        return cls._subscription_cache[transform]

      subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
      sub_client = subscriber_client.client
      sub_name = sub_client.subscription_path(
          sub_project,
          'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
      topic_name = sub_client.topic_path(project, short_topic_name)
      sub_client.create_subscription(name=sub_name, topic=topic_name)
      subscriber_client.set_temporary_subscription(sub_name)
      cls._subscription_cache[transform] = sub_name
      return cls._subscription_cache[transform]

medium

Holding the global class-level lock _subscriber_client_cache_lock during the synchronous network call sub_client.create_subscription(...) can block all other threads trying to read from or initialize other Pub/Sub sources.

We can optimize this by releasing the global lock before making the network call, and using a per-transform lock (subscriber_client._lock) to serialize subscription creation for the same transform.

Suggested change
    - with cls._subscriber_client_cache_lock:
    -   if transform in cls._subscription_cache:
    -     return cls._subscription_cache[transform]
    -   subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
    -   sub_client = subscriber_client.client
    -   sub_name = sub_client.subscription_path(
    -       sub_project,
    -       'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
    -   topic_name = sub_client.topic_path(project, short_topic_name)
    -   sub_client.create_subscription(name=sub_name, topic=topic_name)
    -   subscriber_client.set_temporary_subscription(sub_name)
    -   cls._subscription_cache[transform] = sub_name
    -   return cls._subscription_cache[transform]
    + with cls._subscriber_client_cache_lock:
    +   subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
    + with subscriber_client._lock:
    +   with cls._subscriber_client_cache_lock:
    +     if transform in cls._subscription_cache:
    +       return cls._subscription_cache[transform]
    +   sub_client = subscriber_client.client
    +   sub_name = sub_client.subscription_path(
    +       sub_project,
    +       'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
    +   topic_name = sub_client.topic_path(project, short_topic_name)
    +   sub_client.create_subscription(name=sub_name, topic=topic_name)
    +   subscriber_client.set_temporary_subscription(sub_name)
    +   with cls._subscriber_client_cache_lock:
    +     cls._subscription_cache[transform] = sub_name
    +   return sub_name
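The two-level locking in this suggestion can be exercised without Pub/Sub using the sketch below. All names (`TransformState`, `Transform`, `SubscriptionCache`) are hypothetical, and `create_subscription` here only counts calls in place of the real network request.

```python
import threading
import weakref


class TransformState:
  """Hypothetical per-transform state with its own lock."""
  def __init__(self):
    self.lock = threading.Lock()
    self.create_calls = 0

  def create_subscription(self, name):
    # Stands in for the slow, synchronous network call.
    self.create_calls += 1
    return name


class Transform:
  """Hypothetical stand-in for an applied transform."""


class SubscriptionCache:
  _global_lock = threading.Lock()
  _states = weakref.WeakKeyDictionary()
  _subscriptions = weakref.WeakKeyDictionary()

  @classmethod
  def get_subscription(cls, transform):
    # Short critical section: only look up or create the per-transform state.
    with cls._global_lock:
      state = cls._states.get(transform)
      if state is None:
        state = TransformState()
        cls._states[transform] = state
    # Creation is serialized per transform; other transforms are not blocked.
    with state.lock:
      with cls._global_lock:
        cached = cls._subscriptions.get(transform)
      if cached is not None:
        return cached
      name = state.create_subscription('sub-%d' % id(transform))
      with cls._global_lock:
        cls._subscriptions[transform] = name
      return name
```

The global lock is only ever taken while holding at most one per-transform lock, and never the other way around, so the lock order is consistent. Concurrent callers for the same transform wait on that transform's lock and then find the cached name.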

@github-actions

Assigning reviewers:

R: @jrmccluskey for label python.

