Skip to content

Versioning & Source Generator #24

@bnayae

Description

@bnayae

Source Generator can solve the versioning issue.

  • learn more about source generator here , here and here
  • POC source generator technique
  • Source generator attribute (both for producer & consumer) will define a version
  • Producer will inject this version to the message metadata
  • Consumer will ignore messages which not match its version
    • Ignore pattern
      • Skip [BREAK ORDERING]: simply ignore with ACK (do not process it). Good when other versions have different subscription group
      • Wait [SLOW]: returns the message to the stream as unhandled (hopes someone else will handle it) and delays next consumption.
  • Multiple consumer can join single subscription in a broadcast format
    await using IConsumerLifetime subscription = _consumerBuilder
                         .WithOptions(o => consumerOptions)
                         .WithCancellation(cancellation)
                         .Partition(PARTITION)
                         .Shard(SHARD)
                         .WithLogger(_fakeLogger)
                           .SubscribeEventFlowV2Consumer()
                           .SubscribeEventFlowV1();

    [GenerateEventSource(EventSourceGenType.Consumer, multi)]
    public interface IEventFlowV2
    {
        /// <summary>
        /// Stages 1.
        /// </summary>
        /// <param name="PII">The PII.</param>
        /// <param name="payload">The payload.</param>
        /// <returns></returns>
        [Versions(from=1,to=3)]
        ValueTask Stage1Async(Person PII, string payload);

        [Versions(from=4)]
        ValueTask Stage1Async(Person PII, JsonElement payload);
        /// <summary>
        /// Stages the 2.
        /// </summary>
        /// <param name="PII">The PII.</param>
        /// <param name="data">The data.</param>
        /// <returns></returns>
        ValueTask Stage2Async(JsonElement PII, JsonElement data);
        ValueTask Stage2Async(JsonElement PII, int data);
    }

Brainstorm versioning logic

Why versioning

Different parts of the event flow may be changed over time

  • Communication Interfaces & DTOs
  • Communication channels
    • Metadata (properties)
    • Storage strategy

Requirements

  • Processing sequence should be maintained (at least for some scenarios)

Problem

  • Newer message version may not be compatible with old versions.

Solution: Naming convention

DTO / Operations name must end with Version indication.
any change should result in different suffixes.

Challenge

A consumer should be capable of handling multi-version in some filtering logic of first win.

Mitigation

// pseudo code
.Subscribe<IHandlerV9>(v => v >= 1);
.Subscribe<IHandlerV13>(v => v >= 10 && v < 14);
.Subscribe<IHandlerV17>(14, 17); // handle version 14 -17
.Subscribe<IHandlerV18>(18); // handle version 18
.Fallback(meta => 
{
    logger.LogWarning($"version {meta.Version} is missing");
    return Instruction.Hang; // release ownership & hand consumption for awhile in hope of newer consumer to take the message  
});

All subscriptions should register (sequentially to the channel) in order to keep sequential processing


Problem: enforce naming convention

  • Suggestion: Source Generator for the interfaces & DTOs (maybe over proto files)

Problem: Channel behavior compatibility

  • Channel properties or storage-strategy may change.

Solution: Chanel storage compatibility

  • Consumer channel can register multiple message formatter & storage strategies

Keeping code relative clean

All different version related code should be consumed via DI

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions