From 24ecdea9fc79856c315ffe167c1c5bd08a321dc4 Mon Sep 17 00:00:00 2001 From: Michael Beaumont Date: Wed, 21 Jul 2021 09:27:24 +0200 Subject: [PATCH] Allow users to cancel requests --- ocpp1.6/charge_point.go | 5 ++-- ocpp1.6/v16.go | 3 ++- ocppj/client.go | 9 +++++-- ocppj/dispatcher.go | 54 +++++++++++++++++++++++++++-------------- ocppj/queue.go | 2 ++ 5 files changed, 50 insertions(+), 23 deletions(-) diff --git a/ocpp1.6/charge_point.go b/ocpp1.6/charge_point.go index c30fd0b0..a61ef812 100644 --- a/ocpp1.6/charge_point.go +++ b/ocpp1.6/charge_point.go @@ -1,6 +1,7 @@ package ocpp16 import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/internal/callbackqueue" @@ -238,7 +239,7 @@ func (cp *chargePoint) SendRequest(request ocpp.Request) (ocpp.Response, error) } } -func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error { +func (cp *chargePoint) SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, err error)) error { featureName := request.GetFeatureName() if _, found := cp.client.GetProfileForFeature(featureName); !found { return fmt.Errorf("feature %v is unsupported on charge point (missing profile), cannot send request", featureName) @@ -252,7 +253,7 @@ func (cp *chargePoint) SendRequestAsync(request ocpp.Request, callback func(conf } // Response will be retrieved asynchronously via asyncHandler send := func() error { - return cp.client.SendRequest(request) + return cp.client.SendRequestCtx(ctx, request) } err := cp.callbacks.TryQueue("main", send, callback) return err diff --git a/ocpp1.6/v16.go b/ocpp1.6/v16.go index 2382d2ad..d0e56ed0 100644 --- a/ocpp1.6/v16.go +++ b/ocpp1.6/v16.go @@ -2,6 +2,7 @@ package ocpp16 import ( + "context" "crypto/tls" "github.com/gorilla/websocket" @@ -86,7 +87,7 @@ type ChargePoint interface { // The central system will respond with a confirmation messages, or with an error if the request was invalid or could not be processed. // This result is propagated via a callback, called asynchronously. // In case of network issues (i.e. the remote host couldn't be reached), the function returns an error directly. In this case, the callback is never called. - SendRequestAsync(request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error + SendRequestAsync(ctx context.Context, request ocpp.Request, callback func(confirmation ocpp.Response, protoError error)) error // Connects to the central system and starts the charge point routine. // The function doesn't block and returns right away, after having attempted to open a connection to the central system. // If the connection couldn't be opened, an error is returned. diff --git a/ocppj/client.go b/ocppj/client.go index 08ea779f..1e02394d 100644 --- a/ocppj/client.go +++ b/ocppj/client.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "github.com/lorenzodonini/ocpp-go/ocpp" @@ -108,7 +109,7 @@ func (c *Client) Stop() { // - the endpoint doesn't support the feature // // - the output queue is full -func (c *Client) SendRequest(request ocpp.Request) error { +func (c *Client) SendRequestCtx(ctx context.Context, request ocpp.Request) error { if !c.dispatcher.IsRunning() { return fmt.Errorf("ocppj client is not started, couldn't send request") } @@ -125,7 +126,7 @@ func (c *Client) SendRequest(request ocpp.Request) error { return err } // Message will be processed by dispatcher. A dedicated mechanism allows to delegate the message queue handling. - if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage}); err != nil { + if err := c.dispatcher.SendRequest(RequestBundle{Call: call, Data: jsonMessage, Ctx: ctx}); err != nil { log.Errorf("request %v - %v: %v", call.UniqueId, call.Action, err) return err } @@ -133,6 +134,10 @@ func (c *Client) SendRequest(request ocpp.Request) error { return nil } +func (c *Client) SendRequest(request ocpp.Request) error { + return c.SendRequestCtx(context.Background(), request) +} + // Sends an OCPP Response to the server. // The requestID parameter is required and identifies the previously received request. // diff --git a/ocppj/dispatcher.go b/ocppj/dispatcher.go index afcca16f..19976ffa 100644 --- a/ocppj/dispatcher.go +++ b/ocppj/dispatcher.go @@ -168,6 +168,20 @@ func (d *DefaultClientDispatcher) SendRequest(req interface{}) error { func (d *DefaultClientDispatcher) messagePump() { rdy := true // Ready to transmit at the beginning + cancelled := func() { + if d.pendingRequestState.HasPendingRequest() { + // Current request timed out. Removing request and triggering cancel callback + el := d.requestQueue.Peek() + bundle, _ := el.(RequestBundle) + d.CompleteRequest(bundle.Call.UniqueId) + if d.onRequestCancel != nil { + d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) + } + } + // No request is currently pending -> set timer to high number + d.timer.Reset(defaultTimeoutTick) + } + var reqContextDone <-chan struct{} for { select { case _, ok := <-d.requestChannel: @@ -177,22 +191,15 @@ func (d *DefaultClientDispatcher) messagePump() { d.requestChannel = nil return } + case <-reqContextDone: + // user cancelled request + cancelled() case _, ok := <-d.timer.C: // Timeout elapsed if !ok { continue } - if d.pendingRequestState.HasPendingRequest() { - // Current request timed out. Removing request and triggering cancel callback - el := d.requestQueue.Peek() - bundle, _ := el.(RequestBundle) - d.CompleteRequest(bundle.Call.UniqueId) - if d.onRequestCancel != nil { - d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) - } - } - // No request is currently pending -> set timer to high number - d.timer.Reset(defaultTimeoutTick) + cancelled() case rdy = <-d.readyForDispatch: // Ready flag set, keep going } @@ -206,22 +213,32 @@ func (d *DefaultClientDispatcher) messagePump() { } // Only dispatch request if able to send and request queue isn't empty if rdy && !d.requestQueue.IsEmpty() { - d.dispatchNextRequest() - rdy = false - // Set timer - if !d.timer.Stop() { - <-d.timer.C + if done, ok := d.dispatchNextRequest(); ok { + rdy = false + // Set timer + if !d.timer.Stop() { + <-d.timer.C + } + d.timer.Reset(d.timeout) + reqContextDone = done } - d.timer.Reset(d.timeout) } } } -func (d *DefaultClientDispatcher) dispatchNextRequest() { +func (d *DefaultClientDispatcher) dispatchNextRequest() (<-chan struct{}, bool) { // Get first element in queue el := d.requestQueue.Peek() bundle, _ := el.(RequestBundle) jsonMessage := bundle.Data + select { + case <-bundle.Ctx.Done(): + if d.onRequestCancel != nil { + d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) + } + return nil, false + default: + } d.pendingRequestState.AddPendingRequest(bundle.Call.UniqueId, bundle.Call.Payload) // Attempt to send over network err := d.network.Write(jsonMessage) @@ -232,6 +249,7 @@ func (d *DefaultClientDispatcher) dispatchNextRequest() { d.onRequestCancel(bundle.Call.UniqueId, bundle.Call.Action, bundle.Call.Payload) } } + return bundle.Ctx.Done(), true } func (d *DefaultClientDispatcher) Pause() { diff --git a/ocppj/queue.go b/ocppj/queue.go index c4f004a7..b402b8c6 100644 --- a/ocppj/queue.go +++ b/ocppj/queue.go @@ -1,6 +1,7 @@ package ocppj import ( + "context" "fmt" "sync" ) @@ -10,6 +11,7 @@ import ( type RequestBundle struct { Call *Call Data []byte + Ctx context.Context } // RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.