feat(plumber): retention policy worker #717
base: main
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
plumber/ppl/priv/ecto_repo/migrations/20251201141215_add_expires_at_to_pipeline_requests.exs
expired_records =
  from(pr in PplRequests,
    where: pr.expires_at < ^now,
    select: %{id: pr.id, ppl_artefact_id: pr.ppl_artefact_id},
    limit: ^limit
  )
  |> EctoRepo.all()
Just a comment, but try running this query with EXPLAIN to validate that the planner picks the index you created. (It should infer that expires_at is not null, but it may not.)
I've added an additional index for this:
if days < @min_grace_period_days do
  Logger.warning(
    "[Retention] grace_period_days=#{days} is below minimum, using #{@min_grace_period_days}"
  )
Let's add org_id in this log line, so it is easier to debug.
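One possible shape for this suggestion, as a minimal sketch rather than the PR's actual code: the module name `GracePeriodSketch`, the function `clamp/2`, and the assumption that `org_id` is available at the call site are all hypothetical. Only the log prefix and the 7-day minimum come from the PR.

```elixir
defmodule GracePeriodSketch do
  # Hypothetical module for illustration; not part of plumber.
  require Logger

  # Minimum taken from the PR description (min: 7 days).
  @min_grace_period_days 7

  # Returns the grace period to use. When the configured value is below the
  # minimum, logs a warning that includes org_id and falls back to the minimum.
  def clamp(days, org_id) when is_integer(days) do
    if days < @min_grace_period_days do
      Logger.warning(
        "[Retention] org_id=#{org_id} grace_period_days=#{days} is below minimum, " <>
          "using #{@min_grace_period_days}"
      )

      @min_grace_period_days
    else
      days
    end
  end
end
```

With this shape, `GracePeriodSketch.clamp(3, "org-1")` would log the warning and return the minimum, while a value at or above the minimum is returned unchanged.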
{marked_count, _} = EctoRepo.update_all(mark_query, set: [expires_at: expires_at])
{unmarked_count, _} = EctoRepo.update_all(unmark_query, set: [expires_at: nil])
These update_all calls can cause problems: if the cutoff date is recent enough, we might update a large number of rows at once and run into lock contention, heavy I/O, connection timeouts, etc.
The safer approach here would be to batch the updates, with a limit on the number of rows updated at once.
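The batching idea can be sketched independently of the database layer. In this sketch `BatchSketch`, `run_in_batches/3`, and the `update_batch` callback are hypothetical names, not code from the PR. The callback is assumed to update at most `limit` rows per call and return the number of rows it changed.

```elixir
defmodule BatchSketch do
  # Hypothetical helper for illustration; not part of plumber.
  #
  # `update_batch` is any 1-arity function that updates at most `limit` rows
  # and returns how many rows it actually changed.
  def run_in_batches(update_batch, limit, total \\ 0)
      when is_function(update_batch, 1) and is_integer(limit) and limit > 0 do
    case update_batch.(limit) do
      # A short (or empty) batch means there is nothing left to update.
      count when count < limit -> total + count
      # A full batch means more rows may remain, so run another round.
      count -> run_in_batches(update_batch, limit, total + count)
    end
  end
end
```

In plumber, the callback would presumably wrap an `update_all` restricted to a limited set of row ids (for example via a subquery), so that each statement touches a bounded number of rows. The loop only terminates if updated rows stop matching the batch predicate, so that condition would need checking against the real mark and unmark queries.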
📝 Description
This PR implements retention policy support in plumber. When the usage service applies a retention policy, plumber marks pipeline records for expiration with a configurable grace period, allowing corrections before data is deleted.
- expires_at column to pipeline_requests table to track when records should be deleted
- PolicyConsumer to receive retention policy events from usage service via RabbitMQ
- PolicyApplier to mark/unmark pipeline records based on the cutoff date
- expires_at = now + grace_period (default 15 days)
- expires_at cleared, allowing policy corrections to "save" mistakenly marked records
- RETENTION_GRACE_PERIOD_DAYS env var (min: 7 days)
✅ Checklist