marten: F#: Discriminated Unions are not supported for aggregates

it seems that F# discriminated unions are not supported for aggregation / projection, example of code that does not work:

open System
open Marten
open Marten.Schema.Identity

type AccountCreation = {
    Owner: string
    AccountId: Guid
    CreatedAt: DateTimeOffset
    StartingBalance: decimal
}

type Transaction = {
    To: Guid
    From: Guid
    Description: string
    Time: DateTimeOffset
    Amount: decimal
}

type AccountEvent =
    | AccountCreated of AccountCreation
    | AccountCredited of Transaction
    | AccountDebited of Transaction

type Account() =
    member val Id = Unchecked.defaultof<Guid> with get,set
    member val Owner = Unchecked.defaultof<string> with get,set
    member val Balance = Unchecked.defaultof<decimal> with get,set
    member val CreatedAt = Unchecked.defaultof<DateTimeOffset> with get,set
    member val UpdatedAt = Unchecked.defaultof<DateTimeOffset> with get,set

    member this.Apply(accountEvent: AccountEvent) =
        printfn "I've been called %A" accountEvent
        
[<EntryPoint>]
let main argv =
    use store = DocumentStore.For(fun options ->
            let connectionString = sprintf "host=%s;database=%s;username=%s;password=%s"
                                       "localhost"
                                       "postgres"
                                       "root"
                                       "root"
            options.Connection(connectionString)
            options.Events.AddEventType(typeof<AccountEvent>)
            options.Events.InlineProjections.AggregateStreamsWith<Account>() |> ignore
        )

    use session = store.LightweightSession()

    let khalidId = CombGuidIdGeneration.NewGuid()
    let billId = CombGuidIdGeneration.NewGuid()
    
    let khalid = AccountEvent.AccountCreated({
        Owner = "Khalid Abuhakmeh"
        AccountId = khalidId
        StartingBalance = 1000m
        CreatedAt = DateTimeOffset.UtcNow
    })
    
    let bill = AccountEvent.AccountCreated({
        Owner = "Bill Boga"
        AccountId = billId
        StartingBalance = 0m
        CreatedAt = DateTimeOffset.UtcNow
    })
    
    let transaction = AccountEvent.AccountCredited({
        From = khalidId
        To = billId
        Amount = 100m
        Time = DateTimeOffset.UtcNow
        Description = "transfer to bill"
    })

    session.Events.Append(khalidId, khalid) |> ignore
    session.Events.Append(billId, bill) |> ignore
    session.Events.Append(khalidId, transaction) |> ignore

    session.SaveChangesAsync()
    |> Async.AwaitTask
    |> Async.RunSynchronously

    let account = session.LoadAsync<Account>(khalidId)
                    |> Async.AwaitTask
                    |> Async.RunSynchronously

    let stream = session.Events.FetchStream(khalidId)

    printfn "%A" account
    printfn "%A" stream

    0

Long story short:

  • At the end, account is equal to null
  • The method Apply(accountEvent: AccountEvent) is never called.

However if I am doing something more classic like:

open System
open Marten
open Marten.Schema.Identity

type AccountCreation = {
    Owner: string
    AccountId: Guid
    CreatedAt: DateTimeOffset
    StartingBalance: decimal
}

type Transaction = {
    To: Guid
    From: Guid
    Description: string
    Time: DateTimeOffset
    Amount: decimal
}

type AccountEvent =
    | AccountCreated of AccountCreation
    | AccountCredited of Transaction
    | AccountDebited of Transaction

type Account() =
    member val Id = Unchecked.defaultof<Guid> with get,set
    member val Owner = Unchecked.defaultof<string> with get,set
    member val Balance = Unchecked.defaultof<decimal> with get,set
    member val CreatedAt = Unchecked.defaultof<DateTimeOffset> with get,set
    member val UpdatedAt = Unchecked.defaultof<DateTimeOffset> with get,set

    member this.Apply(accountCreation: AccountCreation) =
        printfn "I've been called %A" accountCreation
        this.Id <- accountCreation.AccountId
        this.Owner <- accountCreation.Owner
        this.Balance <- accountCreation.StartingBalance
        this.CreatedAt <- accountCreation.CreatedAt
        this.UpdatedAt <- accountCreation.CreatedAt
        
