Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/content/en/docs/documentation/eventing.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ rare corner cases. Returning an empty set means that the mapper considered the s
resource event as irrelevant and the SDK will thus not trigger a reconciliation of the primary
resource in that situation.

On an update event, the SDK calls `toPrimaryResourceIDs` for **both the old and the new version**
of the secondary resource. This way it can reconcile not only the primaries that the secondary
currently maps to, but also those it previously mapped to and no longer does. So when a reference
changes — including when only a subset of the referenced primaries changes — both the newly
referenced and the dropped primaries are reconciled, and a dropped primary can revert to its
default state. Because the mapper can be invoked for an older version of a resource, keep your
implementation a pure function of the resource passed to it.

Adding a `SecondaryToPrimaryMapper` is typically sufficient when there is a one-to-many relationship
between primary and secondary resources. The secondary resources can be mapped to its primary
owner, and this is enough information to also get these secondary resources from the `Context`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@
*/
@FunctionalInterface
public interface SecondaryToPrimaryMapper<R> {

/**
* @param resource - secondary
* @return set of primary resource IDs
* Maps a secondary resource to the set of primary resources that should be reconciled in
* response.
*
* @param resource the secondary resource for which an event was received
* @return set of primary resource IDs to enqueue for reconciliation; an empty set means the event
* is irrelevant and no reconciliation is triggered. On update events, this method is invoked
* for both the old and the new versions of the resource.
*/
Set<ResourceID> toPrimaryResourceIDs(R resource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ public synchronized void start() {

@Override
protected synchronized void handleEvent(
ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) {
ResourceAction action,
T resource,
T oldResource,
Boolean deletedFinalStateUnknown,
// not relevant for controller event source
Set<ResourceID> relatedPrimaryIDs) {
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
Expand Down Expand Up @@ -156,7 +161,8 @@ private void handleEvent(ExtendedResourceEvent r) {
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.isLastStateUnknown());
r.isLastStateUnknown(),
null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,42 @@ public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper<R> secondaryToPri
}

@Override
public synchronized void onAddOrUpdate(R resource) {
public synchronized Set<ResourceID> onAddOrUpdate(R resource, R oldResource) {

Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);

var secondaryId = ResourceID.fromResource(resource);

primaryResources.forEach(
primaryResource -> {
var resourceSet =
index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet());
resourceSet.add(ResourceID.fromResource(resource));
resourceSet.add(secondaryId);
});

if (oldResource != null) {
var obsoletePrimaries =
new HashSet<>(secondaryToPrimaryMapper.toPrimaryResourceIDs(oldResource));
if (!primaryResources.containsAll(obsoletePrimaries)) {
var result = new HashSet<>(primaryResources);
obsoletePrimaries.removeAll(primaryResources);
obsoletePrimaries.forEach(
p ->
index.computeIfPresent(
p,
(id, currentSet) -> {
currentSet.remove(secondaryId);
return currentSet.isEmpty() ? null : currentSet;
}));
result.addAll(obsoletePrimaries);
return result;
}
}
return primaryResources;
}

@Override
public synchronized void onDelete(R resource) {
public synchronized Set<ResourceID> onDelete(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
Expand All @@ -58,6 +82,7 @@ public synchronized void onDelete(R resource) {
}
}
});
return primaryResources;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -127,17 +128,21 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
if (resultEvent.isEmpty()) {
return;
}
primaryToSecondaryIndex.onDelete(resource);
var primaryIds = primaryToSecondaryIndex.onDelete(resource);
if (eventAcceptedByFilter(
ResourceAction.DELETED, resource, null, deletedFinalStateUnknown)) {
propagateEvent(resource);
propagateEvent(resource, null, primaryIds);
}
});
}

@Override
protected void handleEvent(
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown) {
ResourceAction action,
R resource,
R oldResource,
Boolean deletedFinalStateUnknown,
Set<ResourceID> relatedPrimaryIds) {
// Called from ManagedInformerEventSource#eventFilteringUpdateAndCacheResource after the temp
// cache decided to surface a (possibly synthesized) event. The user-level filters
// (onAdd/onUpdate/onDelete/genericFilter) still apply, so this path mirrors the direct
Expand All @@ -148,7 +153,7 @@ protected void handleEvent(
log.debug(
"handleEvent: removing from primaryToSecondaryIndex. id={}",
ResourceID.fromResource(resource));
primaryToSecondaryIndex.onDelete(resource);
relatedPrimaryIds = primaryToSecondaryIndex.onDelete(resource);
}
if (!eventAcceptedByFilter(action, resource, oldResource, deletedFinalStateUnknown)) {
if (log.isDebugEnabled()) {
Expand All @@ -166,7 +171,7 @@ protected void handleEvent(
action,
resource.getMetadata().getResourceVersion());
}
propagateEvent(resource);
propagateEvent(resource, oldResource, relatedPrimaryIds);
}

@Override
Expand All @@ -177,12 +182,12 @@ public synchronized void start() {
super.start();
// this makes sure that on first reconciliation all resources are
// present on the index
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
manager().list().forEach(r -> primaryToSecondaryIndex.onAddOrUpdate(r, null));
}

@SuppressWarnings("unchecked")
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var primaryIds = primaryToSecondaryIndex.onAddOrUpdate(newObject, oldObject);
var resourceID = ResourceID.fromResource(newObject);

var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
Comment thread
csviri marked this conversation as resolved.
Expand All @@ -194,15 +199,22 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
"Propagating event for {}, resource with same version not result of a our update.",
action);
var event = resultEvent.get();
propagateEvent((R) event.getResource().orElseThrow());
propagateEvent((R) event.getResource().orElseThrow(), oldObject, primaryIds);
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

protected void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
protected void propagateEvent(R resource, R oldResource, Set<ResourceID> primaryResourceIdSet) {
if (primaryResourceIdSet == null) {
primaryResourceIdSet = new HashSet<>();
primaryResourceIdSet.addAll(
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(resource));
if (oldResource != null) {
Comment thread
csviri marked this conversation as resolved.
primaryResourceIdSet.addAll(
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(oldResource));
}
}
if (primaryResourceIdSet.isEmpty()) {
return;
}
Expand Down Expand Up @@ -249,17 +261,24 @@ public Set<R> getSecondaryResources(P primary) {
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
handleRecentCreateOrUpdate(resource);
handleRecentCreateOrUpdate(resource, previousVersionOfResource);
}

@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
handleRecentCreateOrUpdate(resource);
handleRecentCreateOrUpdate(resource, null);
}

@Override
protected Set<ResourceID> cacheUpdateAndGetRelatedPrimaryIDs(
R updatedResource, R previousResource) {
return handleRecentCreateOrUpdate(updatedResource, previousResource);
}

