-
-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Description
Source Generator can solve the versioning issue.
- learn more about source generator here , here and here
- POC source generator technique
- Source generator attribute (both for producer & consumer) will define a version
- Producer will inject this version to the message metadata
- Consumer will ignore messages which not match its version
- Ignore pattern
- Skip [BREAK ORDERING]: simply ignore with ACK (do not process it). Good when other versions have different subscription group
- Wait [SLOW]: returns the message to the stream as unhandled (hopes someone else will handle it) and delays next consumption.
- Ignore pattern
- Multiple consumer can join single subscription in a broadcast format
await using IConsumerLifetime subscription = _consumerBuilder
.WithOptions(o => consumerOptions)
.WithCancellation(cancellation)
.Partition(PARTITION)
.Shard(SHARD)
.WithLogger(_fakeLogger)
.SubscribeEventFlowV2Consumer()
.SubscribeEventFlowV1();
[GenerateEventSource(EventSourceGenType.Consumer, multi)]
public interface IEventFlowV2
{
/// <summary>
/// Stages 1.
/// </summary>
/// <param name="PII">The PII.</param>
/// <param name="payload">The payload.</param>
/// <returns></returns>
[Versions(from=1,to=3)]
ValueTask Stage1Async(Person PII, string payload);
[Versions(from=4)]
ValueTask Stage1Async(Person PII, JsonElement payload);
/// <summary>
/// Stages the 2.
/// </summary>
/// <param name="PII">The PII.</param>
/// <param name="data">The data.</param>
/// <returns></returns>
ValueTask Stage2Async(JsonElement PII, JsonElement data);
ValueTask Stage2Async(JsonElement PII, int data);
}Brainstorm versioning logic
Why versioning
Different parts of the event flow may be changed over time
- Communication Interfaces & DTOs
- Communication channels
- Metadata (properties)
- Storage strategy
Requirements
- Processing sequence should be maintained (at least for some scenarios)
Problem
- Newer message version may not be compatible with old versions.
Solution: Naming convention
DTO / Operations name must end with Version indication.
any change should result in different suffixes.
Challenge
A consumer should be capable of handling multi-version in some filtering logic of first win.
Mitigation
// pseudo code
.Subscribe<IHandlerV9>(v => v >= 1);
.Subscribe<IHandlerV13>(v => v >= 10 && v < 14);
.Subscribe<IHandlerV17>(14, 17); // handle version 14 -17
.Subscribe<IHandlerV18>(18); // handle version 18
.Fallback(meta =>
{
logger.LogWarning($"version {meta.Version} is missing");
return Instruction.Hang; // release ownership & hand consumption for awhile in hope of newer consumer to take the message
});All subscriptions should register (sequentially to the channel) in order to keep sequential processing
Problem: enforce naming convention
- Suggestion: Source Generator for the interfaces & DTOs (maybe over proto files)
Problem: Channel behavior compatibility
- Channel properties or storage-strategy may change.
Solution: Chanel storage compatibility
- Consumer channel can register multiple message formatter & storage strategies
Keeping code relative clean
All different version related code should be consumed via DI