[<EntryPoint>]
let main argv =
    use store = DocumentStore.For(fun options ->
            let connectionString = sprintf "host=%s;database=%s;username=%s;password=%s"
                                       "localhost"
                                       "postgres"
                                       "root"
                                       "root"
            options.Connection(connectionString)
            options.Events.AddEventType(typeof<AccountEvent>)
            options.Events.InlineProjections.AggregateStreamsWith<Account>() |> ignore
        )

    use session = store.LightweightSession()

    let khalidId = CombGuidIdGeneration.NewGuid()
    let billId = CombGuidIdGeneration.NewGuid()
    
    let khalid = {
        Owner = "Khalid Abuhakmeh"
        AccountId = khalidId
        StartingBalance = 1000m
        CreatedAt = DateTimeOffset.UtcNow
    }
    
    let bill = {
        Owner = "Bill Boga"
        AccountId = billId
        StartingBalance = 0m
        CreatedAt = DateTimeOffset.UtcNow
    }
    
    let transaction = {
        From = khalidId
        To = billId
        Amount = 100m
        Time = DateTimeOffset.UtcNow
        Description = "transfer to bill"
    }

    session.Events.Append(khalidId, khalid) |> ignore
    session.Events.Append(billId, bill) |> ignore
    session.Events.Append(khalidId, transaction) |> ignore

    session.SaveChangesAsync()
    |> Async.AwaitTask
    |> Async.RunSynchronously

    let account = session.LoadAsync<Account>(khalidId)
                    |> Async.AwaitTask
                    |> Async.RunSynchronously

    let stream = session.Events.FetchStream(khalidId)

    printfn "%A" account
    printfn "%A" stream

    0

account is properly loaded

What did I change between the two? I basically removed discriminated unions, in the events that are append to the stream:

From (AccountEvent.AccountCreated):

let khalid = AccountEvent.AccountCreated({
	Owner = "Khalid Abuhakmeh"
	AccountId = khalidId
	StartingBalance = 1000m
	CreatedAt = DateTimeOffset.UtcNow
})

To (AccountCreation type):

let khalid = {
	Owner = "Khalid Abuhakmeh"
	AccountId = khalidId
	StartingBalance = 1000m
	CreatedAt = DateTimeOffset.UtcNow
}

and change the parameter passed to Apply:

member this.Apply(accountEvent: AccountEvent) =

to

member this.Apply(accountCreation: AccountCreation) =

I think this is really frustrating in F# to not be able to use Discriminated Unions because of the possibility it offers in terms of pattern matching. It forces to aggregate from the whole stream without persisting the aggregation / projection, which can be an issue in terms of performances for queries.

AFAIK, this is not due Newtonsoft.Json cause it does support both the serialization and deserialization with discriminated unions:

[<EntryPoint>]
let main argv =
    let accountCreated = AccountEvent.AccountCreated({
        Owner = "Khalid Abuhakmeh"
        AccountId = Guid.NewGuid()
        StartingBalance = 1000m
        CreatedAt = DateTimeOffset.UtcNow
    })
    let serialized = JsonConvert.SerializeObject(accountCreated)
    let deserialized = JsonConvert.DeserializeObject<AccountEvent>(serialized)
    
    printfn "%A" (accountCreated = deserialized)
    
    0

It most likely resides in some reflection tasks performed by marten upon event appending.

I am not sure if someone could have a hint about a decent workaround or maybe a hint about where to lookup in the source code.

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Reactions: 8
  • Comments: 35 (8 by maintainers)

