Skip to content

Commit

Permalink
Allow users to cancel requests
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbeaumont committed Jul 21, 2021
1 parent c6b416e commit 24ecdea
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 23 deletions.
5 changes: 3 additions & 2 deletions ocpp1.6/charge_point.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error)
}
}

func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cp.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
Expand All @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cp.client.SendRequest(request)
return cp.client.SendRequestCtx(ctx, request)
}
err := cp.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6/v16.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp16

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -86,7 +87,7 @@ type ChargePoint interface {
// The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed.
// This result is propagated via a callback, called asynchronously.
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the central system and starts the charge point routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the central system.
// If the connection couldn't be opened, an error is returned.
Expand Down
9 changes: 7 additions & 2 deletions ocppj/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Client) Stop() {
// - the endpoint doesn't support the feature
//
// - the output queue is full
func (c *Client) SendRequest(request ocpp.Request) error {
func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error {
if !c.dispatcher.IsRunning() {
return fmt.Errorf("ocppj client is not started, couldn't send request")
}
Expand All @@ -125,14 +126,18 @@ func (c *Client) SendRequest(request ocpp.Request) error {
return err
}
// Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling.
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil {
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil {
log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err)
return err
}
log.Debugf("enqueued request %v - %v", call.UniqueId, call.Action)
return nil
}

func (c *Client) SendRequest(request ocpp.Request) error {
return c.SendRequestCtx(context.Background(), request)
}

// Sends an OCPP Response to the server.
// The requestID parameter is required and identifies the previously received request.
//
Expand Down
54 changes: 36 additions & 18 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ func (d *DefaultClientDispatcher) SendRequest(req interface{}) error {

func (d *DefaultClientDispatcher) messagePump() {
rdy := true // Ready to transmit at the beginning
cancelled := func() {
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
}
var reqContextDone <-chan struct{}
for {
select {
case _, ok := <-d.requestChannel:
Expand All @@ -177,22 +191,15 @@ func (d *DefaultClientDispatcher) messagePump() {
d.requestChannel = nil
return
}
case <-reqContextDone:
// user cancelled request
cancelled()
case _, ok := <-d.timer.C:
// Timeout elapsed
if !ok {
continue
}
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
cancelled()
case rdy = <-d.readyForDispatch:
// Ready flag set, keep going
}
Expand All @@ -206,22 +213,32 @@ func (d *DefaultClientDispatcher) messagePump() {
}
// Only dispatch request if able to send and request queue isn't empty
if rdy && !d.requestQueue.IsEmpty() {
d.dispatchNextRequest()
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
if done, ok := d.dispatchNextRequest(); ok {
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
}
d.timer.Reset(d.timeout)
reqContextDone = done
}
d.timer.Reset(d.timeout)
}
}
}

func (d *DefaultClientDispatcher) dispatchNextRequest() {
func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) {
// Get first element in queue
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
select {
case <-bundle.Ctx.Done():
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
return nil, false
default:
}
d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload)
// Attempt to send over network
err := d.network.Write(jsonMessage)
Expand All @@ -232,6 +249,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
return bundle.Ctx.Done(), true
}

func (d *DefaultClientDispatcher) Pause() {
Expand Down
2 changes: 2 additions & 0 deletions ocppj/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
)
Expand All @@ -10,6 +11,7 @@ import (
type RequestBundle struct {
Call *Call
Data []byte
Ctx context.Context
}

// RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.
Expand Down

0 comments on commit 24ecdea

Please sign in to comment.