Skip to content

Commit

Permalink
Finish ConfirmedController
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 30, 2021
1 parent de85b3d commit 420e299
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,56 @@ open Microsoft.AspNetCore.Mvc

open ECommerce.Domain

type TicketsTranchesDto = { activeEpochs : TrancheReferenceDto[] }
and TrancheReferenceDto = { fc : FcId; epochId : ConfirmedEpochId }
type TranchesDto = { activeEpochs : TrancheReferenceDto[] }
and TrancheReferenceDto = { seriesId : ConfirmedSeriesId; epochId : ConfirmedEpochId }

type SliceDto = { closed : bool; tickets : ItemDto[]; position : TicketsCheckpoint; checkpoint : TicketsCheckpoint }
and ItemDto = { id : TicketId; payload : string }
module ItemDto =
module TranchesDto =

let ofDto (x : TicketsEpoch.Events.Item) : ItemDto =
{ id = x.id; payload = x.payload }
let ofEpochId epochId =
{ activeEpochs = [| { seriesId = ConfirmedSeriesId.wellKnownId; epochId = epochId } |]}

type SliceDto = { closed : bool; carts : CartDto[]; position : ConfirmedCheckpoint; checkpoint : ConfirmedCheckpoint }
and CartDto = { id : CartId; items : ItemDto[] }
and ItemDto = { productId : ProductId; unitPrice : decimal; quantity : int }

module CartDto =

let ofDto (x : ConfirmedEpoch.Events.Cart) : CartDto =
{ id = x.cartId
items = [| for x in x.items -> { productId = x.productId; unitPrice = x.unitPrice; quantity = x.quantity } |] }

module Checkpoint =

let ofEpochAndOffset (epoch : TicketsEpochId) (offset : int) =
TicketsCheckpoint.ofEpochAndOffset epoch offset
let ofEpochAndOffset (epoch : ConfirmedEpochId) (offset : int) =
ConfirmedCheckpoint.ofEpochAndOffset epoch offset

let ofState (epochId : TicketsEpochId) (s : TicketsEpoch.Reader.StateDto) =
TicketsCheckpoint.ofEpochContent epochId s.closed s.tickets.Length
let ofState (epochId : ConfirmedEpochId) (s : ConfirmedEpoch.Reader.StateDto) =
ConfirmedCheckpoint.ofEpochContent epochId s.closed s.carts.Length

[<Route("api/[controller]")>]
type TicketsController(tickets : TicketsIngester.Service, series : TicketsSeries.Service, epochs : TicketsEpoch.Reader.Service) =
type ConfirmedController(series : ConfirmedSeries.Service, epochs : ConfirmedEpoch.Reader.Service) =
inherit ControllerBase()

[<HttpPost; Route "{fc}/{ticket}">]
member _.Post(fc : FcId, ticket : TicketId, [<FromBody>] payload) = async {
let! _added = tickets.ForFc(fc).TryIngest({ id = ticket; payload = payload})
()
}

[<HttpGet>]
member _.ListTranches() : Async<TicketsTranchesDto> = async {
let! active = series.ReadIngestionEpochs()
return { activeEpochs = [| for x in active -> { fc = x.fc; epochId = x.ingestionEpochId } |]}
member _.ListTranches() : Async<TranchesDto> = async {
let! active = series.ReadIngestionEpochId()
return TranchesDto.ofEpochId active
}

[<HttpGet; Route("{fcId}/{epoch}")>]
member _.ReadTranche(fcId : FcId, epoch : TicketsEpochId) : Async<SliceDto> = async {
let! state = epochs.Read(fcId, epoch)
[<HttpGet; Route("{epoch}")>]
member _.ReadTranche(epoch : ConfirmedEpochId) : Async<SliceDto> = async {
let! state = epochs.Read(epoch)
// TOCONSIDER closed should control cache header
let pos, checkpoint = Checkpoint.ofEpochAndOffset epoch 0, Checkpoint.ofState epoch state
return { closed = state.closed; tickets = Array.map ItemDto.ofDto state.tickets; position = pos; checkpoint = checkpoint }
return { closed = state.closed; carts = Array.map CartDto.ofDto state.carts; position = pos; checkpoint = checkpoint }
}

[<HttpGet; Route("{fcId}/slice/{token?}")>]
member _.Poll(fcId : FcId, token : System.Nullable<TicketsCheckpoint>) : Async<SliceDto> = async {
let pos = if token.HasValue then token.Value else TicketsCheckpoint.initial
let epochId, offset = TicketsCheckpoint.toEpochAndOffset pos
let! state = epochs.Read(fcId, epochId)
[<HttpGet; Route("slice/{token?}")>]
member _.Poll(token : System.Nullable<ConfirmedCheckpoint>) : Async<SliceDto> = async {
let pos = if token.HasValue then token.Value else ConfirmedCheckpoint.initial
let epochId, offset = ConfirmedCheckpoint.toEpochAndOffset pos
let! state = epochs.Read(epochId)
// TOCONSIDER closed should control cache header
let pos, checkpoint = Checkpoint.ofEpochAndOffset epochId offset, Checkpoint.ofState epochId state
return { closed = state.closed; tickets = Array.skip offset state.tickets |> Array.map ItemDto.ofDto; position = pos; checkpoint = checkpoint }
return { closed = state.closed; carts = Array.skip offset state.carts |> Array.map CartDto.ofDto; position = pos; checkpoint = checkpoint }
}
4 changes: 2 additions & 2 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ type Service internal

/// Ingest the supplied items. Yields relevant elements of the post-state to enable generation of stats
/// and facilitate deduplication of incoming items in order to avoid null store round-trips where possible
member _.Ingest(epochId, items) =
member _.Ingest(epochId, carts) =
let decider = resolveStale epochId
/// NOTE decider which will initially transact against potentially stale cached state, which will trigger a
/// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk
/// of writes are presumed to be coming from within this same process
decider.Transact(decide shouldClose items)
decider.Transact(decide shouldClose carts)

/// Returns all the items currently held in the stream (Not using AllowStale on the assumption this needs to see updates from other apps)
member _.Read epochId : Async<Fold.State> =
Expand Down
23 changes: 22 additions & 1 deletion Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ type [<Measure>] confirmedEpochId
type ConfirmedEpochId = int<confirmedEpochId>
module ConfirmedEpochId =
let initial = 0<confirmedEpochId>
// let value (value : ConfirmedEpochId) : int = %value
let value (value : ConfirmedEpochId) : int = %value
let parse (value : int) : ConfirmedEpochId = %value
let next (value : ConfirmedEpochId) : ConfirmedEpochId = % (%value + 1)
let toString (value : ConfirmedEpochId) : string = string %value

type [<Measure>] confirmedCheckpoint
type ConfirmedCheckpoint = int64<confirmedCheckpoint>
module ConfirmedCheckpoint =

let initial : ConfirmedCheckpoint = %0L
let factor = 1_000_000L

let ofEpochAndOffset (epoch : ConfirmedEpochId) offset : ConfirmedCheckpoint =
int64 (ConfirmedEpochId.value epoch) * factor + int64 offset |> UMX.tag

let ofEpochContent (epoch : ConfirmedEpochId) isClosed count : ConfirmedCheckpoint =
let epoch, offset =
if isClosed then ConfirmedEpochId.next epoch, 0
else epoch, count
ofEpochAndOffset epoch offset

let toEpochAndOffset (value : ConfirmedCheckpoint) : ConfirmedEpochId * int =
let d, r = Math.DivRem(%value, factor)
(ConfirmedEpochId.parse (int d)), int r

0 comments on commit 420e299

Please sign in to comment.