Most upvoted comments

Thank you @ehouarn-perret.

I’m just finishing Domain Modeling Made Functional by @swlaschin, last pages left - so I think that now I have the good basis to tackle this issue and make Marten more F# friendly.

After I finish my work on https://github.com/JasperFx/marten/issues/1302 I’ll try to tackle that.

One way to fix this issue is by adding the following bits of code to the original example

type WrappedAccountEvent = {
    Inner : AccountEvent
}    

type Events.IEventStore with 
    member this.Append(id : Guid, x: AccountEvent) = 
        let y = { Inner = x }
        this.Append(id, y)
        
type Account with
    member this.Apply(evt : WrappedAccountEvent) =
        let unwrapped = evt.Inner        
        this.Apply(unwrapped)       

That code creates an optional type extension for the Append method specific to AccountEvents. Then it adds an intrinsic type extension to the Account type. The distinction between the two type extensions actually matters because the extension to the Account method is visible via Reflection and from C#. Other than some extra object allocations, the technique is low impact. It’s just boring and repetitive code to write for every aggregate and its DU of events. And the solution to boring and repetitive code is … code generation.

I propose that we use F# build time code generation, a technique I have borrowed from Myriad (https://github.com/MoiraeSoftware/myriad).

For each aggregate and DU event you are using, you would add an empty file to your project after the file declaring the event and aggregate. At build time, command line utility would receive a list of .fs files and process the F# AST of those files looking for member methods named “Apply” that take a parameter that is a DU. It would then generate code like the example above for each one, putting that code into the empty file you added to your project. I’ve used this technique before and it worked out fairly well.

If the Marten/F# folks think this is a viable solution, I’m willing to create and open source the command line utility to make it work.

@jannikbuschke Sounds like an awesome way to contribute to Marten! We’ve had F# contributors in the past, but not lately. I’d welcome some official F# docs and tests in the main codebase.

No recent activity, seems to work, and I’m finally closing this.

@AlexZeitler @mary-perret-1986

I got a chance to work on this a bit more last night and I see a way to ease up on some of these restrictions. I’ll try post a sample of what the output will look like a bit later.

It seems somewhere along the way this might have been resolved.

Following test passes:

type CounterEvent =
  | Increased of int
  | Decreased of int
  | ResetTo of int

type CounterState () =
  member val Id = Guid.Empty with get, set
  member val Total = 0 with get, set

type CounterAggregate () =
  inherit SingleStreamAggregation<CounterState> ()

  member _.Apply (e : CounterEvent, state : CounterState) =
    match e with
    | Increased i -> state.Total <- state.Total + i
    | Decreased i -> state.Total <- state.Total - i
    | ResetTo i -> state.Total <- i

[<Fact>]
let ``Can project a union`` () =
  let cfg (options : StoreOptions) =
    options.Connection ("...")
    options.Projections.Add<CounterAggregate> (ProjectionLifecycle.Inline)
    ()

  let store = DocumentStore.For cfg
  let session = store.LightweightSession ()
  let g = Guid.NewGuid ()

  session.Events.Append (g, Increased 12, Increased 1) |> ignore
  session.SaveChanges ()
  let doc = session.Load<CounterState> (g)
  Assert.Equal (13, doc.Total)

@AlexZeitler I got the issue with types not being in a namespace blowing up the generated code fixed in the latest alpha. That’s no longer a problem.

Quick followup: I made a lot of progress on this over the weekend. I expect to have something for folks to look at in a week. The solution I came up with has some limitations. I think they are reasonable and I would like to hear feedback from folks:

  • Each aggregate has a single DU for its events. - This seems like the natural F# way to me.
  • Each aggregate/DU pair are declared in a single fs file. - I think this could be relaxed at the cost of code complexity that I would rather avoid.
  • All these files are included in your project above any Marten implementation code.
  • Your aggregate/DU pairs are in a namespace - This can be the same namespace for all of them or different namespaces.
  • The Apply method has a type annotation on the DU variable. - This will requirement definitely go away as the more idiomatic F# code would allow type inference to do its thing. It’s easier to process the AST if the variable has the type annotation.

