Skip to content

Commit

Permalink
[FIX] [JS] JsMsg#ackAck() didn't provide a way to customize the timeo…
Browse files Browse the repository at this point in the history
…ut. Added optional option with timeout for ackAck(), and also defaulted the timeout to match the JSClient's timeout (as provided by the JetStreamOptions)
  • Loading branch information
aricart committed Jun 19, 2024
1 parent 8b7c1f4 commit 786c994
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 40 deletions.
2 changes: 1 addition & 1 deletion jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
// push the user message
this._push(toJsMsg(msg));
this._push(toJsMsg(msg, this.consumer.api.timeout));
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
Expand Down
72 changes: 39 additions & 33 deletions jetstream/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ export class JetStreamClientImpl extends BaseApiClient
if (err) {
throw err;
}
return toJsMsg(msg);
return toJsMsg(msg, this.timeout);
}

/*
Expand Down Expand Up @@ -389,7 +389,7 @@ export class JetStreamClientImpl extends BaseApiClient
// if we are doing heartbeats, message resets
monitor?.work();
qi.received++;
qi.push(toJsMsg(msg));
qi.push(toJsMsg(msg, this.timeout));
}
},
});
Expand Down Expand Up @@ -653,7 +653,7 @@ export class JetStreamClientImpl extends BaseApiClient
jsi: JetStreamSubscriptionInfo,
): TypedSubscriptionOptions<JsMsg> {
const so = {} as TypedSubscriptionOptions<JsMsg>;
so.adapter = msgAdapter(jsi.callbackFn === undefined);
so.adapter = msgAdapter(jsi.callbackFn === undefined, this.timeout);
so.ingestionFilterFn = JetStreamClientImpl.ingestionFn(jsi.ordered);
so.protocolFilterFn = (jm, ingest = false): boolean => {
const jsmi = jm as JsMsgImpl;
Expand Down Expand Up @@ -979,44 +979,50 @@ class JetStreamPullSubscriptionImpl extends JetStreamSubscriptionImpl
}
}

function msgAdapter(iterator: boolean): MsgAdapter<JsMsg> {
function msgAdapter(iterator: boolean, ackTimeout: number): MsgAdapter<JsMsg> {
if (iterator) {
return iterMsgAdapter;
return iterMsgAdapter(ackTimeout);
} else {
return cbMsgAdapter;
return cbMsgAdapter(ackTimeout);
}
}

function cbMsgAdapter(
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] {
if (err) {
return [err, null];
}
err = checkJsError(msg);
if (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set!
return [null, toJsMsg(msg)];
function cbMsgAdapter(ackTimeout: number): MsgAdapter<JsMsg> {
return (
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] => {
if (err) {
return [err, null];
}
err = checkJsError(msg);
if (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set!
return [null, toJsMsg(msg, ackTimeout)];
};
}

function iterMsgAdapter(
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] {
if (err) {
return [err, null];
}
// iterator will close if we have an error
// check for errors that shouldn't close it
const ne = checkJsError(msg);
if (ne !== null) {
return [hideNonTerminalJsErrors(ne), null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg)];
ackTimeout: number,
): MsgAdapter<JsMsg> {
return (
err: NatsError | null,
msg: Msg,
): [NatsError | null, JsMsg | null] => {
if (err) {
return [err, null];
}
// iterator will close if we have an error
// check for errors that shouldn't close it
const ne = checkJsError(msg);
if (ne !== null) {
return [hideNonTerminalJsErrors(ne), null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg, ackTimeout)];
};
}

function hideNonTerminalJsErrors(ne: NatsError): NatsError | null {
Expand Down
17 changes: 11 additions & 6 deletions jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export interface JsMsg {
* successfully and that the JetStream server should acknowledge back
* that the acknowledgement was received.
*/
ackAck(): Promise<boolean>;
ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;

/**
* Convenience method to parse the message payload as JSON. This method
Expand All @@ -125,8 +125,8 @@ export interface JsMsg {
string(): string;
}

export function toJsMsg(m: Msg): JsMsg {
return new JsMsgImpl(m);
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
return new JsMsgImpl(m, ackTimeout);
}

export function parseInfo(s: string): DeliveryInfo {
Expand Down Expand Up @@ -164,10 +164,12 @@ export class JsMsgImpl implements JsMsg {
msg: Msg;
di?: DeliveryInfo;
didAck: boolean;
timeout: number;

constructor(msg: Msg) {
constructor(msg: Msg, timeout: number) {
this.msg = msg;
this.didAck = false;
this.timeout = timeout;
}

get subject(): string {
Expand Down Expand Up @@ -220,7 +222,10 @@ export class JsMsgImpl implements JsMsg {

// this has to dig into the internals as the message has access
// to the protocol but not the high-level client.
async ackAck(): Promise<boolean> {
async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
opts = opts || {};
opts.timeout = opts.timeout || this.timeout;
console.log(opts.timeout);
const d = deferred<boolean>();
if (!this.didAck) {
this.didAck = true;
Expand All @@ -229,7 +234,7 @@ export class JsMsgImpl implements JsMsg {
const proto = mi.publisher as unknown as ProtocolHandler;
const trace = !(proto.options?.noAsyncTraces || false);
const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
timeout: 1000,
timeout: opts.timeout,
}, trace);
proto.request(r);
try {
Expand Down

0 comments on commit 786c994

Please sign in to comment.