Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to cancel requests #105

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ocpp1.6/charge_point.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error)
}
}

func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cp.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName)
Expand All @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cp.client.SendRequest(request)
return cp.client.SendRequestCtx(ctx, request)
}
err := cp.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6/v16.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp16

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -86,7 +87,7 @@ type ChargePoint interface {
// The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed.
// This result is propagated via a callback, called asynchronously.
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the central system and starts the charge point routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the central system.
// If the connection couldn't be opened, an error is returned.
Expand Down
3 changes: 2 additions & 1 deletion ocpp1.6_test/ocpp16_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp16_test

import (
"context"
"crypto/tls"
"fmt"
"reflect"
Expand Down Expand Up @@ -550,7 +551,7 @@ func testUnsupportedRequestFromChargePoint(suite *OcppV16TestSuite, request ocpp
err := suite.chargePoint.Start(wsUrl)
assert.Nil(t, err)
// Run request test, expecting an error
err = suite.chargePoint.SendRequestAsync(request, func(confirmation ocpp.Response, err error) {
err = suite.chargePoint.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) {
t.Fail()
})
assert.Error(t, err)
Expand Down
5 changes: 3 additions & 2 deletions ocpp2.0.1/charging_station.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp2

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
Expand Down Expand Up @@ -489,7 +490,7 @@ func (cs *chargingStation) SendRequest(request ocpp.Request) (ocpp.Response, err
return asyncResult.r, asyncResult.e
}

func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(response ocpp.Response, err error)) error {
func (cs *chargingStation) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(response ocpp.Response, err error)) error {
featureName := request.GetFeatureName()
if _, found := cs.client.GetProfileForFeature(featureName); !found {
return fmt.Errorf("feature %v is unsupported on charging station (missing profile), cannot send request", featureName)
Expand Down Expand Up @@ -526,7 +527,7 @@ func (cs *chargingStation) SendRequestAsync(request ocpp.Request, callback func(
}
// Response will be retrieved asynchronously via asyncHandler
send := func() error {
return cs.client.SendRequest(request)
return cs.client.SendRequestCtx(ctx, request)
}
err := cs.callbacks.TryQueue("main", send, callback)
return err
Expand Down
3 changes: 2 additions & 1 deletion ocpp2.0.1/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package ocpp2

import (
"context"
"crypto/tls"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -150,7 +151,7 @@ type ChargingStation interface {
// This result is propagated via a callback, called asynchronously.
//
// In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never invoked.
SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error
// Connects to the CSMS and starts the charging station routine.
// The function doesn't block and returns right away, after having attempted to open a connection to the CSMS.
// If the connection couldn't be opened, an error is returned.
Expand Down
3 changes: 2 additions & 1 deletion ocpp2.0.1_test/ocpp2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocpp2_test

import (
"context"
"crypto/tls"
"fmt"
"reflect"
Expand Down Expand Up @@ -984,7 +985,7 @@ func testUnsupportedRequestFromChargingStation(suite *OcppV2TestSuite, request o
err := suite.chargingStation.Start(wsUrl)
require.Nil(t, err)
// Run request test
err = suite.chargingStation.SendRequestAsync(request, func(confirmation ocpp.Response, err error) {
err = suite.chargingStation.SendRequestAsync(context.Background(), request, func(confirmation ocpp.Response, err error) {
t.Fail()
})
require.Error(t, err)
Expand Down
29 changes: 21 additions & 8 deletions ocppj/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Client) Stop() {
// - the endpoint doesn't support the feature
//
// - the output queue is full
func (c *Client) SendRequest(request ocpp.Request) error {
func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error {
if !c.dispatcher.IsRunning() {
return fmt.Errorf("ocppj client is not started, couldn't send request")
}
Expand All @@ -125,14 +126,18 @@ func (c *Client) SendRequest(request ocpp.Request) error {
return err
}
// Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling.
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil {
if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil {
log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err)
return err
}
log.Debugf("enqueued request %v - %v", call.UniqueId, call.Action)
return nil
}

func (c *Client) SendRequest(request ocpp.Request) error {
return c.SendRequestCtx(context.Background(), request)
}

// Sends an OCPP Response to the server.
// The requestID parameter is required and identifies the previously received request.
//
Expand Down Expand Up @@ -205,15 +210,23 @@ func (c *Client) ocppMessageHandler(data []byte) error {
c.requestHandler(call.Payload, call.UniqueId, call.Action)
case CALL_RESULT:
callResult := message.(*CallResult)
c.dispatcher.CompleteRequest(callResult.GetUniqueId()) // Remove current request from queue and send next one
if c.responseHandler != nil {
c.responseHandler(callResult.Payload, callResult.UniqueId)
done := c.dispatcher.CompleteRequest(callResult.GetUniqueId()) // Remove current request from queue and send next one
select {
case <-done:
default:
if c.responseHandler != nil {
c.responseHandler(callResult.Payload, callResult.UniqueId)
}
}
case CALL_ERROR:
callError := message.(*CallError)
c.dispatcher.CompleteRequest(callError.GetUniqueId()) // Remove current request from queue and send next one
if c.errorHandler != nil {
c.errorHandler(ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails)
done := c.dispatcher.CompleteRequest(callError.GetUniqueId()) // Remove current request from queue and send next one
select {
case <-done:
default:
if c.errorHandler != nil {
c.errorHandler(ocpp.NewError(callError.ErrorCode, callError.ErrorDescription, callError.UniqueId), callError.ErrorDetails)
}
}
}
}
Expand Down
70 changes: 47 additions & 23 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -42,7 +43,7 @@ type ClientDispatcher interface {
// Notifies the dispatcher that a request has been completed (i.e. a response was received).
// The dispatcher takes care of removing the request marked by the requestID from
// the pending requests. It will then attempt to process the next queued request.
CompleteRequest(requestID string)
CompleteRequest(requestID string) <-chan struct{}
// Sets a callback to be invoked when a request gets canceled, due to network timeouts.
// The callback passes the original message ID, feature name and request struct of the failed request.
//
Expand Down Expand Up @@ -168,6 +169,20 @@ func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error {

func (d *DefaultClientDispatcher) messagePump() {
rdy := true // Ready to transmit at the beginning
cancelled := func() {
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
}
var reqContextDone <-chan struct{}
for {
select {
case _, ok := <-d.requestChannel:
Expand All @@ -177,22 +192,15 @@ func (d *DefaultClientDispatcher) messagePump() {
d.requestChannel = nil
return
}
case <-reqContextDone:
// user cancelled request
cancelled()
case _, ok := <-d.timer.C:
// Timeout elapsed
if !ok {
continue
}
if d.pendingRequestState.HasPendingRequest() {
// Current request timed out. Removing request and triggering cancel callback
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
d.CompleteRequest(bundle.Call.UniqueId)
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
// No request is currently pending -> set timer to high number
d.timer.Reset(defaultTimeoutTick)
cancelled()
case rdy = <-d.readyForDispatch:
// Ready flag set, keep going
}
Expand All @@ -206,22 +214,36 @@ func (d *DefaultClientDispatcher) messagePump() {
}
// Only dispatch request if able to send and request queue isn't empty
if rdy && !d.requestQueue.IsEmpty() {
d.dispatchNextRequest()
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
if done, ok := d.dispatchNextRequest(); ok {
rdy = false
// Set timer
if !d.timer.Stop() {
<-d.timer.C
}
d.timer.Reset(d.timeout)
reqContextDone = done
}
d.timer.Reset(d.timeout)
}
}
}

func (d *DefaultClientDispatcher) dispatchNextRequest() {
func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) {
// Get first element in queue
el := d.requestQueue.Peek()
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
ctx := bundle.Ctx
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
if d.onRequestCancel != nil {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
return nil, false
default:
}
d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload)
// Attempt to send over network
err := d.network.Write(jsonMessage)
Expand All @@ -232,6 +254,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() {
d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload)
}
}
return ctx.Done(), true
}

func (d *DefaultClientDispatcher) Pause() {
Expand All @@ -257,22 +280,23 @@ func (d *DefaultClientDispatcher) Resume() {
}
}

func (d *DefaultClientDispatcher) CompleteRequest(requestId string) {
func (d *DefaultClientDispatcher) CompleteRequest(requestId string) <-chan struct{} {
el := d.requestQueue.Peek()
if el == nil {
log.Errorf("attempting to pop front of queue, but queue is empty")
return
return nil
}
bundle, _ := el.(RequestBundle)
if bundle.Call.UniqueId != requestId {
log.Errorf("internal state mismatch: received response for %v but expected response for %v", requestId, bundle.Call.UniqueId)
return
return nil
}
d.requestQueue.Pop()
request := d.requestQueue.Pop().(RequestBundle)
d.pendingRequestState.DeletePendingRequest(requestId)
log.Debugf("removed request %v from front of queue", bundle.Call.UniqueId)
// Signal that next message in queue may be sent
d.readyForDispatch <- true
return request.Ctx.Done()
}

// ServerDispatcher contains the state and logic for handling outgoing messages on a server endpoint.
Expand Down
2 changes: 2 additions & 0 deletions ocppj/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"
"sync"
)
Expand All @@ -10,6 +11,7 @@ import (
type RequestBundle struct {
Call *Call
Data []byte
Ctx context.Context
}

// RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.
Expand Down
3 changes: 2 additions & 1 deletion ocppj/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocppj

import (
"context"
"fmt"

"github.com/lorenzodonini/ocpp-go/ocpp"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *Server) SendRequest(clientID string, request ocpp.Request) error {
return err
}
// Will not send right away. Queuing message and let it be processed by dedicated requestPump routine
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage}); err != nil {
if err := s.dispatcher.SendRequest(clientID, RequestBundle{call, jsonMessage, context.Background()}); err != nil {
log.Errorf("request %v - %v for client %v: %v", call.UniqueId, call.Action, clientID, err)
return err
}
Expand Down