Skip to content

Commit

Permalink
Allow users to cancel requests
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbeaumont committed Jul 25, 2021
1 parent 0d571bd commit a9dd40f
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 29 deletions.
5 changes: 3 additions & 2 deletions ocpp1.6/charge_point.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error)
}
}

func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cp.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
Expand All @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cp.client.SendRequest(request)
return cp.client.SendRequestCtx(ctx, request)
}
err := cp.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6/v16.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp16

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -86,7 +87,7 @@ type ChargePoint interface {
// The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed.
// This result is propagated via a callback, called asynchronously.
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the central system and starts the charge point routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the central system.
// If the connection couldn't be opened, an error is returned.
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6_test/ocpp16_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16_test

import (
"context"
"crypto/tls"
"fmt"
"reflect"
Expand Down Expand Up @@ -550,7 +551,7 @@ func testUnsupportedRequestFromChargePoint(suite *OcppV16TestSuite, request ocpp
err := suite.chargePoint.Start(wsUrl)
assert.Nil(t, err)
// Run request test, expecting an error
err = suite.chargePoint.SendRequestAsync(request, func(confirmation ocpp.Response, err error) {
err = suite.chargePoint.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) {
t.Fail()
})
assert.Error(t, err)
Expand Down
5 changes: 3 additions & 2 deletions ocpp2.0.1/charging_station.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp2

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -489,7 +490,7 @@ func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, err
return asyncResult.r, asyncResult.e
}

func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(response ocpp.Response, err error)) error {
func (cs *chargingStation) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(response ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cs.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charging station (missing profile), cannot send request", featureName)
Expand Down Expand Up @@ -526,7 +527,7 @@ func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cs.client.SendRequest(request)
return cs.client.SendRequestCtx(ctx, request)
}
err := cs.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp2.0.1/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp2

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -150,7 +151,7 @@ type ChargingStation interface {
// This result is propagated via a callback, called asynchronously.
//
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never invoked.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the CSMS and starts the charging station routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the CSMS.
// If the connection couldn't be opened, an error is returned.
Expand Down
3 changes: 2 additions & 1 deletion ocpp2.0.1_test/ocpp2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp2_test

import (
"context"
"crypto/tls"
"fmt"
"reflect"
Expand Down Expand Up @@ -984,7 +985,7 @@ func testUnsupportedRequestFromChargingStation(suite *OcppV2TestSuite, request o
err := suite.chargingStation.Start(wsUrl)
require.Nil(t, err)
// Run request test
err = suite.chargingStation.SendRequestAsync(request, func(confirmation ocpp.Response, err error) {
err = suite.chargingStation.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) {
t.Fail()
})
require.Error(t, err)
Expand Down
9 changes: 7 additions & 2 deletions ocppj/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Client) Stop() {
// - the endpoint doesn't support the feature
//
// - the output queue is full
func (c *Client) SendRequest(request ocpp.Request) error {
func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error {
if !c.dispatcher.IsRunning() {
return fmt.Errorf("ocppj client is not started, couldn't send request")
}
Expand All @@ -125,14 +126,18 @@ func (c *Client) SendRequest(request ocpp.Request) error {
return err
}
// Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling.
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil {
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil {
log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err)
return err
}
log.Debugf("enqueued request %v - %v", call.UniqueId, call.Action)
return nil
}

func (c *Client) SendRequest(request ocpp.Request) error {
return c.SendRequestCtx(context.Background(), request)
}

// Sends an OCPP Response to the server.
// The requestID parameter is required and identifies the previously received request.
//
Expand Down
59 changes: 41 additions & 18 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -168,6 +169,20 @@ func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error {

func (d *DefaultClientDispatcher) messagePump() {
rdy := true // Ready to transmit at the beginning
cancelled := func() {
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
}
var reqContextDone <-chan struct{}
for {
select {
case _, ok := <-d.requestChannel:
Expand All @@ -177,22 +192,15 @@ func (d *DefaultClientDispatcher) messagePump() {
d.requestChannel = nil
return
}
case <-reqContextDone:
// user cancelled request
cancelled()
case _, ok := <-d.timer.C:
// Timeout elapsed
if !ok {
continue
}
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
cancelled()
case rdy = <-d.readyForDispatch:
// Ready flag set, keep going
}
Expand All @@ -206,22 +214,36 @@ func (d *DefaultClientDispatcher) messagePump() {
}
// Only dispatch request if able to send and request queue isn't empty
if rdy && !d.requestQueue.IsEmpty() {
d.dispatchNextRequest()
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
if done, ok := d.dispatchNextRequest(); ok {
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
}
d.timer.Reset(d.timeout)
reqContextDone = done
}
d.timer.Reset(d.timeout)
}
}
}

func (d *DefaultClientDispatcher) dispatchNextRequest() {
func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) {
// Get first element in queue
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
ctx := bundle.Ctx
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
return nil, false
default:
}
d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload)
// Attempt to send over network
err := d.network.Write(jsonMessage)
Expand All @@ -232,6 +254,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
return ctx.Done(), true
}

func (d *DefaultClientDispatcher) Pause() {
Expand Down
2 changes: 2 additions & 0 deletions ocppj/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
)
Expand All @@ -10,6 +11,7 @@ import (
type RequestBundle struct {
Call *Call
Data []byte
Ctx context.Context
}

// RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.
Expand Down
3 changes: 2 additions & 1 deletion ocppj/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error {
return err
}
// Will not send right away. Queuing message and let it be processed by dedicated requestPump routine
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage}); err != nil {
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage, context.Background()}); err != nil {
log.Errorf("request %v - %v for client %v: %v", call.UniqueId, call.Action, clientID, err)
return err
}
Expand Down

0 comments on commit a9dd40f

Please sign in to comment.