Skip to content

Latest commit



318 lines (240 loc) · 10.4 KB

File metadata and controls

318 lines (240 loc) · 10.4 KB

/ Consensus / Proposing / Staging

Staging Pipeline


Staging pipeline is responsible for packaging and staging requests from receptioning into proposal files. Each of these local proposals once accepted by the consortium will be uniformly transacted on each consortium node. The transactions will then be appended into each node ledgers.


Project Tree

1. Pipeline

Staging is a Pipeline

  • a persisted input stream : Input.hs

  • a line of Pipes Welded together in Pipeline.hs

    • Sourcing initial inputs : stream infinitely inputLog
    • Composition of deterministic Pipes
      • Welding : Adapting IOs between pipes
    serializing .~> nonEmptying .~> capping .~> persisting 
    • Sinking final outputs : sinking outputLog
  • a persisted output stream : Output.hs

2. Pipes

The pipeline is using Pipes serializing, nonEmptying, capping, persisting, meaning each of them has

  • An Input Stream
  • A Stream Processing
  • An Output Stream

3. Execution Environment

The Staging model (pipeline) is executed on

  • The Log Engine used
  • The Business Logic used on top of the consensus layer (requests in that context)

You'll find in this folder different version of Pipeline.hs "polymorphically reduced" or concrete

4. Executable

Staging has some DevOps features as well

  • Settings.hs always into a separated project xxxx-packaging-settings for deployment purposes in Zeus
  • Dependencies.hs are derived from Settings if sub-dependencies are all Healthy


  • Perform the HealhtChecks to obtain the pipeline dependencies
  • Execute the pipeline + load the junctions in the EventStore Microservice
  • Put the Microservice back in HealthCheck mode if any Exception bubbles up in the pipeline during execution.

N.B : Microservice configuration and Deployment (Locally/Simulated/Production etc...) are defined in the package Zeus



A Junction (Merger) is

  • a set of persisted input streams
  • a nondeterministic logic for merging these input streams
  • a persisted output stream (input of a pipeline)

The persisted input stream is the junction of 2 upstreams pipelines

We are using the "User Defined Projections" EventStore feature to implement this junction

Defined in Junction.hs

Executed in Executable.hs



Defined in Input.hs

1. Package request

Executing that command means

  • Accumulate each request into a temporary file.
FileName : x.tmp with x the offset of local proposal produced
  • Detect when the temporary file is full
Current Size + new request size < configurable size limit
  • Convert this temporary file into a proposal file
x.tmp -> x.proposal
  • Redirect the stream into a new Temporary File : (x+1).tmp
  • Notify the downstream Broadcasting Section that a new local proposal is available.

2. Stage

This command appears when the Local Proposal flow is tensed. The flow is tensed when the consensus is consuming more local proposal than produced.

Executing it means :

  • Converting the temporary file into a proposal file if requests are already accumulated.


Defined Output.hs

Staging produces

  • A Notification : LocalProposalStaged {localOffset :: Offset}
  • a File {localOffset}.proposal containing requests with
0 < Size <= Size Limit

Pipe recipe

To produce the expected pipeline output , we are combining different pipes all together by

  • A simple function composition (.)
  • A welding : map to adapt Output Pipe x with Input Pipe x+1

The Staging pipe recipe is

  stream infinitely inputLog -- sourcing
     ~> serializing
    .~> nonEmptying
    .~> capping proposalSizeLimit
    .~> persisting proposalRootFolder
    .~> sinking outputLog

Defined in Pipeline.hs

The welding between each pipe is defined in /Welding/BluePrint.hs

The pipe recipe goals are

1. Size properly the proposals

0 < Size <= Size Limit

Under the responsibility of

2. Convert the temporary file into a proposal file.

Under the responsibility of

N.B : Serializing will be removed eventually. We'll evaluate this when addressing data compression.



Defined in Pipe.hs

Just Transform request in SerializedRequest

newtype SerializedRequest = SerializedRequest [Word8] deriving (Eq,Show)

N.B : serializing will be removed eventually. We'll address it when we'll add the request compression features.



Remembering the initial input of the section

data Input request
  = Stage -- ^ ask to "Staging Pipeline" to force the stage of a new local proposal with all the requests currently collected
  | Package request -- ^ ask to to "Staging Pipeline" to package the request into a proposal according
                    -- some properties (see

It's totally natural to receive multiple Stage commands consecutively, E.g

  • No Requests while many blocks are appended consecutively
  • Etc...

Forcing the production of empty proposal adds no value in our domain and provokes an accidental complexity downstream if not managed.
Therefore, we want to

  • Remove the consecutive Stage commands from our input stream.
  • Never start downstream processing with a Stage

Said differently, we want to get the following property

0 < Proposal File Size 


With the following Natural Transformation

Staging.Input request ~> Maybe request

We want the following stream property

  • Never start by Nothing
  • Never 2 consecutive Nothing

E.g - We want the following transformation

[Nothing, Just r1, Just r2, Nothing, Nothing, Nothing] -> [Just r1, Just r2, Nothing]

Implemented in Pipe.hs

Tested in PipeSpec.hs



We can't broadcast proposal files with unlimited memory size. We want the following property

Proposal File Size <= Size Limit

Capping is welded after NonEmptying, combining the 2 pipes will give the following property

0 < Proposal File Size <= Size Limit


To increase the expressivity of our previous domain we are doing the following natural transformation

instance Weldable (NonEmptying.Output request) (Capping.Input request) where
    = \case
      Nothing  -> Capping.AskForACut
      Just request -> Capping.Add request

With the Following Output in Output.hs

data Output request
  = Cut
  | Added request
  deriving (Show,Eq)

the polymorphism on request is reduced with the following type Class

class Sizable a where
  getMemorySize :: a -> Byte
  :: ( S.MonadAsync m
     , Sizable item)
  => Byte -- ^ proposalSizeLimit
  -> S.SerialT m (Input item)
  -> S.SerialT m (Output item)

We are cutting the consecutiveness of Added request events if the cumulative memory size of requests is > a size limit given. By "cutting", we mean adding a Cut in between 2 Added request events.

Using a Fold executed with a postscan with the following State Machine


Implemented in Pipe.hs

Tested in PipeSpec.hs



This pipe prerequesites the following properties on its stream input

0 < Proposal File Size <= Size Limit


data Input request
  = CommitProposal
  | Persist request
  deriving (Show,Eq,Functor)
newtype Output 
  = LocalProposalPersisted {proposalId :: Offset} 
  deriving (Show,Eq)


Using Streamly FileSystem primitives

  • Create a temporary file : x.tmp
  • Persist request
  • When receiving CommitProposal
    • Convert x.tmp into x.proposal
    • produce LocalProposalPersisted x
    • x = x + 1

Implemented in Pipe.hs

Integration Test in PipeSpec.hs