Skip to content

Commit

Permalink
added tests and removed console log
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 20, 2024
1 parent 786c994 commit 24df2ca
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 1 deletion.
1 change: 0 additions & 1 deletion jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ export class JsMsgImpl implements JsMsg {
async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
opts = opts || {};
opts.timeout = opts.timeout || this.timeout;
console.log(opts.timeout);
const d = deferred<boolean>();
if (!this.didAck) {
this.didAck = true;
Expand Down
95 changes: 95 additions & 0 deletions jetstream/tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from "../../src/mod.ts";
import { JsMsgImpl, parseInfo, toJsMsg } from "../jsmsg.ts";
import {
assertBetween,
cleanup,
jetstreamServerConf,
setup,
Expand Down Expand Up @@ -225,3 +226,97 @@ Deno.test("jsmsg - explicit consumer ackAck timeout", async () => {

await cleanup(ns, nc);
});

Deno.test("jsmsg - custom consumer ackAck timeout", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream();
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck({ timeout: 1500 });
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1300, 1700);
await cleanup(ns, nc);
});

Deno.test("jsmsg - custom consumer ackAck timeout in jsopts", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream({ timeout: 2000 });
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const c = await js.consumers.get("A", "a");
const jm = await c.next();
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1800, 2200);

await cleanup(ns, nc);
});

Deno.test("jsmsg - ackAck() timeout legacy jsopts", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager() as JetStreamManagerImpl;
await jsm.streams.add({
name: "A",
subjects: ["a.>"],
storage: StorageType.Memory,
allow_direct: true,
});

const js = nc.jetstream({ timeout: 1500 });
await js.publish("a.a");

await jsm.consumers.add("A", { durable_name: "a" });
const jm = await js.pull("A", "a");
// change the subject
((jm as JsMsgImpl).msg as MsgImpl)._reply = "xxxx";
nc.subscribe("xxxx");
const start = Date.now();
await assertRejects(
(): Promise<boolean> => {
return jm!.ackAck();
},
Error,
"TIMEOUT",
);
assertBetween(Date.now() - start, 1300, 1700);

await cleanup(ns, nc);
});

0 comments on commit 24df2ca

Please sign in to comment.