Skip to content

Commit

Permalink
Allow users to cancel requests
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbeaumont committed Jul 25, 2021
1 parent 0d571bd commit 2280e8e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 24 deletions.
5 changes: 3 additions & 2 deletions ocpp1.6/charge_point.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error)
}
}

func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cp.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
Expand All @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cp.client.SendRequest(request)
return cp.client.SendRequestCtx(ctx, request)
}
err := cp.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6/v16.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp16

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -86,7 +87,7 @@ type ChargePoint interface {
// The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed.
// This result is propagated via a callback, called asynchronously.
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the central system and starts the charge point routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the central system.
// If the connection couldn't be opened, an error is returned.
Expand Down
9 changes: 7 additions & 2 deletions ocppj/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Client) Stop() {
// - the endpoint doesn't support the feature
//
// - the output queue is full
func (c *Client) SendRequest(request ocpp.Request) error {
func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error {
if !c.dispatcher.IsRunning() {
return fmt.Errorf("ocppj client is not started, couldn't send request")
}
Expand All @@ -125,14 +126,18 @@ func (c *Client) SendRequest(request ocpp.Request) error {
return err
}
// Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling.
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil {
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil {
log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err)
return err
}
log.Debugf("enqueued request %v - %v", call.UniqueId, call.Action)
return nil
}

func (c *Client) SendRequest(request ocpp.Request) error {
return c.SendRequestCtx(context.Background(), request)
}

// Sends an OCPP Response to the server.
// The requestID parameter is required and identifies the previously received request.
//
Expand Down
59 changes: 41 additions & 18 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -168,6 +169,20 @@ func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error {

func (d *DefaultClientDispatcher) messagePump() {
rdy := true // Ready to transmit at the beginning
cancelled := func() {
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
}
var reqContextDone <-chan struct{}
for {
select {
case _, ok := <-d.requestChannel:
Expand All @@ -177,22 +192,15 @@ func (d *DefaultClientDispatcher) messagePump() {
d.requestChannel = nil
return
}
case <-reqContextDone:
// user cancelled request
cancelled()
case _, ok := <-d.timer.C:
// Timeout elapsed
if !ok {
continue
}
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
cancelled()
case rdy = <-d.readyForDispatch:
// Ready flag set, keep going
}
Expand All @@ -206,22 +214,36 @@ func (d *DefaultClientDispatcher) messagePump() {
}
// Only dispatch request if able to send and request queue isn't empty
if rdy && !d.requestQueue.IsEmpty() {
d.dispatchNextRequest()
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
if done, ok := d.dispatchNextRequest(); ok {
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
}
d.timer.Reset(d.timeout)
reqContextDone = done
}
d.timer.Reset(d.timeout)
}
}
}

func (d *DefaultClientDispatcher) dispatchNextRequest() {
func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) {
// Get first element in queue
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
ctx := bundle.Ctx
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
return nil, false
default:
}
d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload)
// Attempt to send over network
err := d.network.Write(jsonMessage)
Expand All @@ -232,6 +254,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
return ctx.Done(), true
}

func (d *DefaultClientDispatcher) Pause() {
Expand Down
2 changes: 2 additions & 0 deletions ocppj/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
)
Expand All @@ -10,6 +11,7 @@ import (
type RequestBundle struct {
Call *Call
Data []byte
Ctx context.Context
}

// RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.
Expand Down
3 changes: 2 additions & 1 deletion ocppj/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error {
return err
}
// Will not send right away. Queuing message and let it be processed by dedicated requestPump routine
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage}); err != nil {
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage, context.Background()}); err != nil {
log.Errorf("request %v - %v for client %v: %v", call.UniqueId, call.Action, clientID, err)
return err
}
Expand Down

0 comments on commit 2280e8e

Please sign in to comment.