private void handleRecentCreateOrUpdate(R newResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
private Set<ResourceID> handleRecentCreateOrUpdate(R newResource, R previousVersion) {
var relatedPrimaryIds = primaryToSecondaryIndex.onAddOrUpdate(newResource, previousVersion);
temporaryResourceCache.putResource(newResource);
return relatedPrimaryIds;
}

private boolean useSecondaryToPrimaryIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -97,38 +98,48 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
ResourceID id = ResourceID.fromResource(resourceToUpdate);
log.debug("Starting event filtering and caching update for id={}", id);
R updatedResource = null;
Set<ResourceID> relatedPrimaryIds = null;
try {
temporaryResourceCache.startEventFilteringModify(id);
updatedResource = updateMethod.apply(resourceToUpdate);
Comment thread
csviri marked this conversation as resolved.
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
relatedPrimaryIds = cacheUpdateAndGetRelatedPrimaryIDs(updatedResource, resourceToUpdate);
log.debug(
"Caching resource update successful. id={}, rv={}",
id,
updatedResource.getMetadata().getResourceVersion());
return updatedResource;
} finally {
var res = temporaryResourceCache.doneEventFilterModify(id);
res.ifPresentOrElse(
r -> {
log.debug(
"Propagating not own event after filtering update. id={}, action={}, rv={}",
id,
r.getAction(),
r.getResource()
.map(rr -> rr.getMetadata().getResourceVersion())
.orElse("[not set]"));
handleEvent(
r.getAction(),
(R) r.getResource().orElseThrow(),
(R) r.getPreviousResource().orElse(null),
r.isLastStateUnknown());
},
() -> log.debug("No new event present after the filtering update. id={}", id));
if (res.isPresent()) {
var event = res.orElseThrow();
if (log.isDebugEnabled()) {
log.debug(
"Propagating not own event after filtering update. id={}, action={}, rv={}",
id,
event.getAction(),
event
.getResource()
.map(rr -> rr.getMetadata().getResourceVersion())
.orElse("[not set]"));
}
handleEvent(
event.getAction(),
(R) event.getResource().orElseThrow(),
(R) event.getPreviousResource().orElse(null),
event.isLastStateUnknown(),
relatedPrimaryIds);
} else {
log.debug("No new event present after the filtering update. id={}", id);
}
}
}

protected abstract void handleEvent(
ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown);
ResourceAction action,
R resource,
R oldResource,
Boolean deletedFinalStateUnknown,
Set<ResourceID> relatedPrimaryIDs);

@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -177,6 +188,20 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
temporaryResourceCache.putResource(resource);
}

/**
* Caches the resource updated through {@link #eventFilteringUpdateAndCacheResource} and returns
* the primary resource IDs related to that update, so they can be propagated to {@link
* #handleEvent}. The base implementation just fills the temporary cache and reports no related
* primaries. Subclasses that maintain a primary-to-secondary index override this to surface the
* affected primaries even after the secondary's references have changed, keeping that concern
* internal to those event sources instead of leaking it into {@link RecentOperationCacheFiller}.
*/
protected Set<ResourceID> cacheUpdateAndGetRelatedPrimaryIDs(
R updatedResource, R previousResource) {
handleRecentResourceUpdate(null, updatedResource, previousResource);
return Collections.emptySet();
}

@Override
public Optional<R> get(ResourceID resourceID) {
// The order of reading from these caches matters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public static <T extends HasMetadata> NOOPPrimaryToSecondaryIndex<T> getInstance
private NOOPPrimaryToSecondaryIndex() {}

@Override
public void onAddOrUpdate(R resource) {
// empty method because of noop implementation
public Set<ResourceID> onAddOrUpdate(R resource, R oldResource) {
return null;
Comment thread
csviri marked this conversation as resolved.
}

@Override
public void onDelete(R resource) {
public Set<ResourceID> onDelete(R resource) {
// empty method because of noop implementation
return null;
Comment on lines 35 to +43

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ugly but was not able to find much nicer alternative

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding unit test coverage

}
Comment thread
csviri marked this conversation as resolved.
Comment on lines 35 to 44

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could alternatively remove NOOPPrimaryToSecondaryIndex. But in a separate PR would be nicer.


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

public interface PrimaryToSecondaryIndex<R extends HasMetadata> {

void onAddOrUpdate(R resource);
Set<ResourceID> onAddOrUpdate(R resource, R oldResource);

void onDelete(R resource);
Set<ResourceID> onDelete(R resource);

Set<ResourceID> getSecondaryResources(ResourceID primary);
Comment on lines 23 to 29
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public synchronized void checkGhostResources() {
log.debug("Removing ghost resource with ID: {}", e.getKey());
iterator.remove();
eventFilteringSupport.handleGhostResourceRemoval(e.getKey());
managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true);
managedInformerEventSource.handleEvent(
ResourceAction.DELETED, e.getValue(), null, true, null);
}
}
}
Expand Down
Loading
Loading