Do these limitations sound onerous or unreasonable?

For sure I’ll add you and @wastaz as reviewers. I have plan to work on that during the weekend or at worst in the next week if my non-marten life won’t mess with that 😉

I’ll have also some learning curve, as I wasn’t coding in F# much.

@oskardudycz I absolutely prefer having the docs as code (so our docs won’t go stale). Just a new build prerequisite (F#) for anybody wanting to build Marten. Unless they are not made part of the default build or build is otherwise tweaked.

@JacksonCribb, thank you a lot for the feedback. That’d be great if we accidentally fixed that 😅 I’ll keep the issue open, as I’m planning to work more on the Marten F# experience soon, so I’ll try to double-check if we have all the common use cases covered.

@johncj-improving what about using a Type Provider?

Thanks for the reminder! The only requirement is that all of your aggregates and DUs should be in one file with a namespace. At compile time, you pass the name of that file to my command line utility. It will generate a file that will be inserted in your project immediately below the file you passed in. For the example above, assuming you split the code into two files (above the [<EntryPoint>] attribute and specified Account as your namespace), the generated file would look like this:

module Martenizer
// Generated code
    open Marten
    open Marten.Events 
    open System 
    open Account 


// DU wrappers get written first
    type WrappedAccountEvent = {
        Inner : AccountEvent
    }    
// Aggregates are extended with an unwrapping member    
    type Account with
        member this.Apply(evt : WrappedAccountEvent) =
            let unwrapped = evt.Inner        
            this.Apply(unwrapped)     

// Various Marten interfaces/objects get specific extension methods (only visible from F#)
    type IEventStore with 
        member this.Append(id : Guid, x: AccountEvent) = 
            let y = { Inner = x }
            this.Append(id, y)
        member this.Append(id : Guid, xs : seq<AccountEvent>) =
            let y = xs |> Seq.map(fun a -> { Inner = a } :> obj) //|> Seq.toArray
            this.Append(id, y )
        member this.Append(id : Guid, [<ParamArray>] evts : AccountEvent[] ) =
            let y = evts |> Array.map(fun e -> { Inner = e } :> obj) |> Array.toSeq
            this.Append(id, y)
            
    type IDocumentSession with 
        member this.LoadAsync<'T when 'T :> Account>((id : Guid)) =
            async {
            let acct = new Account()
            let evts = this.Events.FetchStream(id)
            for evt in evts do
                acct.Apply(this.Events.Load<WrappedAccountEvent>(evt.Id).Data)
            return acct
            } |> Async.StartAsTask


    
//The typeof command gets hijacked
    let inline typeof< ^T when ^T :> System.Object> =
        match typeof< ^T> with
        | t when t = typeof<AccountEvent> -> typeof<WrappedAccountEvent>
        | _ -> typeof< ^T>

The end result is that you write your F# the way you expect and when you compile, a bunch of magic happens that automates the wrapping of your DUs. The code that generates the code above is pretty ugly, but I’m the only one who has to look at it. I’m waiting for the Marten 4.0 Event Sourcing API to be finalized so that I know the scope of the code I have to generate. As a side note, that bit of code that hijacks typeof is the nuttiest code I’ve every written. On the plus side, I finally understand SRTPs in F#. If anyone has samples of Aggregates and Event DUs they can share, you can post them here or email them to me at [FirstName].[LastName]@improving.com

Thanks, John Cavnar-Johnson

I have a plan to add new sample projects to our solution: one written with C# and other with F#, based on what @ehouarn-perret provided. It would be easier to check and verify if it’s working also for F# and might be good starting point for our users. Thoughts? @jokokko I like the idea of updating also scenarios 👍